@@ -60,6 +60,19 @@ public static void main(String[] args) {
6060
6161 StreamingExamples .setStreamingLogLevels ();
6262
63+ // Update the cumulative count function
64+ final Function2 <List <Integer >, Optional <Integer >, Optional <Integer >> updateFunction = new
65+ Function2 <List <Integer >, Optional <Integer >, Optional <Integer >>() {
66+ @ Override public Optional <Integer > call (List <Integer > values , Optional <Integer > state ) {
67+ Integer newSum = state .or (0 );
68+ for (Integer value : values ) {
69+ newSum += value ;
70+ }
71+
72+ return Optional .of (newSum );
73+ }
74+ };
75+
6376 // Create the context with a 1 second batch size
6477 SparkConf sparkConf = new SparkConf ().setAppName ("JavaStatefulNetworkWordCount" );
6578 JavaStreamingContext ssc = new JavaStreamingContext (sparkConf , Durations .seconds (1 ));
@@ -87,20 +100,6 @@ public Tuple2<String, Integer> call(String s) {
87100 }
88101 });
89102
90- // Update the cumulative count function
91- final Function2 <List <Integer >, Optional <Integer >, Optional <Integer >> updateFunction = new
92- Function2 <List <Integer >, Optional <Integer >, Optional <Integer >>() {
93- @ Override public Optional <Integer > call (List <Integer > values , Optional <Integer > state )
94- throws Exception {
95- Integer newSum = state .or (0 );
96- for (Integer value : values ) {
97- newSum += value ;
98- }
99-
100- return Optional .of (newSum );
101- }
102- };
103-
104103 // This will give a Dstream made of state (which is the cumulative count of the words)
105104 JavaPairDStream <String , Integer > stateDstream = wordsDstream .updateStateByKey (updateFunction );
106105
0 commit comments