From 59ff0dc99d30e305dde6bbda6684d0ffcc74857c Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 20 Jan 2016 16:52:16 -0800 Subject: [PATCH 1/2] Add linking instructions for streaming-akka project --- docs/streaming-custom-receivers.md | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index 95b99862ec06..d4f14ba5d25c 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -257,12 +257,22 @@ The following table summarizes the characteristics of both types of receivers ## Implementing and Using a Custom Actor-based Receiver +Custom [Akka Actors](http://doc.akka.io/docs/akka/2.3.11/scala/actors.html) can also be used to +receive data. Here are the instructions. + +1. **Linking:** You need to add the following dependency to your SBT or Maven project (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information). + + groupId = org.apache.spark + artifactId = spark-streaming-akka_{{site.SCALA_BINARY_VERSION}} + version = {{site.SPARK_VERSION_SHORT}} + +2. **Programming:** +
-Custom [Akka Actors](http://doc.akka.io/docs/akka/2.3.11/scala/actors.html) can also be used to -receive data. Extending [`ActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.ActorReceiver) -allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of +You need to extend [`ActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.ActorReceiver) +so as to store received data into Spark using `store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc. {% highlight scala %} @@ -283,9 +293,8 @@ See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/
-Custom [Akka UntypedActors](http://doc.akka.io/docs/akka/2.3.11/java/untyped-actors.html) can also be used to -receive data. Extending [`JavaActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.JavaActorReceiver) -allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of +You need to extend [`JavaActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.JavaActorReceiver) +so as to store received data into Spark using `store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc. {% highlight java %} From 448007b2e928a6046a64a599a6831a0452fbd69b Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 20 Jan 2016 17:18:30 -0800 Subject: [PATCH 2/2] Add deploying instruction --- docs/streaming-custom-receivers.md | 92 +++++++++++++++--------------- 1 file changed, 45 insertions(+), 47 deletions(-) diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index d4f14ba5d25c..84547748618d 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -268,52 +268,50 @@ receive data. Here are the instructions. 2. **Programming:** -
-
- -You need to extend [`ActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.ActorReceiver) -so as to store received data into Spark using `store(...)` methods. The supervisor strategy of -this actor can be configured to handle failures, etc. - -{% highlight scala %} - -class CustomActor extends ActorReceiver { - def receive = { - case data: String => store(data) - } -} - -// A new input stream can be created with this custom actor as -val ssc: StreamingContext = ... -val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver") - -{% endhighlight %} - -See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) for an end-to-end example. -
-
- -You need to extend [`JavaActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.JavaActorReceiver) -so as to store received data into Spark using `store(...)` methods. The supervisor strategy of -this actor can be configured to handle failures, etc. - -{% highlight java %} - -class CustomActor extends JavaActorReceiver { - @Override - public void onReceive(Object msg) throws Exception { - store((String) msg); - } -} - -// A new input stream can be created with this custom actor as -JavaStreamingContext jssc = ...; -JavaDStream lines = AkkaUtils.createStream(jssc, Props.create(CustomActor.class), "CustomReceiver"); - -{% endhighlight %} - -See [JavaActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/JavaActorWordCount.scala) for an end-to-end example. -
-
+
+
+ + You need to extend [`ActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.ActorReceiver) + so as to store received data into Spark using `store(...)` methods. The supervisor strategy of + this actor can be configured to handle failures, etc. + + class CustomActor extends ActorReceiver { + def receive = { + case data: String => store(data) + } + } + + // A new input stream can be created with this custom actor as + val ssc: StreamingContext = ... + val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver") + + See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) for an end-to-end example. +
+
+ + You need to extend [`JavaActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.JavaActorReceiver) + so as to store received data into Spark using `store(...)` methods. The supervisor strategy of + this actor can be configured to handle failures, etc. + + class CustomActor extends JavaActorReceiver { + @Override + public void onReceive(Object msg) throws Exception { + store((String) msg); + } + } + + // A new input stream can be created with this custom actor as + JavaStreamingContext jssc = ...; + JavaDStream lines = AkkaUtils.createStream(jssc, Props.create(CustomActor.class), "CustomReceiver"); + + See [JavaActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/JavaActorWordCount.scala) for an end-to-end example. +
+
+ +3. **Deploying:** As with any Spark applications, `spark-submit` is used to launch your application. +You need to package `spark-streaming-akka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into +the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` +are marked as `provided` dependencies as those are already present in a Spark installation. Then +use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). Python API Since actors are available only in the Java and Scala libraries, AkkaUtils is not available in the Python API.