@@ -76,16 +76,19 @@ Besides Spark's configuration, we specify that any DStream will be processed
 in 1 second batches.
 
 {% highlight scala %}
-// Create a StreamingContext with a SparkConf configuration
-val ssc = new StreamingContext(sparkConf, Seconds(1))
+import org.apache.spark.api.java.function._
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.api._
+// Create a StreamingContext with a local master
+val ssc = new StreamingContext("local", "NetworkWordCount", Seconds(1))
 {% endhighlight %}
 
 Using this context, we then create a new DStream
 by specifying the IP address and port of the data server.
 
 {% highlight scala %}
-// Create a DStream that will connect to serverIP:serverPort
-val lines = ssc.socketTextStream(serverIP, serverPort)
+// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
+val lines = ssc.socketTextStream("localhost", 9999)
 {% endhighlight %}
 
 This `lines` DStream represents the stream of data that will be received from the data
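Taken together, the added Scala lines above set up the quick example roughly as follows (a sketch only; it assumes, as elsewhere in the quick example, that a netcat-style server is already sending text on localhost:9999, and note that `Seconds` comes from the `org.apache.spark.streaming._` import added above):

{% highlight scala %}
import org.apache.spark.streaming._

// One-second batches against a local master, as in the updated snippet above
val ssc = new StreamingContext("local", "NetworkWordCount", Seconds(1))

// Each line of text received on localhost:9999 becomes an element of this DStream
val lines = ssc.socketTextStream("localhost", 9999)
{% endhighlight %}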
@@ -103,6 +106,7 @@ each line will be split into multiple words and the stream of words is represent
 `words` DStream. Next, we want to count these words.
 
 {% highlight scala %}
+import org.apache.spark.streaming.StreamingContext._
 // Count each word in each batch
 val pairs = words.map(word => (word, 1))
 val wordCounts = pairs.reduceByKey(_ + _)
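The counts computed here only become visible once they are output and the context is started. A minimal sketch of those closing steps (they fall outside this hunk, so the exact wording in the guide may differ):

{% highlight scala %}
// Print a few of the counts computed in each batch to the console
wordCounts.print()

ssc.start()             // start receiving and processing data
ssc.awaitTermination()  // block until the streaming computation is stopped
{% endhighlight %}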
@@ -138,16 +142,20 @@ functionality. Besides Spark's configuration, we specify that any DStream would
 in 1 second batches.
 
 {% highlight java %}
-// Create a StreamingContext with a SparkConf configuration
-JavaStreamingContext jssc = StreamingContext(sparkConf, new Duration(1000))
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import scala.Tuple2;
+// Create a StreamingContext with a local master
+JavaStreamingContext jssc = new JavaStreamingContext("local", "JavaNetworkWordCount", new Duration(1000))
 {% endhighlight %}
 
 Using this context, we then create a new DStream
 by specifying the IP address and port of the data server.
 
 {% highlight java %}
-// Create a DStream that will connect to serverIP:serverPort
-JavaDStream<String> lines = jssc.socketTextStream(serverIP, serverPort);
+// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
+JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
 {% endhighlight %}
 
 This `lines` DStream represents the stream of data that will be received from the data
@@ -159,7 +167,7 @@ space into words.
 JavaDStream<String> words = lines.flatMap(
   new FlatMapFunction<String, String>() {
     @Override public Iterable<String> call(String x) {
-      return Lists.newArrayList(x.split(" "));
+      return Arrays.asList(x.split(" "));
     }
   });
 {% endhighlight %}
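For comparison, the Scala tab of the guide performs this split with a single `flatMap` call, roughly:

{% highlight scala %}
// Split each line into words; every word becomes an element of the words DStream
val words = lines.flatMap(_.split(" "))
{% endhighlight %}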
@@ -359,7 +367,7 @@ as explained earlier. Finally, the last two parameters are needed to deploy your
 if running in distributed mode, as described in the
 [Spark programming guide](scala-programming-guide.html#deploying-code-on-a-cluster).
 Additionally, the underlying SparkContext can be accessed as
-`streamingContext.sparkContext`.
+`ssc.sparkContext`.
 
 The batch interval must be set based on the latency requirements of your application
 and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-size)
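The `ssc.sparkContext` accessor mentioned above makes the backing SparkContext available for ordinary batch work; a small sketch (the variable names are illustrative):

{% highlight scala %}
// Reuse the SparkContext that backs the StreamingContext
val sc = ssc.sparkContext

// For example, build a static reference RDD with the same context
val referenceRDD = sc.parallelize(1 to 100)
{% endhighlight %}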
@@ -399,7 +407,7 @@ These operations are discussed in detail in later sections.
 
 ## Input Sources
 
-We have already taken a look at the `streamingContext.socketTextStream(...)` in the [quick
+We have already taken a look at the `ssc.socketTextStream(...)` in the [quick
 example](#a-quick-example) which creates a DStream from text
 data received over a TCP socket connection. Besides sockets, the core Spark Streaming API provides
 methods for creating DStreams from files and Akka actors as input sources.
@@ -409,12 +417,12 @@ Specifically, for files, the DStream can be created as
 <div class="codetabs">
 <div data-lang="scala">
 {% highlight scala %}
-streamingContext.fileStream(dataDirectory)
+ssc.fileStream(dataDirectory)
 {% endhighlight %}
 </div>
 <div data-lang="java">
 {% highlight java %}
-javaStreamingContext.fileStream(dataDirectory);
+jssc.fileStream(dataDirectory);
 {% endhighlight %}
 </div>
 </div>
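For plain text files there is also the `textFileStream` shortcut, which watches a directory and turns every new file into lines of a DStream; a sketch with a placeholder path:

{% highlight scala %}
// Process new text files as they appear in the monitored directory
val logLines = ssc.textFileStream("hdfs://namenode:8020/user/logs/")
{% endhighlight %}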
@@ -443,13 +451,13 @@ project dependencies, you can create a DStream from Kafka as
 <div data-lang="scala">
 {% highlight scala %}
 import org.apache.spark.streaming.kafka._
-KafkaUtils.createStream(streamingContext, kafkaParams, ...)
+KafkaUtils.createStream(ssc, kafkaParams, ...)
 {% endhighlight %}
 </div>
 <div data-lang="java">
 {% highlight java %}
-import org.apache.spark.streaming.kafka.*
-KafkaUtils.createStream(javaStreamingContext, kafkaParams, ...);
+import org.apache.spark.streaming.kafka.*;
+KafkaUtils.createStream(jssc, kafkaParams, ...);
 {% endhighlight %}
 </div>
 </div>
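As a concrete illustration of the call sketched above, one commonly used overload of `KafkaUtils.createStream` takes the ZooKeeper quorum, a consumer group id, and a map from topic names to receiver thread counts (all values below are placeholders):

{% highlight scala %}
import org.apache.spark.streaming.kafka._

// One receiver thread consuming the "wordcount" topic through the given ZooKeeper quorum
val kafkaStream = KafkaUtils.createStream(ssc, "zkhost:2181", "wordcount-group", Map("wordcount" -> 1))
{% endhighlight %}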
@@ -578,13 +586,14 @@ val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
 <div data-lang="java" markdown="1">
 
 {% highlight java %}
+import com.google.common.base.Optional;
 Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
   new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
     @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
       Integer newSum = ... // add the new values with the previous running count to get the new count
-      return Optional.of(newSum)
+      return Optional.of(newSum);
     }
-  }
+  };
 {% endhighlight %}
 
 This is applied on a DStream containing words (say, the `pairs` DStream containing `(word,
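For reference, the Scala form of this update function (paired with the `updateStateByKey` call shown in the hunk header above) looks roughly like:

{% highlight scala %}
// Add the counts from the current batch to the running count carried in the state
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  Some(runningCount.getOrElse(0) + newValues.sum)
}

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
{% endhighlight %}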
@@ -617,9 +626,9 @@ spam information (maybe generated with Spark as well) and then filtering based o
 <div data-lang="scala" markdown="1">
 
 {% highlight scala %}
-val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information
+val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
 
-val cleanedDStream = inputDStream.transform(rdd => {
+val cleanedDStream = wordCounts.transform(rdd => {
   rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
   ...
 })
@@ -629,13 +638,14 @@ val cleanedDStream = inputDStream.transform(rdd => {
 <div data-lang="java" markdown="1">
 
 {% highlight java %}
+import org.apache.spark.streaming.api.java.*;
 // RDD containing spam information
-JavaPairRDD<String, Double> spamInfoRDD = javaSparkContext.hadoopFile(...);
+final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
 
-JavaPairDStream<String, Integer> cleanedDStream = inputDStream.transform(
+JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
   new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
     @Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
-      rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
+      rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
       ...
     }
   });
@@ -684,7 +694,7 @@ operation `reduceByKeyAndWindow`.
 
 {% highlight scala %}
 // Reduce last 30 seconds of data, every 10 seconds
-val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
+val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(30), Seconds(10))
 {% endhighlight %}
 
 </div>
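Where an inverse of the reduce function exists, the guide's more efficient windowed variant can be used instead; a sketch with the same 30-second window and 10-second slide (checkpointing must be enabled for this form):

{% highlight scala %}
// Incrementally add counts entering the window and subtract counts leaving it
val windowedWordCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,  // reduce new data entering the window
  (a: Int, b: Int) => a - b,  // inverse-reduce old data leaving the window
  Seconds(30), Seconds(10))
{% endhighlight %}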
@@ -699,7 +709,7 @@ Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer
 };
 
 // Reduce last 30 seconds of data, every 10 seconds
-JavaPairDStream<String, Integer> windowedWordCounts = pair.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
+JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
 {% endhighlight %}
 
 </div>
@@ -1087,7 +1097,7 @@ This behavior is made simple by using `JavaStreamingContext.getOrCreate`. This i
 {% highlight java %}
 // Create a factory object that can create a and setup a new JavaStreamingContext
 JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
-  JavaStreamingContextFactory create() {
+  @Override public JavaStreamingContext create() {
     JavaStreamingContext jssc = new JavaStreamingContext(...); // new context
     JavaDStream<String> lines = jssc.socketTextStream(...); // create DStreams
     ...
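The Scala counterpart of this recovery pattern uses `StreamingContext.getOrCreate` with an ordinary function instead of a factory object; a sketch in the same elided style as the Java block above:

{% highlight scala %}
// Either recover a context from checkpoint data or create a fresh one
def createContext(): StreamingContext = {
  val ssc = new StreamingContext(...)    // new context
  val lines = ssc.socketTextStream(...)  // create DStreams
  ssc.checkpoint(checkpointDirectory)    // set checkpoint directory
  ssc
}

val context = StreamingContext.getOrCreate(checkpointDirectory, createContext _)
{% endhighlight %}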