Skip to content

Commit 15ed4a1

Browse files
committed
Register before starting the receiver
1 parent fff63f9 commit 15ed4a1

File tree

3 files changed

+26
-16
lines changed

3 files changed

+26
-16
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ private[streaming] abstract class ReceiverSupervisor(
9494
/** Called when supervisor is stopped */
9595
protected def onStop(message: String, error: Option[Throwable]) { }
9696

97-
/** Called when receiver is started */
98-
protected def onReceiverStart() { }
97+
/** Called when receiver is started. Return if the driver accepts us */
98+
protected def onReceiverStart(): Boolean = true
9999

100100
/** Called when receiver is stopped */
101101
protected def onReceiverStop(message: String, error: Option[Throwable]) { }
@@ -117,11 +117,15 @@ private[streaming] abstract class ReceiverSupervisor(
117117
/** Start receiver */
118118
def startReceiver(): Unit = synchronized {
119119
try {
120-
logInfo("Starting receiver")
121-
receiver.onStart()
122-
logInfo("Called receiver onStart")
123-
onReceiverStart()
124-
receiverState = Started
120+
if (onReceiverStart()) {
121+
logInfo("Starting receiver")
122+
receiverState = Started
123+
receiver.onStart()
124+
logInfo("Called receiver onStart")
125+
} else {
126+
// The driver refused us
127+
stop("Registered unsuccessfully because the driver refused" + streamId, None)
128+
}
125129
} catch {
126130
case t: Throwable =>
127131
stop("Error starting receiver " + streamId, Some(t))
@@ -132,7 +136,13 @@ private[streaming] abstract class ReceiverSupervisor(
132136
def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
133137
try {
134138
logInfo("Stopping receiver with message: " + message + ": " + error.getOrElse(""))
135-
receiverState = Stopped
139+
if (receiverState == Started) {
140+
receiverState = Stopped
141+
receiver.onStop()
142+
} else {
143+
// "receiver.onStart()" is not called. So we should not call "receiver.onStop()"
144+
receiverState = Stopped
145+
}
136146
receiver.onStop()
137147
logInfo("Called receiver onStop")
138148
onReceiverStop(message, error)

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,10 @@ private[streaming] class ReceiverSupervisorImpl(
167167
env.rpcEnv.stop(endpoint)
168168
}
169169

170-
override protected def onReceiverStart() {
170+
override protected def onReceiverStart(): Boolean = {
171171
val msg = RegisterReceiver(
172172
streamId, receiver.getClass.getSimpleName, Utils.localHostName(), endpoint)
173-
val successful = trackerEndpoint.askWithRetry[Boolean](msg)
174-
if (!successful) {
175-
stop("Registered unsuccessfully", None)
176-
}
173+
trackerEndpoint.askWithRetry[Boolean](msg)
177174
}
178175

179176
override protected def onReceiverStop(message: String, error: Option[Throwable]) {

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -366,9 +366,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
366366
// Distribute the receivers and start them
367367
logInfo("Starting " + receivers.length + " receivers")
368368
running = true
369-
ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))
370-
running = false
371-
logInfo("All of the receivers have been terminated")
369+
try {
370+
ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))
371+
logInfo("All of the receivers have been terminated")
372+
} finally {
373+
running = false
374+
}
372375
}
373376

374377
/** Stops the receivers. */

0 commit comments

Comments
 (0)