From 77983f35c634083d5a0398bd86f6f594221bba7a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 9 Feb 2015 15:43:18 +0800 Subject: [PATCH 1/5] Add tracker status and stop to receive messages when stopping tracker. --- .../receiver/ReceiverSupervisor.scala | 4 +- .../streaming/scheduler/ReceiverTracker.scala | 64 +++++++++++++++---- 2 files changed, 53 insertions(+), 15 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index 1f0244c251eb..105cd9b1cc72 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -36,7 +36,7 @@ private[streaming] abstract class ReceiverSupervisor( conf: SparkConf ) extends Logging { - /** Enumeration to identify current state of the StreamingContext */ + /** Enumeration to identify current state of the Receiver */ object ReceiverState extends Enumeration { type CheckpointState = Value val Initialized, Started, Stopped = Value @@ -161,7 +161,7 @@ private[streaming] abstract class ReceiverSupervisor( } } - /** Check if receiver has been marked for stopping */ + /** Check if receiver has been marked for starting */ def isReceiverStarted() = { logDebug("state = " + receiverState) receiverState == Started diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index b36aeb341d25..285ce74f5f0f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -67,13 +67,39 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false ) private val listenerBus = ssc.scheduler.listenerBus + + /** Enumeration to identify current state of the ReceiverTracker */ + object TrackerState extends Enumeration { + type CheckpointState = Value + val Initialized, Started, Stopping, Stopped = Value + } + import TrackerState._ + + /** State of the tracker */ + @volatile private[streaming] var trackerState = Initialized + // actor is created when generator starts. // This not being null means the tracker has been started and not stopped private var actor: ActorRef = null + /** Check if tracker has been marked for starting */ + def isTrackerStarted() = { + trackerState == Started + } + + /** Check if tracker has been marked for stopping */ + def isTrackerStopping() = { + trackerState == Stopping + } + + /** Check if tracker has been marked for stopped */ + def isTrackerStopped() = { + trackerState == Stopped + } + /** Start the actor and receiver execution thread. */ def start() = synchronized { - if (actor != null) { + if (isTrackerStarted) { throw new SparkException("ReceiverTracker already started") } @@ -82,12 +108,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false "ReceiverTracker") if (!skipReceiverLaunch) receiverExecutor.start() logInfo("ReceiverTracker started") + trackerState = Started } } /** Stop the receiver execution thread. */ def stop(graceful: Boolean) = synchronized { - if (!receiverInputStreams.isEmpty && actor != null) { + if (isTrackerStarted) { + trackerState = Stopping + // First, stop the receivers if (!skipReceiverLaunch) receiverExecutor.stop(graceful) @@ -96,6 +125,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false actor = null receivedBlockTracker.stop() logInfo("ReceiverTracker stopped") + trackerState = Stopped } } @@ -202,16 +232,21 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Actor to receive messages from the receivers. */ private class ReceiverTrackerActor extends Actor { def receive = { - case RegisterReceiver(streamId, typ, host, receiverActor) => - registerReceiver(streamId, typ, host, receiverActor, sender) - sender ! true - case AddBlock(receivedBlockInfo) => - sender ! addBlock(receivedBlockInfo) - case ReportError(streamId, message, error) => - reportError(streamId, message, error) - case DeregisterReceiver(streamId, message, error) => - deregisterReceiver(streamId, message, error) - sender ! true + // Actor stops to receive when tracker is stopping + if (!isTrackerStopping) { + case RegisterReceiver(streamId, typ, host, receiverActor) => + registerReceiver(streamId, typ, host, receiverActor, sender) + sender ! true + case AddBlock(receivedBlockInfo) => + sender ! addBlock(receivedBlockInfo) + case ReportError(streamId, message, error) => + reportError(streamId, message, error) + case DeregisterReceiver(streamId, message, error) => + deregisterReceiver(streamId, message, error) + sender ! true + } else { + case _ => sender ! false + } } } @@ -219,6 +254,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false class ReceiverLauncher { @transient val env = ssc.env @volatile @transient private var running = false + @transient private val TIMEOUT = 10000 @transient val thread = new Thread() { override def run() { try { @@ -244,10 +280,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false if (graceful) { val pollTime = 100 + var elapsedTime = 0 def done = { receiverInfo.isEmpty && !running } logInfo("Waiting for receiver job to terminate gracefully") - while(!done) { + while(!done && elapsedTime < TIMEOUT) { Thread.sleep(pollTime) + elapsedTime += pollTime } logInfo("Waited for receiver job to terminate gracefully") } From 3d568e8ecac5ef53a96ae463e496cf51813eedfd Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 11 Feb 2015 12:26:40 +0800 Subject: [PATCH 2/5] Let receivers get registered first before going started. --- .../receiver/ReceiverSupervisor.scala | 2 +- .../streaming/scheduler/ReceiverTracker.scala | 32 +++++++++---------- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index 105cd9b1cc72..afe7c936a963 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -117,10 +117,10 @@ private[streaming] abstract class ReceiverSupervisor( /** Start receiver */ def startReceiver(): Unit = synchronized { try { + onReceiverStart() logInfo("Starting receiver") receiver.onStart() logInfo("Called receiver onStart") - onReceiverStart() receiverState = Started } catch { case t: Throwable => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 285ce74f5f0f..f2f12d5b2569 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -76,24 +76,24 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false import TrackerState._ /** State of the tracker */ - @volatile private[streaming] var trackerState = Initialized + @volatile private var trackerState = Initialized // actor is created when generator starts. // This not being null means the tracker has been started and not stopped private var actor: ActorRef = null /** Check if tracker has been marked for starting */ - def isTrackerStarted() = { + private def isTrackerStarted() = { trackerState == Started } /** Check if tracker has been marked for stopping */ - def isTrackerStopping() = { + private def isTrackerStopping() = { trackerState == Stopping } /** Check if tracker has been marked for stopped */ - def isTrackerStopped() = { + private def isTrackerStopped() = { trackerState == Stopped } @@ -232,21 +232,19 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Actor to receive messages from the receivers. */ private class ReceiverTrackerActor extends Actor { def receive = { - // Actor stops to receive when tracker is stopping - if (!isTrackerStopping) { - case RegisterReceiver(streamId, typ, host, receiverActor) => + case RegisterReceiver(streamId, typ, host, receiverActor) => + // Actor stops to accept registering when tracker is stopping + if (!isTrackerStopping) { registerReceiver(streamId, typ, host, receiverActor, sender) sender ! true - case AddBlock(receivedBlockInfo) => - sender ! addBlock(receivedBlockInfo) - case ReportError(streamId, message, error) => - reportError(streamId, message, error) - case DeregisterReceiver(streamId, message, error) => - deregisterReceiver(streamId, message, error) - sender ! true - } else { - case _ => sender ! false - } + } + case AddBlock(receivedBlockInfo) => + sender ! addBlock(receivedBlockInfo) + case ReportError(streamId, message, error) => + reportError(streamId, message, error) + case DeregisterReceiver(streamId, message, error) => + deregisterReceiver(streamId, message, error) + sender ! true } } From 355f9ce34cb0d01048b15c96d5028fb291604286 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 11 Feb 2015 14:19:43 +0800 Subject: [PATCH 3/5] Separate register and start events for receivers. --- .../receiver/ReceiverSupervisor.scala | 8 +++-- .../receiver/ReceiverSupervisorImpl.scala | 11 +++++-- .../streaming/scheduler/ReceiverTracker.scala | 33 +++++++++++++++++-- .../scheduler/StreamingListener.scala | 11 +++++-- .../scheduler/StreamingListenerBus.scala | 2 ++ 5 files changed, 57 insertions(+), 8 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index afe7c936a963..7b8e5d5c60cc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -93,7 +93,10 @@ private[streaming] abstract class ReceiverSupervisor( /** Called when supervisor is stopped */ protected def onStop(message: String, error: Option[Throwable]) { } - + + /** Called when receiver is registered */ + protected def onReceiverRegister() { } + /** Called when receiver is started */ protected def onReceiverStart() { } @@ -117,10 +120,11 @@ private[streaming] abstract class ReceiverSupervisor( /** Start receiver */ def startReceiver(): Unit = synchronized { try { - onReceiverStart() + onReceiverRegister() logInfo("Starting receiver") receiver.onStart() logInfo("Called receiver onStart") + onReceiverStart() receiverState = Started } catch { case t: Throwable => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 7d29ed88cfcb..c1c75a183183 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -182,13 +182,20 @@ private[streaming] class ReceiverSupervisorImpl( blockGenerator.stop() env.actorSystem.stop(actor) } - - override protected def onReceiverStart() { + + override protected def onReceiverRegister() { val msg = RegisterReceiver( streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor) val future = trackerActor.ask(msg)(askTimeout) Await.result(future, askTimeout) } + + override protected def onReceiverStart() { + val msg = ReceiverStarted( + streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor) + val future = trackerActor.ask(msg)(askTimeout) + Await.result(future, askTimeout) + } override protected def onReceiverStop(message: String, error: Option[Throwable]) { logInfo("Deregistering receiver " + streamId) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index f2f12d5b2569..19a9a7ae7985 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -38,6 +38,12 @@ private[streaming] case class RegisterReceiver( host: String, receiverActor: ActorRef ) extends ReceiverTrackerMessage +private[streaming] case class ReceiverStarted( + streamId: Int, + typ: String, + host: String, + receiverActor: ActorRef + ) extends ReceiverTrackerMessage private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo) extends ReceiverTrackerMessage private[streaming] case class ReportError(streamId: Int, message: String, error: String) @@ -177,10 +183,27 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false } receiverInfo(streamId) = ReceiverInfo( streamId, s"${typ}-${streamId}", receiverActor, true, host) - listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId))) + listenerBus.post(StreamingListenerReceiverRegistered(receiverInfo(streamId))) logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address) } - + + /** Receiver started */ + private def receiverStarted( + streamId: Int, + typ: String, + host: String, + receiverActor: ActorRef, + sender: ActorRef + ) { + if (!receiverInputStreamIds.contains(streamId)) { + throw new SparkException("Start received for unexpected id " + streamId) + } + receiverInfo(streamId) = ReceiverInfo( + streamId, s"${typ}-${streamId}", receiverActor, true, host) + listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId))) + logInfo("Receiver started for stream " + streamId + " from " + sender.path.address) + } + /** Deregister a receiver */ private def deregisterReceiver(streamId: Int, message: String, error: String) { val newReceiverInfo = receiverInfo.get(streamId) match { @@ -238,6 +261,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false registerReceiver(streamId, typ, host, receiverActor, sender) sender ! true } + case ReceiverStarted(streamId, typ, host, receiverActor) => + // Actor stops to accept starting when tracker is stopping + if (!isTrackerStopping) { + receiverStarted(streamId, typ, host, receiverActor, sender) + sender ! true + } case AddBlock(receivedBlockInfo) => sender ! addBlock(receivedBlockInfo) case ReportError(streamId, message, error) => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 74dbba453f02..0838dbb6ec39 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -37,7 +37,11 @@ case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends Streami @DeveloperApi case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent - + +@DeveloperApi +case class StreamingListenerReceiverRegistered(receiverInfo: ReceiverInfo) + extends StreamingListenerEvent + @DeveloperApi case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo) extends StreamingListenerEvent @@ -57,7 +61,10 @@ case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo) */ @DeveloperApi trait StreamingListener { - + + /** Called when a receiver has been registered */ + def onReceiverRegistered(receiverRegistered: StreamingListenerReceiverRegistered) { } + /** Called when a receiver has been started */ def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index b07d6cf347ca..9febd7f44629 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -31,6 +31,8 @@ private[spark] class StreamingListenerBus override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = { event match { + case receiverRegistered: StreamingListenerReceiverRegistered => + listener.onReceiverRegistered(receiverRegistered) case receiverStarted: StreamingListenerReceiverStarted => listener.onReceiverStarted(receiverStarted) case receiverError: StreamingListenerReceiverError => From c419677c97d3c9cbe87540353c8df196fde7ecc0 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 7 Apr 2015 15:40:25 +0800 Subject: [PATCH 4/5] Fix style. --- .../org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 6f4e6b23d6c8..ae990498534d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -308,7 +308,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false if (graceful) { val pollTime = 100 var elapsedTime = 0 - def done = { receiverInfo.isEmpty && !running } + def done: Boolean = { receiverInfo.isEmpty && !running } logInfo("Waiting for receiver job to terminate gracefully") while(!done && elapsedTime < TIMEOUT) { Thread.sleep(pollTime) From 19b76d954378dcddfa751ffa19ec4edd61769f6c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 10 May 2015 22:53:24 +0800 Subject: [PATCH 5/5] Remove timeout. --- .../apache/spark/streaming/scheduler/ReceiverTracker.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 3226b819c5d0..05888d4ab061 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -284,7 +284,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false class ReceiverLauncher { @transient val env = ssc.env @volatile @transient private var running = false - @transient private val TIMEOUT = 10000 @transient val thread = new Thread() { override def run() { try { @@ -310,12 +309,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false if (graceful) { val pollTime = 100 - var elapsedTime = 0 - def done: Boolean = { receiverInfo.isEmpty && !running } logInfo("Waiting for receiver job to terminate gracefully") - while(!done && elapsedTime < TIMEOUT) { + while (receiverInfo.nonEmpty || running) { Thread.sleep(pollTime) - elapsedTime += pollTime } logInfo("Waited for receiver job to terminate gracefully") }