Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
985cfa9
Kafka source project
tdas Sep 14, 2016
c4db857
Add more columns inspired by Cody's work
zsxwing Sep 14, 2016
d3c0754
Fix the build
zsxwing Sep 14, 2016
f5c57f5
Support add partitions
zsxwing Sep 20, 2016
d005ba5
Merge remote-tracking branch 'origin/master' into kafka-source
zsxwing Sep 20, 2016
b64d104
Fix compile error
zsxwing Sep 20, 2016
a3d0a2b
Get rid of type parameters
zsxwing Sep 20, 2016
6bc2994
Use Kafka API to fetch the earlies offsets of new partitions; Get rid…
zsxwing Sep 20, 2016
881b206
Stress test for adding new topics
zsxwing Sep 21, 2016
8e86f98
test case that shows why the current implementation is wrong from an …
koeninger Sep 22, 2016
786af2f
Merge pull request #4 from koeninger/kafka-source-deletion
zsxwing Sep 22, 2016
5f33eb4
Merge remote-tracking branch 'origin/master' into kafka-source
zsxwing Sep 23, 2016
4fe16c8
Refactored and simplified
tdas Sep 23, 2016
e167152
Added source option startingOffset
tdas Sep 23, 2016
608b8c3
Addressed zsxwing's comments
tdas Sep 24, 2016
3ee8305
Removed line
tdas Sep 26, 2016
ce3f38f
Merge pull request #3 from tdas/kafka-source-new
zsxwing Sep 26, 2016
755ceaa
Merge remote-tracking branch 'origin/master' into kafka-source
zsxwing Sep 26, 2016
852f607
Fix corner cases and update stress test
zsxwing Sep 27, 2016
59a93a5
Workaround for KAFKA-1894 and address some commments
zsxwing Sep 28, 2016
9d95d52
Address more
zsxwing Sep 28, 2016
e883062
Address more
zsxwing Oct 3, 2016
d154532
Add programming guide
zsxwing Oct 3, 2016
77208d1
Adddress; Fix a corner case when the first batch fails; set failOnCor…
zsxwing Oct 3, 2016
ccadd81
Add example about how to convert key and value to strings
zsxwing Oct 3, 2016
7ff1059
Address rest comments
zsxwing Oct 3, 2016
a6c4970
Remove the sorting
zsxwing Oct 3, 2016
9e9fef3
Address nits
zsxwing Oct 4, 2016
d50a05e
Address
zsxwing Oct 4, 2016
4316906
Address more
zsxwing Oct 4, 2016
d9d848c
Address nits
zsxwing Oct 5, 2016
7d658f1
Address one more nit
zsxwing Oct 5, 2016
4754125
Add an instruction for failOnDataLoss
zsxwing Oct 5, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,6 @@ private[spark] class UninterruptibleThread(name: String) extends Thread(name) {
}
}

/**
* Tests whether `interrupt()` has been called.
*/
override def isInterrupted: Boolean = {
super.isInterrupted || uninterruptibleLock.synchronized { shouldInterruptThread }
}

