- bin/run-example streaming.KinesisWordCountASL [Kinesis stream name] [endpoint URL]
+ bin/run-example streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
- bin/run-example streaming.JavaKinesisWordCountASL [Kinesis stream name] [endpoint URL]
+ bin/run-example streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
@@ -136,7 +142,7 @@ To run the example,
- To generate random string data to put onto the Kinesis stream, in another terminal, run the associated Kinesis data producer.
- bin/run-example streaming.KinesisWordCountProducerASL [Kinesis stream name] [endpoint URL] 1000 10
+ bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream. This data should then be received and processed by the running example.
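For orientation, a minimal sketch of what such a producer does, written against the AWS Kinesis Java SDK. The stream name, endpoint URL, and record format below are placeholders and this is not the actual KinesisWordProducerASL implementation.

{% highlight scala %}
import java.nio.ByteBuffer
import scala.util.Random
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest

// Hypothetical stand-in for the bundled producer: put one line of random numbers
// onto a Kinesis stream. Credentials come from the default AWS provider chain.
val client = new AmazonKinesisClient()
client.setEndpoint("https://kinesis.us-east-1.amazonaws.com")  // placeholder endpoint URL

val line = Seq.fill(10)(Random.nextInt(10)).mkString(" ")
val request = new PutRecordRequest()
  .withStreamName("my-kinesis-stream")                         // placeholder stream name
  .withPartitionKey(Random.nextInt(100).toString)
  .withData(ByteBuffer.wrap(line.getBytes("UTF-8")))
client.putRecord(request)
{% endhighlight %}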
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 42b33947873b0..836f0473597d8 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -77,7 +77,7 @@ main entry point for all streaming functionality. We create a local StreamingCon
{% highlight scala %}
import org.apache.spark._
import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+
+import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
@@ -109,7 +109,7 @@ each line will be split into multiple words and the stream of words is represent
`words` DStream. Next, we want to count these words.
{% highlight scala %}
-import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+
+import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
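// A minimal sketch of how the quick example typically finishes: print the counts
// and start the streaming computation; nothing executes until ssc.start() is called.
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate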
@@ -682,7 +682,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
### Advanced Sources
{:.no_toc}
-
{% highlight scala %}
-/** Lazily instantiated singleton instance of SQLContext */
-object SQLContextSingleton {
- @transient private var instance: SQLContext = null
-
- // Instantiate SQLContext on demand
- def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
- if (instance == null) {
- instance = new SQLContext(sparkContext)
- }
- instance
- }
-}
-
-...
-
-/** Case class for converting RDD to DataFrame */
-case class Row(word: String)
-
-...
-
/** DataFrame operations inside your streaming program */
val words: DStream[String] = ...
@@ -1454,11 +1435,11 @@ val words: DStream[String] = ...
words.foreachRDD { rdd =>
// Get the singleton instance of SQLContext
- val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
+ val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
import sqlContext.implicits._
- // Convert RDD[String] to RDD[case class] to DataFrame
- val wordsDataFrame = rdd.map(w => Row(w)).toDF()
+ // Convert RDD[String] to DataFrame
+ val wordsDataFrame = rdd.toDF("word")
// Register as table
wordsDataFrame.registerTempTable("words")
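// A possible follow-up (sketch): run SQL over the temp table registered above and
// print the result; the query text here is illustrative.
val wordCountsDataFrame =
  sqlContext.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()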
@@ -1476,19 +1457,6 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma
{% highlight java %}
-/** Lazily instantiated singleton instance of SQLContext */
-class JavaSQLContextSingleton {
- static private transient SQLContext instance = null;
- static public SQLContext getInstance(SparkContext sparkContext) {
- if (instance == null) {
- instance = new SQLContext(sparkContext);
- }
- return instance;
- }
-}
-
-...
-
/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
private String word;
@@ -1512,7 +1480,9 @@ words.foreachRDD(
new Function2<JavaRDD<String>, Time, Void>() {
  @Override
  public Void call(JavaRDD<String> rdd, Time time) {
- SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
+
+ // Get the singleton instance of SQLContext
+ SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
// Convert RDD[String] to RDD[case class] to DataFrame
JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
@@ -2234,7 +2204,7 @@ The following table summarizes the semantics under failures:
### With Kafka Direct API
{:.no_toc}
-In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach (experimental as of Spark 1.3) is further discussed in the [Kafka Integration Guide](streaming-kafka-integration.html).
+In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement an exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach (experimental as of Spark {{site.SPARK_VERSION_SHORT}}) is further discussed in the [Kafka Integration Guide](streaming-kafka-integration.html).
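For orientation, a minimal sketch of creating such a direct stream, assuming an existing StreamingContext `ssc`; the broker addresses and topic name are placeholders, and the full set of options is covered in the integration guide.

{% highlight scala %}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholders: point these at your own Kafka brokers and topic.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("pageviews")

// No receivers are used: each batch computes and reads its own Kafka offset ranges,
// which is what gives the exactly-once receive guarantee described above.
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)
{% endhighlight %}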
## Semantics of output operations
{:.no_toc}
@@ -2248,8 +2218,16 @@ additional effort may be necessary to achieve exactly-once semantics. There are
- *Transactional updates*: All updates are made transactionally so that updates are made exactly once atomically. One way to do this would be the following.
- - Use the batch time (available in `foreachRDD`) and the partition index of the transformed RDD to create an identifier. This identifier uniquely identifies a blob data in the streaming application.
- - Update external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically. Else if this was already committed, skip the update.
+  - Use the batch time (available in `foreachRDD`) and the partition index of the RDD to create an identifier. This identifier uniquely identifies a blob of data in the streaming application.
+  - Update the external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically; if it was already committed, skip the update.
+
+ dstream.foreachRDD { (rdd, time) =>
+ rdd.foreachPartition { partitionIterator =>
+ val partitionId = TaskContext.get.partitionId()
+ val uniqueId = generateUniqueId(time.milliseconds, partitionId)
+ // use this uniqueId to transactionally commit the data in partitionIterator
+ }
+ }