From 92db405a716a1b294387acdb45fd29a456986f6d Mon Sep 17 00:00:00 2001 From: gasparms Date: Fri, 13 Feb 2015 13:26:36 +0100 Subject: [PATCH 1/6] Fix Streaming Programming Guide. Change files according the selected language --- docs/streaming-programming-guide.md | 21 +++- .../JavaStatefulNetworkWordCount.java | 111 ++++++++++++++++++ 2 files changed, 128 insertions(+), 4 deletions(-) create mode 100644 examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 96fb12ce5e0b..1e618ba2a721 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -878,6 +878,12 @@ This is applied on a DStream containing words (say, the `pairs` DStream containi val runningCounts = pairs.updateStateByKey[Int](updateFunction _) {% endhighlight %} +The update function will be called for each word, with `newValues` having a sequence of 1's (from +the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete +Scala code, take a look at the example +[StatefulNetworkWordCount.scala]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache +/spark/examples/streaming/StatefulNetworkWordCount.scala). +
@@ -899,6 +905,13 @@ This is applied on a DStream containing words (say, the `pairs` DStream containi JavaPairDStream runningCounts = pairs.updateStateByKey(updateFunction); {% endhighlight %} +The update function will be called for each word, with `newValues` having a sequence of 1's (from +the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete +Scala code, take a look at the example +[JavaStatefulNetworkWordCount.java]({{site +.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming +/JavaStatefulNetworkWordCount.java). +
@@ -916,14 +929,14 @@ This is applied on a DStream containing words (say, the `pairs` DStream containi runningCounts = pairs.updateStateByKey(updateFunction) {% endhighlight %} -
- - The update function will be called for each word, with `newValues` having a sequence of 1's (from the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete -Scala code, take a look at the example +Python code, take a look at the example [stateful_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/stateful_network_wordcount.py). + + + Note that using `updateStateByKey` requires the checkpoint directory to be configured, which is discussed in detail in the [checkpointing](#checkpointing) section. diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java new file mode 100644 index 000000000000..de9df2a13583 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming; + +import java.util.List; +import java.util.regex.Pattern; + +import scala.Tuple2; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.StorageLevels; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; + +/** + * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every + * second starting with initial value of word count. + * Usage: JavaStatefulNetworkWordCount + * and describe the TCP server that Spark Streaming would connect to receive + * data. + *

+ * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ bin/run-example + * org.apache.spark.examples.streaming.JavaStatefulNetworkWordCount localhost 9999` + */ +public class JavaStatefulNetworkWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + public static void main(String[] args) { + if (args.length < 2) { + System.err.println("Usage: StatefulNetworkWordCount "); + System.exit(1); + } + + StreamingExamples.setStreamingLogLevels(); + + // Create the context with a 1 second batch size + SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount"); + JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); + ssc.checkpoint("."); + + // Create a JavaReceiverInputDStream on target ip:port and count the + // words in input stream of \n delimited text (eg. generated by 'nc') + // Note that no duplication in storage level only for running locally. + // Replication necessary in distributed scenario for fault tolerance. + JavaReceiverInputDStream lines = ssc.socketTextStream( + args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2); + + JavaDStream words = lines.flatMap(new FlatMapFunction() { + @Override + public Iterable call(String x) { + return Lists.newArrayList(SPACE.split(x)); + } + }); + + JavaPairDStream wordsDstream = words.mapToPair( + new PairFunction() { + @Override + public Tuple2 call(String s) { + return new Tuple2(s, 1); + } + }); + + // Update the cumulative count function + final Function2, Optional, Optional> updateFunction = new + Function2, Optional, Optional>() { + @Override public Optional call(List values, Optional state) + throws Exception { + Integer newSum = state.or(0); + for (Integer value : values) { + newSum += value; + } + + return Optional.of(newSum); + } + }; + + // This will give a Dstream made of state (which is the cumulative count of the words) + JavaPairDStream stateDstream = wordsDstream.updateStateByKey(updateFunction); + + stateDstream.print(); + ssc.start(); + ssc.awaitTermination(); + } +} From e92e6b80d14288c97223be71a587ce7187a870e2 Mon Sep 17 00:00:00 2001 From: gasparms Date: Fri, 13 Feb 2015 13:36:40 +0100 Subject: [PATCH 2/6] Fix incoherence --- docs/streaming-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 1e618ba2a721..997de9511ca3 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -907,7 +907,7 @@ JavaPairDStream runningCounts = pairs.updateStateByKey(updateFu The update function will be called for each word, with `newValues` having a sequence of 1's (from the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete -Scala code, take a look at the example +Java code, take a look at the example [JavaStatefulNetworkWordCount.java]({{site .SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming /JavaStatefulNetworkWordCount.java). From 4d8785c338b9a60d1556a2a902ce35486922643f Mon Sep 17 00:00:00 2001 From: gasparms Date: Fri, 13 Feb 2015 17:27:01 +0100 Subject: [PATCH 3/6] Remove throw exception --- .../JavaStatefulNetworkWordCount.java | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java index de9df2a13583..2f9661333827 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java @@ -60,6 +60,19 @@ public static void main(String[] args) { StreamingExamples.setStreamingLogLevels(); + // Update the cumulative count function + final Function2, Optional, Optional> updateFunction = new + Function2, Optional, Optional>() { + @Override public Optional call(List values, Optional state) { + Integer newSum = state.or(0); + for (Integer value : values) { + newSum += value; + } + + return Optional.of(newSum); + } + }; + // Create the context with a 1 second batch size SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); @@ -87,20 +100,6 @@ public Tuple2 call(String s) { } }); - // Update the cumulative count function - final Function2, Optional, Optional> updateFunction = new - Function2, Optional, Optional>() { - @Override public Optional call(List values, Optional state) - throws Exception { - Integer newSum = state.or(0); - for (Integer value : values) { - newSum += value; - } - - return Optional.of(newSum); - } - }; - // This will give a Dstream made of state (which is the cumulative count of the words) JavaPairDStream stateDstream = wordsDstream.updateStateByKey(updateFunction); From f527328fc82c2d01b949ab8fbc7cfe1197fdb81a Mon Sep 17 00:00:00 2001 From: gasparms Date: Fri, 13 Feb 2015 18:03:22 +0100 Subject: [PATCH 4/6] Improve example to look like scala example --- .../JavaStatefulNetworkWordCount.java | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java index 2f9661333827..5284ba9343f0 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java @@ -17,15 +17,14 @@ package org.apache.spark.examples.streaming; +import static java.util.Arrays.asList; + import java.util.List; import java.util.regex.Pattern; -import scala.Tuple2; - -import com.google.common.base.Optional; -import com.google.common.collect.Lists; - +import org.apache.spark.HashPartitioner; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.StorageLevels; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; @@ -36,6 +35,11 @@ import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + +import scala.Tuple2; + /** * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every * second starting with initial value of word count. @@ -78,10 +82,11 @@ public static void main(String[] args) { JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); ssc.checkpoint("."); - // Create a JavaReceiverInputDStream on target ip:port and count the - // words in input stream of \n delimited text (eg. generated by 'nc') - // Note that no duplication in storage level only for running locally. - // Replication necessary in distributed scenario for fault tolerance. + // Initial RDD input to updateStateByKey + JavaPairRDD initialRDD = ssc.sc() + .parallelizePairs(asList(new Tuple2("hello", 1), new Tuple2 + ("world", 1))); + JavaReceiverInputDStream lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2); @@ -101,7 +106,8 @@ public Tuple2 call(String s) { }); // This will give a Dstream made of state (which is the cumulative count of the words) - JavaPairDStream stateDstream = wordsDstream.updateStateByKey(updateFunction); + JavaPairDStream stateDstream = wordsDstream.updateStateByKey(updateFunction, new + HashPartitioner(ssc.sc().defaultParallelism()), initialRDD); stateDstream.print(); ssc.start(); From ec202b0b88f970e4c764917b1a16ecd8021b974e Mon Sep 17 00:00:00 2001 From: gasparms Date: Fri, 13 Feb 2015 20:29:40 +0100 Subject: [PATCH 5/6] Follow spark style guide --- .../JavaStatefulNetworkWordCount.java | 127 +++++++++--------- 1 file changed, 63 insertions(+), 64 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java index 5284ba9343f0..7c03e2ff328c 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java @@ -17,11 +17,15 @@ package org.apache.spark.examples.streaming; -import static java.util.Arrays.asList; - +import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; +import scala.Tuple2; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + import org.apache.spark.HashPartitioner; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; @@ -35,10 +39,6 @@ import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; - -import scala.Tuple2; /** * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every @@ -54,63 +54,62 @@ * org.apache.spark.examples.streaming.JavaStatefulNetworkWordCount localhost 9999` */ public class JavaStatefulNetworkWordCount { - private static final Pattern SPACE = Pattern.compile(" "); - - public static void main(String[] args) { - if (args.length < 2) { - System.err.println("Usage: StatefulNetworkWordCount "); - System.exit(1); - } - - StreamingExamples.setStreamingLogLevels(); - - // Update the cumulative count function - final Function2, Optional, Optional> updateFunction = new - Function2, Optional, Optional>() { - @Override public Optional call(List values, Optional state) { - Integer newSum = state.or(0); - for (Integer value : values) { - newSum += value; - } - - return Optional.of(newSum); - } - }; - - // Create the context with a 1 second batch size - SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount"); - JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); - ssc.checkpoint("."); - - // Initial RDD input to updateStateByKey - JavaPairRDD initialRDD = ssc.sc() - .parallelizePairs(asList(new Tuple2("hello", 1), new Tuple2 - ("world", 1))); - - JavaReceiverInputDStream lines = ssc.socketTextStream( - args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2); - - JavaDStream words = lines.flatMap(new FlatMapFunction() { - @Override - public Iterable call(String x) { - return Lists.newArrayList(SPACE.split(x)); - } - }); - - JavaPairDStream wordsDstream = words.mapToPair( - new PairFunction() { - @Override - public Tuple2 call(String s) { - return new Tuple2(s, 1); - } - }); - - // This will give a Dstream made of state (which is the cumulative count of the words) - JavaPairDStream stateDstream = wordsDstream.updateStateByKey(updateFunction, new - HashPartitioner(ssc.sc().defaultParallelism()), initialRDD); - - stateDstream.print(); - ssc.start(); - ssc.awaitTermination(); + private static final Pattern SPACE = Pattern.compile(" "); + + public static void main(String[] args) { + if (args.length < 2) { + System.err.println("Usage: JavaStatefulNetworkWordCount "); + System.exit(1); } + + StreamingExamples.setStreamingLogLevels(); + + // Update the cumulative count function + final Function2, Optional, Optional> updateFunction = new + Function2, Optional, Optional>() { + @Override + public Optional call(List values, Optional state) { + Integer newSum = state.or(0); + for (Integer value : values) { + newSum += value; + } + return Optional.of(newSum); + } + }; + + // Create the context with a 1 second batch size + SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount"); + JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); + ssc.checkpoint("."); + + // Initial RDD input to updateStateByKey + JavaPairRDD initialRDD = ssc.sc() + .parallelizePairs(Arrays.asList(new Tuple2("hello", 1), new Tuple2 + ("world", 1))); + + JavaReceiverInputDStream lines = ssc.socketTextStream( + args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2); + + JavaDStream words = lines.flatMap(new FlatMapFunction() { + @Override + public Iterable call(String x) { + return Lists.newArrayList(SPACE.split(x)); + } + }); + + JavaPairDStream wordsDstream = words.mapToPair(new PairFunction() { + @Override + public Tuple2 call(String s) { + return new Tuple2(s, 1); + } + }); + + // This will give a Dstream made of state (which is the cumulative count of the words) + JavaPairDStream stateDstream = wordsDstream.updateStateByKey(updateFunction, new + HashPartitioner(ssc.sc().defaultParallelism()), initialRDD); + + stateDstream.print(); + ssc.start(); + ssc.awaitTermination(); + } } From 7f37f89a2893cc2e1059d6535508c6766ee63c01 Mon Sep 17 00:00:00 2001 From: gasparms Date: Sat, 14 Feb 2015 15:55:38 +0100 Subject: [PATCH 6/6] More style changes --- .../streaming/JavaStatefulNetworkWordCount.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java index 7c03e2ff328c..09491fe30082 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java @@ -83,9 +83,9 @@ public Optional call(List values, Optional state) { ssc.checkpoint("."); // Initial RDD input to updateStateByKey - JavaPairRDD initialRDD = ssc.sc() - .parallelizePairs(Arrays.asList(new Tuple2("hello", 1), new Tuple2 - ("world", 1))); + List> tuples = Arrays.asList(new Tuple2("hello", 1), + new Tuple2("world", 1)); + JavaPairRDD initialRDD = ssc.sc().parallelizePairs(tuples); JavaReceiverInputDStream lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2); @@ -105,8 +105,8 @@ public Tuple2 call(String s) { }); // This will give a Dstream made of state (which is the cumulative count of the words) - JavaPairDStream stateDstream = wordsDstream.updateStateByKey(updateFunction, new - HashPartitioner(ssc.sc().defaultParallelism()), initialRDD); + JavaPairDStream stateDstream = wordsDstream.updateStateByKey(updateFunction, + new HashPartitioner(ssc.sc().defaultParallelism()), initialRDD); stateDstream.print(); ssc.start();