
Commit b7d74a6

zsxwing authored and tdas committed
[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project
Include the following changes:

1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream
2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream"
3. Update the ActorWordCount example and add the JavaActorWordCount example
4. Make "streaming-zeromq" depend on "streaming-akka" and update the code accordingly

Author: Shixiong Zhu <[email protected]>

Closes #10744 from zsxwing/streaming-akka-2.
1 parent 944fdad commit b7d74a6
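
The heart of the commit is an API move: `StreamingContext.actorStream` goes away, and the new `streaming-akka` module exposes `AkkaUtils.createStream` instead. A minimal before/after sketch, reusing the `CustomActor` from the documentation diff below:

```scala
import akka.actor.Props
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

// A trivial receiver actor, as shown in the updated docs in this commit.
class CustomActor extends ActorReceiver {
  def receive = {
    case data: String => store(data)
  }
}

val ssc: StreamingContext = ??? // an existing StreamingContext

// Before this commit (removed):
//   val lines = ssc.actorStream[String](Props[CustomActor](), "CustomReceiver")
// After this commit:
val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver")
```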

File tree

22 files changed: +601 -185 lines changed

dev/sparktestsupport/modules.py

Lines changed: 12 additions & 0 deletions

@@ -222,6 +222,18 @@ def contains_file(self, filename):
 )


+streaming_akka = Module(
+    name="streaming-akka",
+    dependencies=[streaming],
+    source_file_regexes=[
+        "external/akka",
+    ],
+    sbt_test_goals=[
+        "streaming-akka/test",
+    ]
+)
+
+
 streaming_flume = Module(
     name="streaming-flume",
     dependencies=[streaming],

docs/streaming-custom-receivers.md

Lines changed: 39 additions & 10 deletions

@@ -257,25 +257,54 @@ The following table summarizes the characteristics of both types of receivers
 
 ## Implementing and Using a Custom Actor-based Receiver
 
+<div class="codetabs">
+<div data-lang="scala" markdown="1" >
+
 Custom [Akka Actors](http://doc.akka.io/docs/akka/2.3.11/scala/actors.html) can also be used to
-receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
-trait can be mixed in to any Akka actor, which allows received data to be stored in Spark using
-`store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc.
+receive data. Extending [`ActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.ActorReceiver)
+allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of
+this actor can be configured to handle failures, etc.
 
 {% highlight scala %}
-class CustomActor extends Actor with ActorHelper {
+
+class CustomActor extends ActorReceiver {
   def receive = {
     case data: String => store(data)
   }
 }
+
+// A new input stream can be created with this custom actor as
+val ssc: StreamingContext = ...
+val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver")
+
 {% endhighlight %}
 
-And a new input stream can be created with this custom actor as
+See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) for an end-to-end example.
+</div>
+<div data-lang="java" markdown="1">
+
+Custom [Akka UntypedActors](http://doc.akka.io/docs/akka/2.3.11/java/untyped-actors.html) can also be used to
+receive data. Extending [`JavaActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.JavaActorReceiver)
+allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of
+this actor can be configured to handle failures, etc.
+
+{% highlight java %}
+
+class CustomActor extends JavaActorReceiver {
+  @Override
+  public void onReceive(Object msg) throws Exception {
+    store((String) msg);
+  }
+}
+
+// A new input stream can be created with this custom actor as
+JavaStreamingContext jssc = ...;
+JavaDStream<String> lines = AkkaUtils.<String>createStream(jssc, Props.create(CustomActor.class), "CustomReceiver");
 
-{% highlight scala %}
-val ssc: StreamingContext = ...
-val lines = ssc.actorStream[String](Props[CustomActor], "CustomReceiver")
 {% endhighlight %}
 
-See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala)
-for an end-to-end example.
+See [JavaActorWordCount.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java) for an end-to-end example.
+</div>
+</div>
+
+<span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala libraries, AkkaUtils is not available in the Python API.
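
The docs above note that the receiver's supervisor strategy can be configured to handle failures. Continuing the Scala snippet, a minimal sketch of what that could look like, assuming `AkkaUtils.createStream` retains the optional `storageLevel` and `supervisorStrategy` parameters that the removed `actorStream` exposed (those parameters do not appear in this diff):

```scala
import scala.concurrent.duration._
import akka.actor.{OneForOneStrategy, Props, SupervisorStrategy}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.akka.AkkaUtils

// Restart the actor on runtime errors, at most 3 times within one minute.
val strategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
  case _: RuntimeException => SupervisorStrategy.Restart
}

// The trailing two arguments are assumptions carried over from actorStream,
// not confirmed by this diff.
val lines = AkkaUtils.createStream[String](
  ssc,
  Props[CustomActor](),
  "CustomReceiver",
  StorageLevel.MEMORY_AND_DISK_SER_2,
  strategy)
```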

docs/streaming-programming-guide.md

Lines changed: 2 additions & 2 deletions

@@ -659,11 +659,11 @@ methods for creating DStreams from files and Akka actors as input sources.
 <span class="badge" style="background-color: grey">Python API</span> `fileStream` is not available in the Python API, only `textFileStream` is available.
 
 - **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka
-  actors by using `streamingContext.actorStream(actorProps, actor-name)`. See the [Custom Receiver
+  actors by using `AkkaUtils.createStream(ssc, actorProps, actor-name)`. See the [Custom Receiver
   Guide](streaming-custom-receivers.html) for more details.
 
 <span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala
-libraries, `actorStream` is not available in the Python API.
+libraries, `AkkaUtils.createStream` is not available in the Python API.
 
 - **Queue of RDDs as a Stream:** For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream.

examples/pom.xml

Lines changed: 5 additions & 0 deletions

@@ -75,6 +75,11 @@
       <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>

examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java

Lines changed: 9 additions & 5 deletions

@@ -31,7 +31,8 @@
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.receiver.JavaActorReceiver;
+import org.apache.spark.streaming.akka.AkkaUtils;
+import org.apache.spark.streaming.akka.JavaActorReceiver;
 
 /**
  * A sample actor as receiver, is also simplest. This receiver actor
@@ -56,6 +57,7 @@ public void preStart() {
     remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
   }
 
+  @Override
   public void onReceive(Object msg) throws Exception {
     store((T) msg);
   }
@@ -100,18 +102,20 @@ public static void main(String[] args) {
     String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor";
 
     /*
-     * Following is the use of actorStream to plug in custom actor as receiver
+     * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
      *
     * An important point to note:
     * Since Actor may exist outside the spark framework, It is thus user's responsibility
     * to ensure the type safety, i.e type of data received and InputDstream
     * should be same.
     *
-     * For example: Both actorStream and JavaSampleActorReceiver are parameterized
+     * For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized
     * to same type to ensure type safety.
     */
-    JavaDStream<String> lines = jssc.actorStream(
-        Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver");
+    JavaDStream<String> lines = AkkaUtils.createStream(
+        jssc,
+        Props.create(JavaSampleActorReceiver.class, feederActorURI),
+        "SampleReceiver");
 
     // compute wordcount
     lines.flatMap(new FlatMapFunction<String, String>() {

examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala

Lines changed: 21 additions & 16 deletions

@@ -22,12 +22,12 @@ import scala.collection.mutable.LinkedList
 import scala.reflect.ClassTag
 import scala.util.Random
 
-import akka.actor.{actorRef2Scala, Actor, ActorRef, Props}
+import akka.actor._
+import com.typesafe.config.ConfigFactory
 
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.receiver.ActorReceiver
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
 
 case class SubscribeReceiver(receiverActor: ActorRef)
 case class UnsubscribeReceiver(receiverActor: ActorRef)
@@ -78,8 +78,7 @@ class FeederActor extends Actor {
  *
  * @see [[org.apache.spark.examples.streaming.FeederActor]]
  */
-class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
-  extends ActorReceiver {
+class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver {
 
   lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
 
@@ -108,9 +107,13 @@ object FeederActor {
   }
   val Seq(host, port) = args.toSeq
 
-  val conf = new SparkConf
-  val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf,
-    securityManager = new SecurityManager(conf))._1
+  val akkaConf = ConfigFactory.parseString(
+    s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+       |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+       |akka.remote.netty.tcp.hostname = "$host"
+       |akka.remote.netty.tcp.port = $port
+       |""".stripMargin)
+  val actorSystem = ActorSystem("test", akkaConf)
   val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
 
   println("Feeder started as:" + feeder)
@@ -121,6 +124,7 @@ object FeederActor {
 
 /**
  * A sample word count program demonstrating the use of plugging in
+ *
  * Actor as Receiver
 * Usage: ActorWordCount <hostname> <port>
 * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
@@ -146,20 +150,21 @@ object ActorWordCount {
   val ssc = new StreamingContext(sparkConf, Seconds(2))
 
   /*
-   * Following is the use of actorStream to plug in custom actor as receiver
+   * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
   *
   * An important point to note:
   * Since Actor may exist outside the spark framework, It is thus user's responsibility
-   * to ensure the type safety, i.e type of data received and InputDstream
+   * to ensure the type safety, i.e type of data received and InputDStream
   * should be same.
   *
-   * For example: Both actorStream and SampleActorReceiver are parameterized
+   * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized
   * to same type to ensure type safety.
   */
-
-  val lines = ssc.actorStream[String](
-    Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
-      host, port.toInt))), "SampleReceiver")
+  val lines = AkkaUtils.createStream[String](
+    ssc,
+    Props(classOf[SampleActorReceiver[String]],
+      "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)),
+    "SampleReceiver")
 
   // compute wordcount
   lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()

examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala

Lines changed: 9 additions & 4 deletions

@@ -25,8 +25,9 @@ import akka.actor.actorRef2Scala
 import akka.util.ByteString
 import akka.zeromq._
 import akka.zeromq.Subscribe
+import com.typesafe.config.ConfigFactory
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.zeromq._
 
@@ -69,10 +70,10 @@ object SimpleZeroMQPublisher {
 *
 * To run this example locally, you may run publisher as
 *    `$ bin/run-example \
- *    org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ *    org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo`
 * and run the example as
 *    `$ bin/run-example \
- *    org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo`
+ *    org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.0.1:1234 foo`
 */
 // scalastyle:on
 object ZeroMQWordCount {
@@ -90,7 +91,11 @@ object ZeroMQWordCount {
   def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator
 
   // For this stream, a zeroMQ publisher should be running.
-  val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
+  val lines = ZeroMQUtils.createStream(
+    ssc,
+    url,
+    Subscribe(topic),
+    bytesToStringIterator _)
   val words = lines.flatMap(_.split(" "))
   val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
   wordCounts.print()

external/akka/pom.xml

Lines changed: 73 additions & 0 deletions (new file)

@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.10</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-akka_2.10</artifactId>
+  <properties>
+    <sbt.project.name>streaming-akka</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Spark Project External Akka</name>
+  <url>http://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-actor_${scala.binary.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${akka.group}</groupId>
+      <artifactId>akka-remote_${scala.binary.version}</artifactId>
+      <version>${akka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>
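
For applications built with sbt rather than Maven, the equivalent dependency on the new module would presumably look like the line below (a sketch; the exact coordinates and released version are assumptions based on this pom):

```scala
// build.sbt -- hypothetical coordinates matching the Maven artifact above
libraryDependencies += "org.apache.spark" %% "spark-streaming-akka" % "2.0.0"
```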
