
Commit bb0ac6d

cfregly authored and tdas committed
[SPARK-1981] Add AWS Kinesis streaming support
Author: Chris Fregly <[email protected]>

Closes #1434 from cfregly/master and squashes the following commits:

4774581 [Chris Fregly] updated docs, renamed retry to retryRandom to be more clear, removed retries around store() method
0393795 [Chris Fregly] moved Kinesis examples out of examples/ and back into extras/kinesis-asl
691a6be [Chris Fregly] fixed tests and formatting, fixed a bug with JavaKinesisWordCount during union of streams
0e1c67b [Chris Fregly] Merge remote-tracking branch 'upstream/master'
74e5c7c [Chris Fregly] updated per TD's feedback. simplified examples, updated docs
e33cbeb [Chris Fregly] Merge remote-tracking branch 'upstream/master'
bf614e9 [Chris Fregly] per matei's feedback: moved the kinesis examples into the examples/ dir
d17ca6d [Chris Fregly] per TD's feedback: updated docs, simplified the KinesisUtils api
912640c [Chris Fregly] changed the foundKinesis class to be a publically-avail class
db3eefd [Chris Fregly] Merge remote-tracking branch 'upstream/master'
21de67f [Chris Fregly] Merge remote-tracking branch 'upstream/master'
6c39561 [Chris Fregly] parameterized the versions of the aws java sdk and kinesis client
338997e [Chris Fregly] improve build docs for kinesis
828f8ae [Chris Fregly] more cleanup
e7c8978 [Chris Fregly] Merge remote-tracking branch 'upstream/master'
cd68c0d [Chris Fregly] fixed typos and backward compatibility
d18e680 [Chris Fregly] Merge remote-tracking branch 'upstream/master'
b3b0ff1 [Chris Fregly] [SPARK-1981] Add AWS Kinesis streaming support

(cherry picked from commit 91f9504)
Signed-off-by: Tathagata Das <[email protected]>
1 parent 91de0dc commit bb0ac6d

File tree

25 files changed: +1592 −15 lines

bin/run-example

Lines changed: 2 additions & 1 deletion
@@ -29,7 +29,8 @@ if [ -n "$1" ]; then
 else
   echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2
   echo "  - set MASTER=XX to use a specific master" 1>&2
-  echo "  - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2
+  echo "  - can use abbreviated example class name relative to com.apache.spark.examples" 1>&2
+  echo "     (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)" 1>&2
   exit 1
 fi

bin/run-example2.cmd

Lines changed: 2 additions & 1 deletion
@@ -32,7 +32,8 @@ rem Test that an argument was given
 if not "x%1"=="x" goto arg_given
 echo Usage: run-example ^<example-class^> [example-args]
 echo   - set MASTER=XX to use a specific master
-echo   - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)
+echo   - can use abbreviated example class name relative to com.apache.spark.examples
+echo      (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)
 goto exit
 :arg_given

dev/audit-release/audit_release.py

Lines changed: 2 additions & 2 deletions
@@ -105,7 +105,7 @@ def get_url(url):
     "spark-core", "spark-bagel", "spark-mllib", "spark-streaming", "spark-repl",
     "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
     "spark-streaming-mqtt", "spark-streaming-twitter", "spark-streaming-zeromq",
-    "spark-catalyst", "spark-sql", "spark-hive"
+    "spark-catalyst", "spark-sql", "spark-hive", "spark-streaming-kinesis-asl"
 ]
 modules = map(lambda m: "%s_%s" % (m, SCALA_BINARY_VERSION), modules)

@@ -136,7 +136,7 @@ def ensure_path_not_present(x):
 os.chdir(original_dir)

 # SBT application tests
-for app in ["sbt_app_core", "sbt_app_graphx", "sbt_app_streaming", "sbt_app_sql", "sbt_app_hive"]:
+for app in ["sbt_app_core", "sbt_app_graphx", "sbt_app_streaming", "sbt_app_sql", "sbt_app_hive", "sbt_app_kinesis"]:
     os.chdir(app)
     ret = run_cmd("sbt clean run", exit_on_failure=False)
     test(ret == 0, "sbt application (%s)" % app)

dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala

Lines changed: 7 additions & 0 deletions
@@ -50,5 +50,12 @@ object SimpleApp {
       println("Ganglia sink was loaded via spark-core")
       System.exit(-1)
     }
+
+    // Remove kinesis from default build due to ASL license issue
+    val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess
+    if (foundKinesis) {
+      println("Kinesis was loaded via spark-core")
+      System.exit(-1)
+    }
   }
 }
dev/audit-release/sbt_app_kinesis/build.sbt

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+name := "Kinesis Test"
+
+version := "1.0"
+
+scalaVersion := System.getenv.get("SCALA_VERSION")
+
+libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % System.getenv.get("SPARK_VERSION")
+
+resolvers ++= Seq(
+  "Spark Release Repository" at System.getenv.get("SPARK_RELEASE_REPOSITORY"),
+  "Spray Repository" at "http://repo.spray.cc/")
dev/audit-release/sbt_app_kinesis/src/main/scala/SparkApp.scala

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main.scala
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+
+object SimpleApp {
+  def main(args: Array[String]) {
+    val foundKinesis = Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess
+    if (!foundKinesis) {
+      println("Kinesis not loaded via kinesis-asl")
+      System.exit(-1)
+    }
+  }
+}

dev/create-release/create-release.sh

Lines changed: 2 additions & 2 deletions
@@ -53,15 +53,15 @@ if [[ ! "$@" =~ --package-only ]]; then
     -Dusername=$GIT_USERNAME -Dpassword=$GIT_PASSWORD \
     -Dmaven.javadoc.skip=true \
     -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
     -Dtag=$GIT_TAG -DautoVersionSubmodules=true \
+    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
     --batch-mode release:prepare

   mvn -DskipTests \
     -Darguments="-DskipTests=true -Dmaven.javadoc.skip=true -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" \
     -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
     -Dmaven.javadoc.skip=true \
-    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
+    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
     release:perform

   cd ..

dev/run-tests

Lines changed: 3 additions & 0 deletions
@@ -36,6 +36,9 @@ fi
 if [ -z "$SBT_MAVEN_PROFILES_ARGS" ]; then
   export SBT_MAVEN_PROFILES_ARGS="-Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0"
 fi
+
+export SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Pkinesis-asl"
+
 echo "SBT_MAVEN_PROFILES_ARGS=\"$SBT_MAVEN_PROFILES_ARGS\""

 # Remove work directory

docs/streaming-custom-receivers.md

Lines changed: 2 additions & 2 deletions
@@ -4,7 +4,7 @@ title: Spark Streaming Custom Receivers
 ---

 Spark Streaming can receive streaming data from any arbitrary data source beyond
-the one's for which it has in-built support (that is, beyond Flume, Kafka, files, sockets, etc.).
+the ones for which it has built-in support (that is, beyond Flume, Kafka, Kinesis, files, sockets, etc.).
 This requires the developer to implement a *receiver* that is customized for receiving data from
 the concerned data source. This guide walks through the process of implementing a custom receiver
 and using it in a Spark Streaming application.

@@ -174,7 +174,7 @@ val words = lines.flatMap(_.split(" "))
 ...
 {% endhighlight %}

-The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala).
+The full source code is in the example [CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala).

 </div>
 <div data-lang="java" markdown="1">
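
The custom receiver pattern the touched guide describes — subclass Receiver, push records with store() — is the same hook the Kinesis receiver in this commit plugs into. A minimal sketch, assuming the org.apache.spark.streaming.receiver.Receiver API that guide documents; the socket source, class name, host, and port are illustrative, not part of this commit:

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative custom receiver: reads newline-delimited text from a socket.
class SocketLineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Receive on a separate thread so onStart() returns immediately.
    new Thread("Socket Line Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() { }  // the receive loop exits on its own via isStopped

  private def receive() {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)  // hand each record to Spark Streaming
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Reconnecting")  // ask Spark to restart this receiver
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}

Such a receiver would then be attached with ssc.receiverStream(new SocketLineReceiver(host, port)).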

docs/streaming-kinesis.md

Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
+---
+layout: global
+title: Spark Streaming Kinesis Receiver
+---
+
+### Kinesis
+Build notes:
+<li>Spark supports a Kinesis Streaming Receiver which is not included in the default build due to licensing restrictions.</li>
+<li>_**Note that by embedding this library you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.</li>
+<li>The Spark Kinesis Streaming Receiver source code, examples, tests, and artifacts live in $SPARK_HOME/extras/kinesis-asl.</li>
+<li>To build with Kinesis, you must run the maven or sbt builds with `-Pkinesis-asl`.</li>
+<li>Applications will need to link to the `spark-streaming-kinesis-asl` artifact.</li>
+
+Kinesis examples notes:
+<li>To build the Kinesis examples, you must run the maven or sbt builds with `-Pkinesis-asl`.</li>
+<li>These examples automatically determine the number of local threads and KinesisReceivers to spin up based on the number of shards for the stream.</li>
+<li>KinesisWordCountProducerASL will generate random data to put onto the Kinesis stream for testing.</li>
+<li>Checkpointing is disabled (no checkpoint dir is set). The examples as written will not recover from a driver failure.</li>
+
+Deployment and runtime notes:
+<li>A single KinesisReceiver can process many shards of a stream.</li>
+<li>Each shard of a stream is processed by one or more KinesisReceivers managed by the Kinesis Client Library (KCL) Worker.</li>
+<li>You never need more KinesisReceivers than the number of shards in your stream.</li>
+<li>You can horizontally scale the receiving by creating more KinesisReceiver/DStreams (up to the number of shards for a given stream).</li>
+<li>The Kinesis libraries must be present on all worker nodes, as they will need access to the Kinesis Client Library.</li>
+<li>This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence:<br/>
+1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY<br/>
+2) Java System Properties - aws.accessKeyId and aws.secretKey<br/>
+3) Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs<br/>
+4) Instance profile credentials - delivered through the Amazon EC2 metadata service<br/>
+</li>
+<li>You need to set up a Kinesis stream with 1 or more shards per the following:<br/>
+http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html</li>
+<li>Valid Kinesis endpoint urls can be found here: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region</li>
+<li>When you first start up the KinesisReceiver, the Kinesis Client Library (KCL) needs ~30s to establish connectivity with the AWS Kinesis service,
+retrieve any checkpoint data, and negotiate with other KCLs reading from the same stream.</li>
+<li>Be careful when changing the app name. Kinesis maintains a mapping table in DynamoDB based on this app name (http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization).
+Changing the app name could lead to Kinesis errors, as only 1 logical application can process a stream. In order to start fresh,
+it's always best to delete the DynamoDB table that matches your app name. This DynamoDB table lives in us-east-1 regardless of the Kinesis endpoint URL.</li>
+
+Failure recovery notes:
+<li>The combination of Spark Streaming and Kinesis creates 3 different checkpoints as follows:<br/>
+1) RDD data checkpoint (Spark Streaming) - frequency is configurable with DStream.checkpoint(Duration)<br/>
+2) RDD metadata checkpoint (Spark Streaming) - frequency is every DStream batch<br/>
+3) Kinesis checkpointing (Kinesis) - frequency is controlled by the developer calling ICheckpointer.checkpoint() directly<br/>
+</li>
+<li>Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling.</li>
+<li>Upon startup, a KinesisReceiver will begin processing records with sequence numbers greater than the last checkpoint sequence number recorded per shard.</li>
+<li>If no checkpoint info exists, the worker will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON)
+or from the tip/latest (InitialPositionInStream.LATEST). This is configurable.</li>
+<li>When pulling from the stream tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts.</li>
+<li>InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running.</li>
+<li>In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON, which will read up to 24 hours (Kinesis limit) of previous stream data
+depending on the checkpoint frequency.</li>
+<li>InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency.</li>
+<li>Record processing should be idempotent when possible.</li>
+<li>Failed or latent KinesisReceivers will be detected and automatically shut down/load-balanced by the KCL.</li>
+<li>If possible, explicitly shut down the worker if a failure occurs in order to trigger the final checkpoint.</li>