Skip to content

Commit 075e0a3

Browse files
committed
Add receiver RDD name; use '!isTrackerStarted' instead
1 parent 276a4ac commit 075e0a3

File tree

1 file changed

+7
-6
lines changed

1 file changed

+7
-6
lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -379,13 +379,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
379379
}
380380

381381
/** Check if tracker has been marked for starting */
382-
private def isTrackerStarted(): Boolean = trackerState == Started
382+
private def isTrackerStarted: Boolean = trackerState == Started
383383

384384
/** Check if tracker has been marked for stopping */
385-
private def isTrackerStopping(): Boolean = trackerState == Stopping
385+
private def isTrackerStopping: Boolean = trackerState == Stopping
386386

387387
/** Check if tracker has been marked for stopped */
388-
private def isTrackerStopped(): Boolean = trackerState == Stopped
388+
private def isTrackerStopped: Boolean = trackerState == Stopped
389389

390390
/** RpcEndpoint to receive messages from the receivers. */
391391
private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
@@ -436,7 +436,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
436436
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
437437
val receiverId = receiver.streamId
438438

439-
if (isTrackerStopping() || isTrackerStopped()) {
439+
if (!isTrackerStarted) {
440440
onReceiverJobFinish(receiverId)
441441
return
442442
}
@@ -455,19 +455,20 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
455455
} else {
456456
ssc.sc.makeRDD(Seq(receiver -> scheduledLocations))
457457
}
458+
receiverRDD.setName(s"Receiver $receiverId")
458459
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
459460
receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
460461
// We will keep restarting the receiver job until ReceiverTracker is stopped
461462
future.onComplete {
462463
case Success(_) =>
463-
if (isTrackerStopping() || isTrackerStopped()) {
464+
if (!isTrackerStarted) {
464465
onReceiverJobFinish(receiverId)
465466
} else {
466467
logInfo(s"Restarting Receiver $receiverId")
467468
self.send(StartReceiver(receiver))
468469
}
469470
case Failure(e) =>
470-
if (isTrackerStopping() || isTrackerStopped()) {
471+
if (!isTrackerStarted) {
471472
onReceiverJobFinish(receiverId)
472473
} else {
473474
logError("Receiver has been stopped. Try to restart it.", e)

0 commit comments

Comments
 (0)