Skip to content

Commit b4c29e7

Browse files
committed
Stop receiver only if we start it
1 parent c41ee94 commit b4c29e7

File tree

1 file changed

+10
-9
lines changed

1 file changed

+10
-9
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -141,16 +141,17 @@ private[streaming] abstract class ReceiverSupervisor(
141141
def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
142142
try {
143143
logInfo("Stopping receiver with message: " + message + ": " + error.getOrElse(""))
144-
if (receiverState == Started) {
145-
receiverState = Stopped
146-
receiver.onStop()
147-
} else {
148-
// "receiver.onStart()" is not called. So we should not call "receiver.onStop()"
149-
receiverState = Stopped
144+
receiverState match {
145+
case Initialized =>
146+
logWarning("Skip stopping receiver because it has not yet stared")
147+
case Started =>
148+
receiverState = Stopped
149+
receiver.onStop()
150+
logInfo("Called receiver onStop")
151+
onReceiverStop(message, error)
152+
case Stopped =>
153+
logWarning("Receiver has been stopped")
150154
}
151-
receiver.onStop()
152-
logInfo("Called receiver onStop")
153-
onReceiverStop(message, error)
154155
} catch {
155156
case NonFatal(t) =>
156157
logError("Error stopping receiver " + streamId + t.getStackTraceString)

0 commit comments

Comments
 (0)