
Commit e9471d3

[SPARK-7284] [STREAMING] Updated streaming documentation
- Kinesis API updated
- Kafka version updated, and Python API for Direct Kafka added
- Added SQLContext.getOrCreate()
- Added information on how to get partitionId in foreachRDD

Author: Tathagata Das <[email protected]>

Closes apache#6781 from tdas/SPARK-7284 and squashes the following commits:

aac7be0 [Tathagata Das] Added information on how to get partition id
a66ec22 [Tathagata Das] Complete the line incomplete line,
a92ca39 [Tathagata Das] Updated streaming documentation
1 parent 8860405 commit e9471d3

3 files changed (+50, -56 lines)

docs/streaming-kafka-integration.md

Lines changed: 11 additions & 1 deletion
@@ -118,6 +118,13 @@ Next, we discuss how to use this approach in your streaming application.
 See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
 and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).
 
+</div>
+<div data-lang="python" markdown="1">
+    from pyspark.streaming.kafka import KafkaUtils
+    directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
+
+By default, the Python API will decode Kafka data as UTF-8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records into any arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
+and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py).
 </div>
 </div>
 
@@ -147,10 +154,13 @@ Next, we discuss how to use this approach in your streaming application.
     }
 );
 </div>
+<div data-lang="python" markdown="1">
+Not supported
 </div>
+</div>
 
 You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.
 
 Another thing to note is that since this approach does not use Receivers, the standard receiver-related [configurations](configuration.html) (that is, those of the form `spark.streaming.receiver.*`) will not apply to the input DStreams created by this approach (they will apply to other input DStreams though). Instead, use the [configurations](configuration.html) `spark.streaming.kafka.*`. An important one is `spark.streaming.kafka.maxRatePerPartition`, which is the maximum rate at which each Kafka partition will be read by this direct API.
 
-3. **Deploying:** Similar to the first approach, you can package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR and the launch the application using `spark-submit`. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation.
+3. **Deploying:** This is the same as the first approach, for Scala, Java and Python.
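For reference, a minimal sketch (not part of this commit) of how `spark.streaming.kafka.maxRatePerPartition` might be set when building the streaming context; the application name and rate value below are arbitrary, illustrative choices:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Cap each Kafka partition at 1000 records per second for the direct API
    // (the limit and app name are illustrative, not recommendations).
    val conf = new SparkConf()
      .setAppName("DirectKafkaWordCount")
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
    val ssc = new StreamingContext(conf, Seconds(2))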

docs/streaming-kinesis-integration.md

Lines changed: 15 additions & 9 deletions
@@ -32,7 +32,8 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 
 val kinesisStream = KinesisUtils.createStream(
-    streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position])
+    streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
+    [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
 
 See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
 and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the Running the Example section for instructions on how to run the example.
@@ -44,7 +45,8 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 
 JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
-    streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]);
+    streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
+    [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2);
 
 See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
 and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the next subsection for instructions to run the example.
@@ -54,19 +56,23 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
 
 - `streamingContext`: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream
 
-- `[Kinesis stream name]`: The Kinesis stream that this streaming application receives from
-    - The application name used in the streaming context becomes the Kinesis application name
+- `[Kinesis app name]`: The application name that will be used to checkpoint the Kinesis
+  sequence numbers in a DynamoDB table.
     - The application name must be unique for a given account and region.
-    - The Kinesis backend automatically associates the application name to the Kinesis stream using a DynamoDB table (always in the us-east-1 region) created during Kinesis Client Library initialization.
-    - Changing the application name or stream name can lead to Kinesis errors in some cases. If you see errors, you may need to manually delete the DynamoDB table.
+    - If the table exists but has incorrect checkpoint information (for a different stream, or
+      old expired sequence numbers), then there may be temporary errors.
 
+- `[Kinesis stream name]`: The Kinesis stream that this streaming application will pull data from.
 
 - `[endpoint URL]`: Valid Kinesis endpoint URLs can be found [here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).
 
+- `[region name]`: Valid Kinesis region names can be found [here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html).
+
 - `[checkpoint interval]`: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application.
 
 - `[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see the Kinesis Checkpointing section and the Amazon Kinesis API documentation for more details).
 
+In other versions of the API, you can also specify the AWS access key and secret key directly.
 
 3. **Deploying:** Package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see the [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
 
@@ -122,12 +128,12 @@ To run the example,
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
 
-    bin/run-example streaming.KinesisWordCountASL [Kinesis stream name] [endpoint URL]
+    bin/run-example streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
 
 </div>
 <div data-lang="java" markdown="1">
 
-    bin/run-example streaming.JavaKinesisWordCountASL [Kinesis stream name] [endpoint URL]
+    bin/run-example streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
 
 </div>
 </div>
@@ -136,7 +142,7 @@ To run the example,
 
 - To generate random string data to put onto the Kinesis stream, in another terminal, run the associated Kinesis data producer.
 
-        bin/run-example streaming.KinesisWordCountProducerASL [Kinesis stream name] [endpoint URL] 1000 10
+        bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
 
   This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream. This data should then be received and processed by the running example.
 
docs/streaming-programming-guide.md

Lines changed: 24 additions & 46 deletions
@@ -77,7 +77,7 @@ main entry point for all streaming functionality. We create a local StreamingCon
 {% highlight scala %}
 import org.apache.spark._
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+
+import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
 
 // Create a local StreamingContext with two working threads and a batch interval of 1 second.
 // The master requires 2 cores to prevent a starvation scenario.
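For context, the guide goes on to create the context itself, roughly as follows (this is a sketch of the surrounding example, not part of the diff):

    // "local[2]" runs Spark locally with 2 threads: one to receive data, one to process it.
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))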
@@ -109,7 +109,7 @@ each line will be split into multiple words and the stream of words is represent
 `words` DStream. Next, we want to count these words.
 
 {% highlight scala %}
-import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+
+import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
 // Count each word in each batch
 val pairs = words.map(word => (word, 1))
 val wordCounts = pairs.reduceByKey(_ + _)
@@ -682,7 +682,7 @@ for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
 ### Advanced Sources
 {:.no_toc}
 
-<span class="badge" style="background-color: grey">Python API</span> As of Spark 1.3,
+<span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
 out of these sources, *only* Kafka is available in the Python API. We will add more advanced sources to the Python API in the future.
 
 This category of sources requires interfacing with external non-Spark libraries, some of them with
@@ -723,7 +723,7 @@ and it in the classpath.
 
 Some of these advanced sources are as follows.
 
-- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kafka 0.8.1.1. See the [Kafka Integration Guide](streaming-kafka-integration.html) for more details.
+- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kafka 0.8.2.1. See the [Kafka Integration Guide](streaming-kafka-integration.html) for more details.
 
 - **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Flume 1.4.0. See the [Flume Integration Guide](streaming-flume-integration.html) for more details.
 
@@ -991,8 +991,9 @@ cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(.
 </div>
 </div>
 
-In fact, you can also use [machine learning](mllib-guide.html) and
-[graph computation](graphx-programming-guide.html) algorithms in the `transform` method.
+Note that the supplied function gets called in every batch interval. This allows you to do
+time-varying RDD operations, that is, the RDD operations, number of partitions, broadcast variables,
+etc. can be changed between batches.
 
 #### Window Operations
 {:.no_toc}
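As a rough sketch of the point above (not from the guide; `wordCounts` is the DStream from the earlier word-count example and the partition-count logic is hypothetical), driver-side state read inside the supplied function can change the operation from batch to batch, because the function is re-evaluated for every batch:

    // Hypothetical: numPartitions may be updated on the driver between batches,
    // and transform picks up the new value the next time it runs.
    var numPartitions = 4

    val rebalanced = wordCounts.transform { rdd =>
      rdd.repartition(numPartitions)
    }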
@@ -1427,38 +1428,18 @@ You can easily use [DataFrames and SQL](sql-programming-guide.html) operations o
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 
-/** Lazily instantiated singleton instance of SQLContext */
-object SQLContextSingleton {
-  @transient private var instance: SQLContext = null
-
-  // Instantiate SQLContext on demand
-  def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
-    if (instance == null) {
-      instance = new SQLContext(sparkContext)
-    }
-    instance
-  }
-}
-
-...
-
-/** Case class for converting RDD to DataFrame */
-case class Row(word: String)
-
-...
-
 /** DataFrame operations inside your streaming program */
 
 val words: DStream[String] = ...
 
 words.foreachRDD { rdd =>
 
   // Get the singleton instance of SQLContext
-  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
+  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
   import sqlContext.implicits._
 
-  // Convert RDD[String] to RDD[case class] to DataFrame
-  val wordsDataFrame = rdd.map(w => Row(w)).toDF()
+  // Convert RDD[String] to DataFrame
+  val wordsDataFrame = rdd.toDF("word")
 
   // Register as table
   wordsDataFrame.registerTempTable("words")
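For context, the full example in the guide goes on to query the registered table, roughly along these lines (a sketch, not part of the diff):

    // Do word count on the DataFrame using SQL and print it
    val wordCountsDataFrame =
      sqlContext.sql("select word, count(*) as total from words group by word")
    wordCountsDataFrame.show()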
@@ -1476,19 +1457,6 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma
 <div data-lang="java" markdown="1">
 {% highlight java %}
 
-/** Lazily instantiated singleton instance of SQLContext */
-class JavaSQLContextSingleton {
-  static private transient SQLContext instance = null;
-  static public SQLContext getInstance(SparkContext sparkContext) {
-    if (instance == null) {
-      instance = new SQLContext(sparkContext);
-    }
-    return instance;
-  }
-}
-
-...
-
 /** Java Bean class for converting RDD to DataFrame */
 public class JavaRow implements java.io.Serializable {
   private String word;
@@ -1512,7 +1480,9 @@ words.foreachRDD(
 new Function2<JavaRDD<String>, Time, Void>() {
   @Override
   public Void call(JavaRDD<String> rdd, Time time) {
-    SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
+
+    // Get the singleton instance of SQLContext
+    SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
 
     // Convert RDD[String] to RDD[case class] to DataFrame
     JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
@@ -2234,7 +2204,7 @@ The following table summarizes the semantics under failures:
 
 ### With Kafka Direct API
 {:.no_toc}
-In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach (experimental as of Spark 1.3) is further discussed in the [Kafka Integration Guide](streaming-kafka-integration.html).
+In Spark 1.3, we introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement an exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach (experimental as of Spark {{site.SPARK_VERSION_SHORT}}) is further discussed in the [Kafka Integration Guide](streaming-kafka-integration.html).
 
 ## Semantics of output operations
 {:.no_toc}
@@ -2248,8 +2218,16 @@ additional effort may be necessary to achieve exactly-once semantics. There are
 
 - *Transactional updates*: All updates are made transactionally so that updates are made exactly once atomically. One way to do this would be the following.
 
-    - Use the batch time (available in `foreachRDD`) and the partition index of the transformed RDD to create an identifier. This identifier uniquely identifies a blob data in the streaming application.
-    - Update external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically. Else if this was already committed, skip the update.
+    - Use the batch time (available in `foreachRDD`) and the partition index of the RDD to create an identifier. This identifier uniquely identifies a blob of data in the streaming application.
+    - Update the external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically. Else, if this was already committed, skip the update.
+
+          dstream.foreachRDD { (rdd, time) =>
+            rdd.foreachPartition { partitionIterator =>
+              val partitionId = TaskContext.get.partitionId()
+              val uniqueId = generateUniqueId(time.milliseconds, partitionId)
+              // use this uniqueId to transactionally commit the data in partitionIterator
+            }
+          }
 
 
 ***************************************************************************************************
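The snippet above assumes a `generateUniqueId` helper that the guide does not define; one minimal way to write it (an illustrative assumption, not part of the guide) is:

    // Illustrative helper: batch time plus partition index gives an identifier
    // that is unique per partition per batch.
    def generateUniqueId(batchTimeMillis: Long, partitionId: Int): String =
      s"$batchTimeMillis-$partitionId"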
