
Commit 4316906

Address more
1 parent d50a05e commit 4316906

4 files changed: +9 -10 lines changed

docs/structured-streaming-kafka-integration.md (1 addition, 1 deletion)

```diff
@@ -190,7 +190,7 @@ The rest configurations are optional:
   as you expected.</td>
 </tr>
 <tr>
-  <td>kafka.consumer.poll.timeoutMs</td>
+  <td>kafkaConsumer.pollTimeoutMs</td>
   <td>long</td>
   <td>512</td>
   <td>The timeout in milliseconds to poll data from Kafka in executors.</td>
```
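For context, a minimal usage sketch of the renamed option, assuming a broker at `host1:9092` and a topic `topic1` (both placeholders; only the `kafkaConsumer.pollTimeoutMs` key comes from this commit):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("KafkaPollTimeout").getOrCreate()

// The renamed source option; KafkaSource parses the value with .toLong,
// so it must be passed as a numeric string (milliseconds).
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092") // placeholder broker
  .option("subscribe", "topic1")                   // placeholder topic
  .option("kafkaConsumer.pollTimeoutMs", "512")
  .load()
```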

docs/structured-streaming-programming-guide.md (6 additions, 7 deletions)

```diff
@@ -418,10 +418,15 @@ Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, as
 Streaming DataFrames can be created through the `DataStreamReader` interface
 ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader)/
 [Java](api/java/org/apache/spark/sql/streaming/DataStreamReader.html)/
-[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader) docs) returned by `SparkSession.readStream()`. Similar to the read interface for creating static DataFrame, you can specify the details of the source – data format, schema, options, etc. In Spark 2.0, there are a few built-in sources.
+[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader) docs) returned by `SparkSession.readStream()`. Similar to the read interface for creating static DataFrame, you can specify the details of the source – data format, schema, options, etc.
+
+#### Data Sources
+In Spark 2.0, there are a few built-in sources.
 
 - **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
 
+- **Kafka source** - Poll data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-integration.html) for more details.
+
 - **Socket source (for testing)** - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees.
 
 Here are some examples.
@@ -1126,12 +1131,6 @@ aggDF \
 </div>
 </div>
 
-## Advanced Sources
-
-Structured Streaming supports the following advanced sources:
-
-- **Kafka:** Structured Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-integration.html) for more details.
-
 # Where to go from here
 - Examples: See and run the
 [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming)/[Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/sql/streaming)/[Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/sql/streaming)
```
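The guide's bullets describe the built-in sources only in prose; a short, self-contained sketch of the two non-Kafka sources may help (host, port, and path are placeholders, not from the guide):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder.appName("StructuredSources").getOrCreate()

// Socket source (testing only): reads UTF8 text lines from a socket on the driver.
val socketLines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()

// File source: streams files atomically moved into a directory.
// Streaming file sources require an explicit schema; the path is a placeholder.
val schema = new StructType().add("value", StringType)
val csvStream = spark.readStream
  .schema(schema)
  .csv("/tmp/streaming-input")
```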

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala (1 addition, 1 deletion)

```diff
@@ -87,7 +87,7 @@ private[kafka010] case class KafkaSource(
 
   private val sc = sqlContext.sparkContext
 
-  private val pollTimeoutMs = sourceOptions.getOrElse("kafka.consumer.poll.timeoutMs", "512").toLong
+  private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
 
   private val maxOffsetFetchAttempts =
     sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
```

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala (1 addition, 1 deletion)

```diff
@@ -210,7 +210,7 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
         |Kafka option '${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}' is not supported.
         |Instead set the source option '$STARTING_OFFSET_OPTION_KEY' to 'earliest' or 'latest' to
         |specify where to start. Structured Streaming manages which offsets are consumed
-        |internally, rather than relying on the kafka Consumer to do it. This will ensure that no
+        |internally, rather than relying on the kafkaConsumer to do it. This will ensure that no
         |data is missed when when new topics/partitions are dynamically subscribed. Note that
         |'$STARTING_OFFSET_OPTION_KEY' only applies when a new Streaming query is started, and
         |that resuming will always pick up from where the query left off. See the docs for more
```
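This error message steers users away from the consumer-level `auto.offset.reset` and toward the source-level starting-offset option. A hedged sketch of what a conforming query would look like, assuming `$STARTING_OFFSET_OPTION_KEY` resolves to `startingOffset` (the literal key is not shown in this diff):

```scala
// Sketch: what the error message asks for instead of auto.offset.reset.
// The literal key "startingOffset" is an assumption; the diff only shows
// the $STARTING_OFFSET_OPTION_KEY placeholder. Broker and topic are
// placeholders, reusing `spark` from the earlier sketch.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topic1")
  .option("startingOffset", "earliest") // not kafka.auto.offset.reset
  .load()
```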
