Commit d5c8a32

Add docs for Kafka message handler

1 parent 7c970f9 commit d5c8a32
File tree

1 file changed: +3 −0 lines changed

docs/streaming-kafka-integration.md

Lines changed: 3 additions & 0 deletions
@@ -104,6 +104,7 @@ Next, we discuss how to use this approach in your streaming application.
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])
 
+You can also pass a `messageHandler` to `createDirectStream` to access the `MessageAndMetadata` for the current message, which contains its metadata, and transform it into any desired type.
 See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
 and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala).
 </div>
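As an illustration only (not part of this commit), a `messageHandler` in the Scala API might look like the sketch below. It is based on the `createDirectStream` overload that takes per-partition starting offsets; the handler body, the `fromOffsets` values, and the topic name `"mytopic"` are assumptions for the example.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical handler: instead of the default (key, value) pair, keep the
// topic, partition, offset, and payload of each record.
val messageHandler =
  (mmd: MessageAndMetadata[String, String]) =>
    (mmd.topic, mmd.partition, mmd.offset, mmd.message())

// Illustrative starting offsets for the overload that accepts a messageHandler.
val fromOffsets = Map(TopicAndPartition("mytopic", 0) -> 0L)

// The result type of the stream is whatever the handler returns.
val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](
  streamingContext, kafkaParams, fromOffsets, messageHandler)
```

The fifth type parameter (`R`) is the handler's return type, so the resulting DStream carries whatever shape the handler produces.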
@@ -115,6 +116,7 @@ Next, we discuss how to use this approach in your streaming application.
     [key class], [value class], [key decoder class], [value decoder class],
     [map of Kafka parameters], [set of topics to consume]);
 
+You can also pass a `messageHandler` to `createDirectStream` to access the `MessageAndMetadata` for the current message, which contains its metadata, and transform it into any desired type.
 See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
 and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).
 

@@ -123,6 +125,7 @@ Next, we discuss how to use this approach in your streaming application.
     from pyspark.streaming.kafka import KafkaUtils
     directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
 
+You can also pass a `messageHandler` to `createDirectStream` to access the `KafkaMessageAndMetadata` for the current message, which contains its metadata, and transform it into any desired type.
 By default, the Python API will decode Kafka data as UTF-8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records into any arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
 and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py).
 </div>
