@@ -37,12 +37,6 @@ private[streaming] case class RegisterReceiver(
3737 host : String ,
3838 receiverEndpoint : RpcEndpointRef
3939 ) extends ReceiverTrackerMessage
40- private [streaming] case class ReceiverStarted (
41- streamId : Int ,
42- typ : String ,
43- host : String ,
44- receiverEndpoint : RpcEndpointRef
45- ) extends ReceiverTrackerMessage
4640private [streaming] case class AddBlock (receivedBlockInfo : ReceivedBlockInfo )
4741 extends ReceiverTrackerMessage
4842private [streaming] case class ReportError (streamId : Int , message : String , error : String )
@@ -83,6 +77,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
8377 /** State of the tracker */
8478 @ volatile private var trackerState = Initialized
8579
80+ /**
81+ * There is a race condition when stopping receivers and registering receivers happen at the same
82+ * time. This lock is used to eliminate the race condition. See SPARK-5681.
83+ */
84+ private val stoppingTrackerLock = new AnyRef
85+
8686 // endpoint is created when generator starts.
8787 // This not being null means the tracker has been started and not stopped
8888 private var endpoint : RpcEndpointRef = null
@@ -120,8 +120,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
120120 /** Stop the receiver execution thread. */
121121 def stop (graceful : Boolean ): Unit = synchronized {
122122 if (isTrackerStarted) {
123- trackerState = Stopping
124123 // First, stop the receivers
124+ // acquire "stoppingTrackerLock" so that setting trackerState to "Stopping" and registering
125+ // receivers won't happen at the same time
126+ stoppingTrackerLock.synchronized {
127+ trackerState = Stopping
128+ if (! skipReceiverLaunch) {
129+ // Send the stop signal to all the receivers
130+ receiverExecutor.stopReceivers()
131+ }
132+ }
125133 if (! skipReceiverLaunch) receiverExecutor.stop(graceful)
126134
127135 // Finally, stop the endpoint
@@ -175,33 +183,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
175183 host : String ,
176184 receiverEndpoint : RpcEndpointRef ,
177185 senderAddress : RpcAddress
178- ) {
186+ ): Boolean = {
179187 if (! receiverInputStreamIds.contains(streamId)) {
180188 throw new SparkException (" Register received for unexpected id " + streamId)
181189 }
182- receiverInfo(streamId) = ReceiverInfo (
183- streamId, s " ${typ}- ${streamId}" , receiverEndpoint, true , host)
184- listenerBus.post(StreamingListenerReceiverRegistered (receiverInfo(streamId)))
185- logInfo(" Registered receiver for stream " + streamId + " from " + senderAddress)
186- }
187-
188- /** Receiver started */
189- private def receiverStarted (
190- streamId : Int ,
191- typ : String ,
192- host : String ,
193- receiverEndpoint : RpcEndpointRef ,
194- senderAddress : RpcAddress
195- ) {
196- if (! receiverInputStreamIds.contains(streamId)) {
197- throw new SparkException (" Start received for unexpected id " + streamId)
190+ // acquire "stoppingTrackerLock" so that setting trackerState to "Stopping" and registering
191+ // receivers won't happen at the same time
192+ stoppingTrackerLock.synchronized {
193+ if (isTrackerStopping || isTrackerStopped) {
194+ false
195+ } else {
196+ receiverInfo(streamId) = ReceiverInfo (
197+ streamId, s " ${typ}- ${streamId}" , receiverEndpoint, true , host)
198+ listenerBus.post(StreamingListenerReceiverStarted (receiverInfo(streamId)))
199+ logInfo(" Registered receiver for stream " + streamId + " from " + senderAddress)
200+ true
201+ }
198202 }
199- receiverInfo(streamId) = ReceiverInfo (
200- streamId, s " ${typ}- ${streamId}" , receiverEndpoint, true , host)
201- listenerBus.post(StreamingListenerReceiverStarted (receiverInfo(streamId)))
202- logInfo(" Receiver started for stream " + streamId + " from " + senderAddress)
203203 }
204-
204+
205205 /** Deregister a receiver */
206206 private def deregisterReceiver (streamId : Int , message : String , error : String ) {
207207 val newReceiverInfo = receiverInfo.get(streamId) match {
@@ -267,19 +267,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
267267
268268 override def receiveAndReply (context : RpcCallContext ): PartialFunction [Any , Unit ] = {
269269 case RegisterReceiver (streamId, typ, host, receiverEndpoint) =>
270- if ( ! isTrackerStopping) {
270+ val successful =
271271 registerReceiver(streamId, typ, host, receiverEndpoint, context.sender.address)
272- context.reply(true )
273- } else {
274- context.reply(false )
275- }
276- case ReceiverStarted (streamId, typ, host, receiverEndpoint) =>
277- if (! isTrackerStopping) {
278- receiverStarted(streamId, typ, host, receiverEndpoint, context.sender.address)
279- context.reply(true )
280- } else {
281- context.reply(false )
282- }
272+ context.reply(successful)
283273 case AddBlock (receivedBlockInfo) =>
284274 context.reply(addBlock(receivedBlockInfo))
285275 case DeregisterReceiver (streamId, message, error) =>
@@ -308,9 +298,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
308298 }
309299
310300 def stop (graceful : Boolean ) {
311- // Send the stop signal to all the receivers
312- stopReceivers()
313-
314301 // Wait for the Spark job that runs the receivers to be over
315302 // That is, for the receivers to quit gracefully.
316303 thread.join(10000 )
@@ -385,7 +372,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
385372 }
386373
387374 /** Stops the receivers. */
388- private def stopReceivers () {
375+ def stopReceivers () {
389376 // Signal the receivers to stop
390377 receiverInfo.values.flatMap { info => Option (info.endpoint)}
391378 .foreach { _.send(StopReceiver ) }
0 commit comments