Skip to content

Commit b9a3077

Browse files
committed
[SPARK-3736] Addressing PR comments related to reconnection.
- scheduledReconnectMessage --> scheduledReconnectTask - A log statement in the master is printed if a worker that was unregistered and not in its worker set sends a heartbeat.
1 parent 2ad5ed5 commit b9a3077

File tree

2 files changed

+10
-7
lines changed

2 files changed

+10
-7
lines changed

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,9 @@ private[spark] class Master(
345345
logWarning(s"Got heartbeat from unregistered worker $workerId." +
346346
" Asking it to re-register.")
347347
sender ! ReconnectWorker(masterUrl)
348+
} else {
349+
logWarning(s"Got heartbeat from unregistered worker $workerId." +
350+
" This worker was never registered, so ignoring the heartbeat.")
348351
}
349352
}
350353
}

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ private[spark] class Worker(
9696
val finishedExecutors = new HashMap[String, ExecutorRunner]
9797
val drivers = new HashMap[String, DriverRunner]
9898
val finishedDrivers = new HashMap[String, DriverRunner]
99-
var scheduledReconnectMessage: Option[Cancellable] = None
99+
var scheduledReconnectTask: Option[Cancellable] = None
100100

101101
val publicAddress = {
102102
val envVar = System.getenv("SPARK_PUBLIC_DNS")
@@ -159,7 +159,7 @@ private[spark] class Worker(
159159
throw new SparkException("Invalid spark URL: " + x)
160160
}
161161
connected = true
162-
scheduledReconnectMessage.foreach(_.cancel())
162+
scheduledReconnectTask.foreach(_.cancel())
163163
}
164164

165165
def tryRegisterAllMasters() {
@@ -201,8 +201,8 @@ private[spark] class Worker(
201201
context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
202202
CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
203203
}
204-
scheduledReconnectMessage.foreach(_.cancel())
205-
scheduledReconnectMessage = None
204+
scheduledReconnectTask.foreach(_.cancel())
205+
scheduledReconnectTask = None
206206

207207
case SendHeartbeat =>
208208
if (connected) { master ! Heartbeat(workerId) }
@@ -379,8 +379,8 @@ private[spark] class Worker(
379379
}
380380

381381
def scheduleAttemptsToReconnectToMaster() {
382-
if (!scheduledReconnectMessage.isDefined) {
383-
scheduledReconnectMessage = Some(context.system.scheduler.schedule(
382+
if (!scheduledReconnectTask.isDefined) {
383+
scheduledReconnectTask = Some(context.system.scheduler.schedule(
384384
Duration Zero, RECONNECT_ATTEMPT_INTERVAL_MILLIS millis) {
385385
tryRegisterAllMasters()
386386
})
@@ -394,7 +394,7 @@ private[spark] class Worker(
394394
override def postStop() {
395395
metricsSystem.report()
396396
registrationRetryTimer.foreach(_.cancel())
397-
scheduledReconnectMessage.foreach(_.cancel())
397+
scheduledReconnectTask.foreach(_.cancel())
398398
executors.values.foreach(_.kill())
399399
drivers.values.foreach(_.kill())
400400
webUi.stop()

0 commit comments

Comments
 (0)