File tree Expand file tree Collapse file tree 1 file changed +10
-9
lines changed
streaming/src/main/scala/org/apache/spark/streaming/receiver Expand file tree Collapse file tree 1 file changed +10
-9
lines changed Original file line number Diff line number Diff line change @@ -141,16 +141,17 @@ private[streaming] abstract class ReceiverSupervisor(
141141 def stopReceiver (message : String , error : Option [Throwable ]): Unit = synchronized {
142142 try {
143143 logInfo(" Stopping receiver with message: " + message + " : " + error.getOrElse(" " ))
144- if (receiverState == Started ) {
145- receiverState = Stopped
146- receiver.onStop()
147- } else {
148- // "receiver.onStart()" is not called. So we should not call "receiver.onStop()"
149- receiverState = Stopped
144+ receiverState match {
145+ case Initialized =>
146+ logWarning(" Skip stopping receiver because it has not yet stared" )
147+ case Started =>
148+ receiverState = Stopped
149+ receiver.onStop()
150+ logInfo(" Called receiver onStop" )
151+ onReceiverStop(message, error)
152+ case Stopped =>
153+ logWarning(" Receiver has been stopped" )
150154 }
151- receiver.onStop()
152- logInfo(" Called receiver onStop" )
153- onReceiverStop(message, error)
154155 } catch {
155156 case NonFatal (t) =>
156157 logError(" Error stopping receiver " + streamId + t.getStackTraceString)
You can’t perform that action at this time.
0 commit comments