Skip to content

Commit 3debd73

Browse files
committed
Fix the shutdown thread so we don't block the decommission message
1 parent 484f8e2 commit 3debd73

File tree

1 file changed

+30
-29
lines changed

1 file changed

+30
-29
lines changed

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -291,40 +291,41 @@ private[spark] class CoarseGrainedExecutorBackend(
291291
// code paths.
292292

293293
val shutdownThread = new Thread("wait-for-blocks-to-migrate") {
294-
var lastTaskRunningTime = System.nanoTime()
295-
val sleep_time = 1000 // 1s
296-
297-
while (true) {
298-
logInfo("Checking to see if we can shutdown.")
299-
if (executor == null || executor.numRunningTasks == 0) {
300-
if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
301-
logInfo("No running tasks, checking migrations")
302-
val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
303-
// We can only trust allBlocksMigrated boolean value if there were no tasks running
304-
// since the start of computing it.
305-
if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
306-
logInfo("No running tasks, all blocks migrated, stopping.")
307-
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
294+
override def run(): Unit = {
295+
var lastTaskRunningTime = System.nanoTime()
296+
val sleep_time = 1000 // 1s
297+
298+
while (true) {
299+
logInfo("Checking to see if we can shutdown.")
300+
if (executor == null || executor.numRunningTasks == 0) {
301+
if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
302+
logInfo("No running tasks, checking migrations")
303+
val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
304+
// We can only trust allBlocksMigrated boolean value if there were no tasks running
305+
// since the start of computing it.
306+
if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
307+
logInfo("No running tasks, all blocks migrated, stopping.")
308+
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
309+
} else {
310+
logInfo("All blocks not yet migrated.")
311+
}
308312
} else {
309-
logInfo("All blocks not yet migrated.")
313+
logInfo("No running tasks, no block migration configured, stopping.")
314+
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
310315
}
316+
Thread.sleep(sleep_time)
311317
} else {
312-
logInfo("No running tasks, no block migration configured, stopping.")
313-
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
318+
logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks")
319+
// If there is a running task it could store blocks, so make sure we wait for a
320+
// migration loop to complete after the last task is done.
321+
Thread.sleep(sleep_time)
322+
lastTaskRunningTime = System.nanoTime()
314323
}
315-
Thread.sleep(sleep_time)
316-
} else {
317-
logInfo("Blocked from shutdown by running ${executor.numRunningtasks} tasks")
318-
// If there is a running task it could store blocks, so make sure we wait for a
319-
// migration loop to complete after the last task is done.
320-
Thread.sleep(sleep_time)
321-
lastTaskRunningTime = System.nanoTime()
322324
}
323-
}
324-
}.setDaemon(true)
325-
logInfo("Will exit when finished decommissioning")
326-
// Return true since we are handling a signal
327-
true
325+
}.setDaemon(true)
326+
logInfo("Will exit when finished decommissioning")
327+
// Return true since we are handling a signal
328+
true
328329
} catch {
329330
case e: Exception =>
330331
logError("Unexpected error while decommissioning self", e)

0 commit comments

Comments
 (0)