/**
* Interrupt `this` thread if possible. If `this` is in the uninterruptible status, it won't be
* interrupted until it enters into the interruptible status.
Expand Down
2 changes: 1 addition & 1 deletion dev/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def determine_modules_to_test(changed_modules):
['graphx', 'examples']
>>> x = [x.name for x in determine_modules_to_test([modules.sql])]
>>> x # doctest: +NORMALIZE_WHITESPACE
['sql', 'hive', 'mllib', 'examples', 'hive-thriftserver',
['sql', 'hive', 'mllib', 'sql-kafka-0-10', 'examples', 'hive-thriftserver',
'pyspark-sql', 'sparkr', 'pyspark-mllib', 'pyspark-ml']
"""
modules_to_test = set()
Expand Down
12 changes: 12 additions & 0 deletions dev/sparktestsupport/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,18 @@ def __hash__(self):
)


sql_kafka = Module(
name="sql-kafka-0-10",
dependencies=[sql],
source_file_regexes=[
"external/kafka-0-10-sql",
],
sbt_test_goals=[
"sql-kafka-0-10/test",
]
)


sketch = Module(
name="sketch",
dependencies=[tags],
Expand Down
239 changes: 239 additions & 0 deletions docs/structured-streaming-kafka-integration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
---
layout: global
title: Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
---

Structured Streaming integration for Kafka 0.10 to poll data from Kafka.

### Linking
For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}
version = {{site.SPARK_VERSION_SHORT}}

For Python applications, you need to add this above library and its dependencies when deploying your
application. See the [Deploying](#deploying) subsection below.

### Creating a Kafka Source Stream

<div class="codetabs">
<div data-lang="scala" markdown="1">

// Subscribe to 1 topic
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

// Subscribe to multiple topics
val ds2 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

// Subscribe to a pattern
val ds3 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

</div>
<div data-lang="java" markdown="1">

// Subscribe to 1 topic
Dataset<Row> ds1 = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Subscribe to multiple topics
Dataset<Row> ds2 = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Subscribe to a pattern
Dataset<Row> ds3 = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

</div>
<div data-lang="python" markdown="1">

# Subscribe to 1 topic
ds1 = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics
ds2 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern
ds3 = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

</div>
</div>

Each row in the source has the following schema:
<table class="table">
<tr><th>Column</th><th>Type</th></tr>
<tr>
<td>key</td>
<td>binary</td>
</tr>
<tr>
<td>value</td>
<td>binary</td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably include an example of how to select key / value in the desired format.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an example to convert key/value to strings.

</tr>
<tr>
<td>topic</td>
<td>string</td>
</tr>
<tr>
<td>partition</td>
<td>int</td>
</tr>
<tr>
<td>offset</td>
<td>long</td>
</tr>
<tr>
<td>timestamp</td>
<td>long</td>
</tr>
<tr>
<td>timestampType</td>
<td>int</td>
</tr>
</table>

The following options must be set for the Kafka source.

<table class="table">
<tr><th>Option</th><th>value</th><th>meaning</th></tr>
<tr>
<td>subscribe</td>
<td>A comma-separated list of topics</td>
<td>The topic list to subscribe. Only one of "subscribe" and "subscribePattern" options can be
specified for Kafka source.</td>
</tr>
<tr>
<td>subscribePattern</td>
<td>Java regex string</td>
<td>The pattern used to subscribe the topic. Only one of "subscribe" and "subscribePattern"
options can be specified for Kafka source.</td>
</tr>
<tr>
<td>kafka.bootstrap.servers</td>
<td>A comma-separated list of host:port</td>
<td>The Kafka "bootstrap.servers" configuration.</td>
</tr>
</table>

The following configurations are optional:

<table class="table">
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would split this into two tables, one that must be specified (subscribe/subscribePattern, and kafka.bootstrap...) and then optional params.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would split this into two tables, one that must be specified (subscribe/subscribePattern, and kafka.bootstrap...) and then optional params.

done

<tr><th>Option</th><th>value</th><th>default</th><th>meaning</th></tr>
<tr>
<td>startingOffset</td>
<td>["earliest", "latest"]</td>
<td>"latest"</td>
<td>The start point when a query is started, either "earliest" which is from the earliest offset,
or "latest" which is just from the latest offset. Note: This only applies when a new Streaming q
uery is started, and that resuming will always pick up from where the query left off.</td>
</tr>
<tr>
<td>failOnDataLoss</td>
<td>[true, false]</td>
<td>true</td>
<td>Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or
offsets are out of range). This may be a false alarm. You can disable it when it doesn't work
as you expected.</td>
</tr>
<tr>
<td>kafkaConsumer.pollTimeoutMs</td>
<td>long</td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can keep this is int

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can keep this is int

Since this is a milliseconds, I try to make the type consistent with other configurations in Spark.

<td>512</td>
<td>The timeout in milliseconds to poll data from Kafka in executors.</td>
</tr>
<tr>
<td>fetchOffset.numRetries</td>
<td>int</td>
<td>3</td>
<td>Number of times to retry before giving up fatch Kafka latest offsets.</td>
</tr>
<tr>
<td>fetchOffset.retryIntervalMs</td>
<td>long</td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can keep this as int.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can keep this as int.

Since this is a milliseconds, I try to make the type consistent with other configurations in Spark.

<td>10</td>
<td>milliseconds to wait before retrying to fetch Kafka offsets</td>
</tr>
</table>

Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g,
`stream.option("kafka.bootstrap.servers", "host:port")`. For possible kafkaParams, see
[Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs).

Note that the following Kafka params cannot be set and the Kafka source will throw an exception:
- **group.id**: Kafka source will create a unique group id for each query automatically.
- **auto.offset.reset**: Set the source option `startingOffset` to `earliest` or `latest` to specify
where to start instead. Structured Streaming manages which offsets are consumed internally, rather
than rely on the kafka Consumer to do it. This will ensure that no data is missed when when new
topics/partitions are dynamically subscribed. Note that `startingOffset` only applies when a new
Streaming query is started, and that resuming will always pick up from where the query left off.
- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use
DataFrame operations to explicitly deserialize the keys.
- **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer.
Use DataFrame operations to explicitly deserialize the values.
- **enable.auto.commit**: Kafka source doesn't commit any offset.
- **interceptor.classes**: Kafka source always read keys and values as byte arrays. It's not safe to
use ConsumerInterceptor as it may break the query.

### Deploying

As with any Spark applications, `spark-submit` is used to launch your application. `spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}`
and its dependencies can be directly added to `spark-submit` using `--packages`, such as,

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...

See [Application Submission Guide](submitting-applications.html) for more details about submitting
applications with external dependencies.
7 changes: 6 additions & 1 deletion docs/structured-streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -418,10 +418,15 @@ Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, as
Streaming DataFrames can be created through the `DataStreamReader` interface
([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader)/
[Java](api/java/org/apache/spark/sql/streaming/DataStreamReader.html)/
[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader) docs) returned by `SparkSession.readStream()`. Similar to the read interface for creating static DataFrame, you can specify the details of the source – data format, schema, options, etc. In Spark 2.0, there are a few built-in sources.
[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader) docs) returned by `SparkSession.readStream()`. Similar to the read interface for creating static DataFrame, you can specify the details of the source – data format, schema, options, etc.

#### Data Sources
In Spark 2.0, there are a few built-in sources.

- **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.

- **Kafka source** - Poll data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-integration.html) for more details.

- **Socket source (for testing)** - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees.

Here are some examples.
Expand Down
82 changes: 82 additions & 0 deletions external/kafka-0-10-sql/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<properties>
<sbt.project.name>sql-kafka-0-10</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Kafka 0.10 Source for Structured Streaming</name>
<url>http://spark.apache.org/</url>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>0.10.0.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
<version>3.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.spark.sql.kafka010.KafkaSourceProvider
Loading