Skip to content

Commit f9ed2b6

Browse files
zsxwing authored and tdas committed
[SPARK-4608][Streaming] Reorganize StreamingContext implicit to improve API convenience
There is only one implicit function `toPairDStreamFunctions` in `StreamingContext`. This PR did similar reorganization like [SPARK-4397](https://issues.apache.org/jira/browse/SPARK-4397). Compiled the following codes with Spark Streaming 1.1.0 and ran it with this PR. Everything is fine. ```Scala import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ object StreamingApp { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[2]").setAppName("FileWordCount") val ssc = new StreamingContext(conf, Seconds(10)) val lines = ssc.textFileStream("/some/path") val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() } } ``` Author: zsxwing <[email protected]> Closes #3464 from zsxwing/SPARK-4608 and squashes the following commits: aa6d44a [zsxwing] Fix a copy-paste error f74c190 [zsxwing] Merge branch 'master' into SPARK-4608 e6f9cc9 [zsxwing] Update the docs 27833bb [zsxwing] Remove `import StreamingContext._` c15162c [zsxwing] Reorganize StreamingContext implicit to improve API convenience
1 parent f205fe4 commit f9ed2b6

File tree

26 files changed

+60
-37
lines changed

26 files changed

+60
-37
lines changed

docs/streaming-programming-guide.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ main entry point for all streaming functionality. We create a local StreamingCon
7575
{% highlight scala %}
7676
import org.apache.spark._
7777
import org.apache.spark.streaming._
78-
import org.apache.spark.streaming.StreamingContext._
78+
import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+
7979

8080
// Create a local StreamingContext with two working thread and batch interval of 1 second.
8181
// The master requires 2 cores to prevent from a starvation scenario.
@@ -107,7 +107,7 @@ each line will be split into multiple words and the stream of words is represent
107107
`words` DStream. Next, we want to count these words.
108108

109109
{% highlight scala %}
110-
import org.apache.spark.streaming.StreamingContext._
110+
import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+
111111
// Count each word in each batch
112112
val pairs = words.map(word => (word, 1))
113113
val wordCounts = pairs.reduceByKey(_ + _)

examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import java.util.Properties
2222
import kafka.producer._
2323

2424
import org.apache.spark.streaming._
25-
import org.apache.spark.streaming.StreamingContext._
2625
import org.apache.spark.streaming.kafka._
2726
import org.apache.spark.SparkConf
2827

examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import java.net.Socket
2323
import org.apache.spark.{SparkConf, Logging}
2424
import org.apache.spark.storage.StorageLevel
2525
import org.apache.spark.streaming.{Seconds, StreamingContext}
26-
import org.apache.spark.streaming.StreamingContext._
2726
import org.apache.spark.streaming.receiver.Receiver
2827

2928
/**

examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package org.apache.spark.examples.streaming
1919

2020
import org.apache.spark.SparkConf
2121
import org.apache.spark.streaming.{Seconds, StreamingContext}
22-
import org.apache.spark.streaming.StreamingContext._
2322

2423
/**
2524
* Counts words in new text files created in the given directory

examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
2222

2323
import org.apache.spark.storage.StorageLevel
2424
import org.apache.spark.streaming.{Seconds, StreamingContext}
25-
import org.apache.spark.streaming.StreamingContext._
2625
import org.apache.spark.streaming.mqtt._
2726
import org.apache.spark.SparkConf
2827

examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package org.apache.spark.examples.streaming
1919

2020
import org.apache.spark.SparkConf
2121
import org.apache.spark.streaming.{Seconds, StreamingContext}
22-
import org.apache.spark.streaming.StreamingContext._
2322
import org.apache.spark.storage.StorageLevel
2423

2524
/**

examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import scala.collection.mutable.SynchronizedQueue
2222
import org.apache.spark.SparkConf
2323
import org.apache.spark.rdd.RDD
2424
import org.apache.spark.streaming.{Seconds, StreamingContext}
25-
import org.apache.spark.streaming.StreamingContext._
2625

2726
object QueueStream {
2827

examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import com.google.common.io.Files
2525
import org.apache.spark.SparkConf
2626
import org.apache.spark.rdd.RDD
2727
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
28-
import org.apache.spark.streaming.StreamingContext._
2928
import org.apache.spark.util.IntParam
3029

3130
/**

examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package org.apache.spark.examples.streaming
2020
import org.apache.spark.SparkConf
2121
import org.apache.spark.HashPartitioner
2222
import org.apache.spark.streaming._
23-
import org.apache.spark.streaming.StreamingContext._
2423

2524
/**
2625
* Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every

examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import org.apache.spark.SparkConf
2323
import org.apache.spark.SparkContext._
2424
import org.apache.spark.storage.StorageLevel
2525
import org.apache.spark.streaming.{Seconds, StreamingContext}
26-
import org.apache.spark.streaming.StreamingContext._
2726
import org.apache.spark.streaming.twitter._
2827

2928
// scalastyle:off

0 commit comments

Comments (0)