Skip to content

Commit a178d37

Browse files
committed
Move 'stopReceivers' to the event looop to resolve the race condition
1 parent 51fb07e commit a178d37

File tree

2 files changed

+34
-44
lines changed

2 files changed

+34
-44
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ private[streaming] abstract class ReceiverSupervisor(
9898
/** Called when supervisor is stopped */
9999
protected def onStop(message: String, error: Option[Throwable]) { }
100100

101-
/** Called when receiver is started. Return if the driver accepts us */
101+
/** Called when receiver is started. Return true if the driver accepts us */
102102
protected def onReceiverStart(): Boolean = true
103103

104104
/** Called when receiver is stopped */

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 33 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.scheduler
2020
import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedMap}
2121
import scala.language.existentials
2222
import scala.math.max
23-
import org.apache.spark.rdd._
2423

2524
import org.apache.spark.streaming.util.WriteAheadLogUtils
2625
import org.apache.spark.{Logging, SparkEnv, SparkException}
@@ -47,6 +46,8 @@ private[streaming] case class ReportError(streamId: Int, message: String, error:
4746
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, error: String)
4847
extends ReceiverTrackerMessage
4948

49+
private[streaming] case object StopAllReceivers extends ReceiverTrackerMessage
50+
5051
/**
5152
* This class manages the execution of the receivers of ReceiverInputDStreams. Instance of
5253
* this class must be created after all input streams have been added and StreamingContext.start()
@@ -79,29 +80,20 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
7980
import TrackerState._
8081

8182
/** State of the tracker. Protected by "trackerStateLock" */
82-
private var trackerState = Initialized
83-
84-
/** "trackerStateLock" is used to protect reading/writing "trackerState" */
85-
private val trackerStateLock = new AnyRef
83+
@volatile private var trackerState = Initialized
8684

8785
// endpoint is created when generator starts.
8886
// This not being null means the tracker has been started and not stopped
8987
private var endpoint: RpcEndpointRef = null
9088

9189
/** Check if tracker has been marked for starting */
92-
private def isTrackerStarted(): Boolean = trackerStateLock.synchronized {
93-
trackerState == Started
94-
}
90+
private def isTrackerStarted(): Boolean = trackerState == Started
9591

9692
/** Check if tracker has been marked for stopping */
97-
private def isTrackerStopping(): Boolean = trackerStateLock.synchronized {
98-
trackerState == Stopping
99-
}
93+
private def isTrackerStopping(): Boolean = trackerState == Stopping
10094

10195
/** Check if tracker has been marked for stopped */
102-
private def isTrackerStopped(): Boolean = trackerStateLock.synchronized {
103-
trackerState == Stopped
104-
}
96+
private def isTrackerStopped(): Boolean = trackerState == Stopped
10597

10698
/** Start the endpoint and receiver execution thread. */
10799
def start(): Unit = synchronized {
@@ -114,29 +106,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
114106
"ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
115107
if (!skipReceiverLaunch) receiverExecutor.start()
116108
logInfo("ReceiverTracker started")
117-
trackerStateLock.synchronized {
118-
trackerState = Started
119-
}
109+
trackerState = Started
120110
}
121111
}
122112

123113
/** Stop the receiver execution thread. */
124114
def stop(graceful: Boolean): Unit = synchronized {
125115
if (isTrackerStarted) {
126116
// First, stop the receivers
127-
trackerStateLock.synchronized {
128-
trackerState = Stopping
129-
}
117+
trackerState = Stopping
130118
if (!skipReceiverLaunch) receiverExecutor.stop(graceful)
131119

132120
// Finally, stop the endpoint
133121
ssc.env.rpcEnv.stop(endpoint)
134122
endpoint = null
135123
receivedBlockTracker.stop()
136124
logInfo("ReceiverTracker stopped")
137-
trackerStateLock.synchronized {
138-
trackerState = Stopped
139-
}
125+
trackerState = Stopped
140126
}
141127
}
142128

@@ -187,18 +173,17 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
187173
throw new SparkException("Register received for unexpected id " + streamId)
188174
}
189175

190-
trackerStateLock.synchronized {
191-
if (isTrackerStopping || isTrackerStopped) {
192-
false
193-
} else {
194-
// When updating "receiverInfo", we should make sure "trackerState" won't be changed at the
195-
// same time. Therefore the following line should be in "trackerStateLock.synchronized".
196-
receiverInfo(streamId) = ReceiverInfo(
197-
streamId, s"${typ}-${streamId}", receiverEndpoint, true, host)
198-
listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId)))
199-
logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
200-
true
201-
}
176+
if (isTrackerStopping || isTrackerStopped) {
177+
false
178+
} else {
179+
// "stopReceivers" won't happen at the same time because both "registerReceiver" and are
180+
// called in the event loop. So here we can assume "stopReceivers" has not yet been called. If
181+
// "stopReceivers" is called later, it should be able to see this receiver.
182+
receiverInfo(streamId) = ReceiverInfo(
183+
streamId, s"${typ}-${streamId}", receiverEndpoint, true, host)
184+
listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId)))
185+
logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
186+
true
202187
}
203188
}
204189

@@ -275,6 +260,18 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
275260
case DeregisterReceiver(streamId, message, error) =>
276261
deregisterReceiver(streamId, message, error)
277262
context.reply(true)
263+
case StopAllReceivers =>
264+
assert(isTrackerStopping || isTrackerStopped)
265+
stopReceivers()
266+
context.reply(true)
267+
}
268+
269+
/** Stops the receivers. */
270+
private def stopReceivers() {
271+
// Signal the receivers to stop
272+
receiverInfo.values.flatMap { info => Option(info.endpoint)}
273+
.foreach { _.send(StopReceiver) }
274+
logInfo("Sent stop signal to all " + receiverInfo.size + " receivers")
278275
}
279276
}
280277

@@ -299,7 +296,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
299296

300297
def stop(graceful: Boolean) {
301298
// Send the stop signal to all the receivers
302-
stopReceivers()
299+
endpoint.askWithRetry[Boolean](StopAllReceivers)
303300

304301
// Wait for the Spark job that runs the receivers to be over
305302
// That is, for the receivers to quit gracefully.
@@ -414,12 +411,5 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
414411
}
415412
}
416413

417-
/** Stops the receivers. */
418-
private def stopReceivers() {
419-
// Signal the receivers to stop
420-
receiverInfo.values.flatMap { info => Option(info.endpoint)}
421-
.foreach { _.send(StopReceiver) }
422-
logInfo("Sent stop signal to all " + receiverInfo.size + " receivers")
423-
}
424414
}
425415
}

0 commit comments

Comments
 (0)