Skip to content

Commit 355f9ce

Browse files
committed
Separate register and start events for receivers.
1 parent 3d568e8 commit 355f9ce

File tree

5 files changed

+57
-8
lines changed

5 files changed

+57
-8
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@ private[streaming] abstract class ReceiverSupervisor(
9393

9494
/** Called when supervisor is stopped */
9595
protected def onStop(message: String, error: Option[Throwable]) { }
96-
96+
97+
/** Called when receiver is registered */
98+
protected def onReceiverRegister() { }
99+
97100
/** Called when receiver is started */
98101
protected def onReceiverStart() { }
99102

@@ -117,10 +120,11 @@ private[streaming] abstract class ReceiverSupervisor(
117120
/** Start receiver */
118121
def startReceiver(): Unit = synchronized {
119122
try {
120-
onReceiverStart()
123+
onReceiverRegister()
121124
logInfo("Starting receiver")
122125
receiver.onStart()
123126
logInfo("Called receiver onStart")
127+
onReceiverStart()
124128
receiverState = Started
125129
} catch {
126130
case t: Throwable =>

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,13 +182,20 @@ private[streaming] class ReceiverSupervisorImpl(
182182
blockGenerator.stop()
183183
env.actorSystem.stop(actor)
184184
}
185-
186-
override protected def onReceiverStart() {
185+
186+
override protected def onReceiverRegister() {
187187
val msg = RegisterReceiver(
188188
streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor)
189189
val future = trackerActor.ask(msg)(askTimeout)
190190
Await.result(future, askTimeout)
191191
}
192+
193+
override protected def onReceiverStart() {
194+
val msg = ReceiverStarted(
195+
streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor)
196+
val future = trackerActor.ask(msg)(askTimeout)
197+
Await.result(future, askTimeout)
198+
}
192199

193200
override protected def onReceiverStop(message: String, error: Option[Throwable]) {
194201
logInfo("Deregistering receiver " + streamId)

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ private[streaming] case class RegisterReceiver(
3838
host: String,
3939
receiverActor: ActorRef
4040
) extends ReceiverTrackerMessage
41+
private[streaming] case class ReceiverStarted(
42+
streamId: Int,
43+
typ: String,
44+
host: String,
45+
receiverActor: ActorRef
46+
) extends ReceiverTrackerMessage
4147
private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
4248
extends ReceiverTrackerMessage
4349
private[streaming] case class ReportError(streamId: Int, message: String, error: String)
@@ -177,10 +183,27 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
177183
}
178184
receiverInfo(streamId) = ReceiverInfo(
179185
streamId, s"${typ}-${streamId}", receiverActor, true, host)
180-
listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId)))
186+
listenerBus.post(StreamingListenerReceiverRegistered(receiverInfo(streamId)))
181187
logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address)
182188
}
183-
189+
190+
/** Receiver started */
191+
private def receiverStarted(
192+
streamId: Int,
193+
typ: String,
194+
host: String,
195+
receiverActor: ActorRef,
196+
sender: ActorRef
197+
) {
198+
if (!receiverInputStreamIds.contains(streamId)) {
199+
throw new SparkException("Start received for unexpected id " + streamId)
200+
}
201+
receiverInfo(streamId) = ReceiverInfo(
202+
streamId, s"${typ}-${streamId}", receiverActor, true, host)
203+
listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId)))
204+
logInfo("Receiver started for stream " + streamId + " from " + sender.path.address)
205+
}
206+
184207
/** Deregister a receiver */
185208
private def deregisterReceiver(streamId: Int, message: String, error: String) {
186209
val newReceiverInfo = receiverInfo.get(streamId) match {
@@ -238,6 +261,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
238261
registerReceiver(streamId, typ, host, receiverActor, sender)
239262
sender ! true
240263
}
264+
case ReceiverStarted(streamId, typ, host, receiverActor) =>
265+
// Actor stops to accept starting when tracker is stopping
266+
if (!isTrackerStopping) {
267+
receiverStarted(streamId, typ, host, receiverActor, sender)
268+
sender ! true
269+
}
241270
case AddBlock(receivedBlockInfo) =>
242271
sender ! addBlock(receivedBlockInfo)
243272
case ReportError(streamId, message, error) =>

streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,11 @@ case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends Streami
3737

3838
@DeveloperApi
3939
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
40-
40+
41+
@DeveloperApi
42+
case class StreamingListenerReceiverRegistered(receiverInfo: ReceiverInfo)
43+
extends StreamingListenerEvent
44+
4145
@DeveloperApi
4246
case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
4347
extends StreamingListenerEvent
@@ -57,7 +61,10 @@ case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
5761
*/
5862
@DeveloperApi
5963
trait StreamingListener {
60-
64+
65+
/** Called when a receiver has been registered */
66+
def onReceiverRegistered(receiverRegistered: StreamingListenerReceiverRegistered) { }
67+
6168
/** Called when a receiver has been started */
6269
def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
6370

streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ private[spark] class StreamingListenerBus
3131

3232
override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
3333
event match {
34+
case receiverRegistered: StreamingListenerReceiverRegistered =>
35+
listener.onReceiverRegistered(receiverRegistered)
3436
case receiverStarted: StreamingListenerReceiverStarted =>
3537
listener.onReceiverStarted(receiverStarted)
3638
case receiverError: StreamingListenerReceiverError =>

0 commit comments

Comments
 (0)