Skip to content

Commit a36cc48

Browse files
committed
Refactored the NetworkReceiver API for future stability.
1 parent ada310a commit a36cc48

File tree

12 files changed

+399
-227
lines changed

12 files changed

+399
-227
lines changed

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import org.apache.spark.util.Utils
3434
import org.apache.spark.storage.StorageLevel
3535
import org.apache.spark.streaming.StreamingContext
3636
import org.apache.spark.streaming.dstream._
37+
import org.apache.spark.Logging
3738

3839
private[streaming]
3940
class FlumeInputDStream[T: ClassTag](
@@ -115,13 +116,13 @@ private[streaming] object SparkFlumeEvent {
115116
private[streaming]
116117
class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
117118
override def append(event : AvroFlumeEvent) : Status = {
118-
receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
119+
receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
119120
Status.OK
120121
}
121122

122123
override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
123124
events.foreach (event =>
124-
receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
125+
receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
125126
Status.OK
126127
}
127128
}
@@ -133,23 +134,21 @@ class FlumeReceiver(
133134
host: String,
134135
port: Int,
135136
storageLevel: StorageLevel
136-
) extends NetworkReceiver[SparkFlumeEvent] {
137+
) extends NetworkReceiver[SparkFlumeEvent](storageLevel) with Logging {
137138

138-
lazy val blockGenerator = new BlockGenerator(storageLevel)
139+
lazy val responder = new SpecificResponder(
140+
classOf[AvroSourceProtocol], new FlumeEventServer(this))
141+
lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
139142

140-
protected override def onStart() {
141-
val responder = new SpecificResponder(
142-
classOf[AvroSourceProtocol], new FlumeEventServer(this))
143-
val server = new NettyServer(responder, new InetSocketAddress(host, port))
144-
blockGenerator.start()
143+
def onStart() {
145144
server.start()
146145
logInfo("Flume receiver started")
147146
}
148147

149-
protected override def onStop() {
150-
blockGenerator.stop()
148+
def onStop() {
149+
server.close()
151150
logInfo("Flume receiver stopped")
152151
}
153152

154-
override def getLocationPreference = Some(host)
153+
override def preferredLocation = Some(host)
155154
}

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,21 +70,15 @@ class KafkaReceiver[
7070
kafkaParams: Map[String, String],
7171
topics: Map[String, Int],
7272
storageLevel: StorageLevel
73-
) extends NetworkReceiver[Any] {
73+
) extends NetworkReceiver[Any](storageLevel) with Logging {
7474

75-
// Handles pushing data into the BlockManager
76-
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
7775
// Connection to Kafka
7876
var consumerConnector : ConsumerConnector = null
7977

80-
def onStop() {
81-
blockGenerator.stop()
82-
}
78+
def onStop() { }
8379

8480
def onStart() {
8581

86-
blockGenerator.start()
87-
8882
// In case we are using multiple Threads to handle Kafka Messages
8983
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
9084

@@ -130,7 +124,7 @@ class KafkaReceiver[
130124
def run() {
131125
logInfo("Starting MessageHandler.")
132126
for (msgAndMetadata <- stream) {
133-
blockGenerator += (msgAndMetadata.key, msgAndMetadata.message)
127+
store((msgAndMetadata.key, msgAndMetadata.message))
134128
}
135129
}
136130
}

external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,38 +49,34 @@ import org.apache.spark.streaming.dstream._
4949
*/
5050

5151
private[streaming]
52-
class MQTTInputDStream[T: ClassTag](
52+
class MQTTInputDStream(
5353
@transient ssc_ : StreamingContext,
5454
brokerUrl: String,
5555
topic: String,
5656
storageLevel: StorageLevel
57-
) extends NetworkInputDStream[T](ssc_) with Logging {
57+
) extends NetworkInputDStream[String](ssc_) with Logging {
5858

59-
def getReceiver(): NetworkReceiver[T] = {
60-
new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
59+
def getReceiver(): NetworkReceiver[String] = {
60+
new MQTTReceiver(brokerUrl, topic, storageLevel)
6161
}
6262
}
6363

6464
private[streaming]
65-
class MQTTReceiver(brokerUrl: String,
66-
topic: String,
67-
storageLevel: StorageLevel
68-
) extends NetworkReceiver[Any] {
69-
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
70-
71-
def onStop() {
72-
blockGenerator.stop()
73-
}
65+
class MQTTReceiver(
66+
brokerUrl: String,
67+
topic: String,
68+
storageLevel: StorageLevel
69+
) extends NetworkReceiver[String](storageLevel) {
70+
71+
def onStop() { }
7472

7573
def onStart() {
7674

77-
blockGenerator.start()
78-
7975
// Set up persistence for messages
80-
var peristance: MqttClientPersistence = new MemoryPersistence()
76+
val peristance: MqttClientPersistence = new MemoryPersistence()
8177

8278
// Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
83-
var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
79+
val client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
8480

8581
// Connect to MqttBroker
8682
client.connect()
@@ -89,18 +85,19 @@ class MQTTReceiver(brokerUrl: String,
8985
client.subscribe(topic)
9086

9187
// Callback automatically triggers as and when new message arrives on specified topic
92-
var callback: MqttCallback = new MqttCallback() {
88+
val callback: MqttCallback = new MqttCallback() {
9389

9490
// Handles Mqtt message
9591
override def messageArrived(arg0: String, arg1: MqttMessage) {
96-
blockGenerator += new String(arg1.getPayload())
92+
store(new String(arg1.getPayload()))
9793
}
9894

9995
override def deliveryComplete(arg0: IMqttDeliveryToken) {
10096
}
10197

10298
override def connectionLost(arg0: Throwable) {
103-
logInfo("Connection lost " + arg0)
99+
store("Connection lost " + arg0)
100+
stopOnError(new Exception(arg0))
104101
}
105102
}
106103

external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ object MQTTUtils {
3737
topic: String,
3838
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
3939
): DStream[String] = {
40-
new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
40+
new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
4141
}
4242

4343
/**

external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import twitter4j.auth.OAuthAuthorization
2525
import org.apache.spark.streaming._
2626
import org.apache.spark.streaming.dstream._
2727
import org.apache.spark.storage.StorageLevel
28+
import org.apache.spark.Logging
2829

2930
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
3031
*
@@ -59,17 +60,15 @@ class TwitterReceiver(
5960
twitterAuth: Authorization,
6061
filters: Seq[String],
6162
storageLevel: StorageLevel
62-
) extends NetworkReceiver[Status] {
63+
) extends NetworkReceiver[Status](storageLevel) with Logging {
6364

6465
var twitterStream: TwitterStream = _
65-
lazy val blockGenerator = new BlockGenerator(storageLevel)
6666

67-
protected override def onStart() {
68-
blockGenerator.start()
67+
def onStart() {
6968
twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
7069
twitterStream.addListener(new StatusListener {
7170
def onStatus(status: Status) = {
72-
blockGenerator += status
71+
store(status)
7372
}
7473
// Unimplemented
7574
def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
@@ -89,8 +88,7 @@ class TwitterReceiver(
8988
logInfo("Twitter receiver started")
9089
}
9190

92-
protected override def onStop() {
93-
blockGenerator.stop()
91+
def onStop() {
9492
twitterStream.shutdown()
9593
logInfo("Twitter receiver stopped")
9694
}

0 commit comments

Comments (0)