Commit cecd285

Marcelo Vanzin authored and cloud-fan committed
[SPARK-20904][CORE] Don't report task failures to driver during shutdown.
Executors run a thread pool with daemon threads to run tasks. This means that those threads remain active when the JVM is shutting down, meaning those tasks are affected by code that runs in shutdown hooks.

So if a shutdown hook messes with something that the task is using (e.g. an HDFS connection), the task will fail and will report that failure to the driver. That will make the driver mark the task as failed regardless of what caused the executor to shut down. So, for example, if YARN pre-empted that executor, the driver would consider that task failed when it should instead ignore the failure.

This change avoids reporting failures to the driver when shutdown hooks are executing; this fixes the YARN preemption accounting, and doesn't really change things much for other scenarios, other than reporting a more generic error ("Executor lost") when the executor shuts down unexpectedly - which is arguably more correct.

Tested with a hacky app running on spark-shell that tried to cause failures only when shutdown hooks were running, verified that preemption didn't cause the app to fail because of task failures exceeding the threshold.

Author: Marcelo Vanzin <[email protected]>

Closes #18594 from vanzin/SPARK-20904.
1 parent ccaee5b commit cecd285

1 file changed: +28 -19 lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 28 additions & 19 deletions
@@ -473,29 +473,38 @@ private[spark] class Executor(
           // the default uncaught exception handler, which will terminate the Executor.
           logError(s"Exception in $taskName (TID $taskId)", t)
 
-          // Collect latest accumulator values to report back to the driver
-          val accums: Seq[AccumulatorV2[_, _]] =
-            if (task != null) {
-              task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
-              task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
-              task.collectAccumulatorUpdates(taskFailed = true)
-            } else {
-              Seq.empty
-            }
+          // SPARK-20904: Do not report failure to driver if if happened during shut down. Because
+          // libraries may set up shutdown hooks that race with running tasks during shutdown,
+          // spurious failures may occur and can result in improper accounting in the driver (e.g.
+          // the task failure would not be ignored if the shutdown happened because of premption,
+          // instead of an app issue).
+          if (!ShutdownHookManager.inShutdown()) {
+            // Collect latest accumulator values to report back to the driver
+            val accums: Seq[AccumulatorV2[_, _]] =
+              if (task != null) {
+                task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
+                task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
+                task.collectAccumulatorUpdates(taskFailed = true)
+              } else {
+                Seq.empty
+              }
 
-          val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+            val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
 
-          val serializedTaskEndReason = {
-            try {
-              ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
-            } catch {
-              case _: NotSerializableException =>
-                // t is not serializable so just send the stacktrace
-                ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
+            val serializedTaskEndReason = {
+              try {
+                ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
+              } catch {
+                case _: NotSerializableException =>
+                  // t is not serializable so just send the stacktrace
+                  ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
+              }
             }
+            setTaskFinishedAndClearInterruptStatus()
+            execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
+          } else {
+            logInfo("Not reporting error to driver during JVM shutdown.")
           }
-          setTaskFinishedAndClearInterruptStatus()
-          execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
 
           // Don't forcibly exit unless the exception was inherently fatal, to avoid
           // stopping other tasks unnecessarily.
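
For readers who want to try the guard pattern from this change outside of Spark, the following is a minimal standalone sketch, not the code in Executor.scala. The names `ShutdownGuardSketch`, `jvmInShutdown`, `handleTaskFailure` and the `report` callback are hypothetical stand-ins introduced for illustration. The shutdown probe relies only on the documented JDK behavior that `Runtime.addShutdownHook` throws `IllegalStateException` once the JVM has started shutting down; it is an assumption of this sketch that such a probe is an acceptable substitute for `ShutdownHookManager.inShutdown()`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical illustration of the "skip reporting during shutdown" guard.
object ShutdownGuardSketch {

  // Returns true once JVM shutdown has begun. Registering a hook at that point
  // throws IllegalStateException, as documented for java.lang.Runtime.
  def jvmInShutdown(): Boolean = {
    try {
      val probe = new Thread(new Runnable { def run(): Unit = () })
      Runtime.getRuntime.addShutdownHook(probe)
      Runtime.getRuntime.removeShutdownHook(probe)
      false
    } catch {
      case _: IllegalStateException => true
    }
  }

  // Reports a task failure through `report` unless shutdown is in progress.
  // Returns true if the failure was reported, false if it was suppressed.
  // `inShutdown` is injectable so the suppressed branch can be exercised
  // without actually terminating the JVM.
  def handleTaskFailure(
      taskId: Long,
      error: Throwable,
      report: String => Unit,
      inShutdown: () => Boolean = () => jvmInShutdown()): Boolean = {
    if (!inShutdown()) {
      report(s"task $taskId FAILED: ${error.getMessage}")
      true
    } else {
      // Mirrors the intent of the change: stay silent and let the driver
      // account for the lost executor instead of a failed task.
      false
    }
  }

  def main(args: Array[String]): Unit = {
    val sent = ArrayBuffer.empty[String]
    val record: String => Unit = msg => { sent += msg; () }
    // Normal operation: the failure is reported.
    handleTaskFailure(1L, new RuntimeException("boom"), record)
    // Simulated shutdown: the failure is suppressed.
    handleTaskFailure(2L, new RuntimeException("late"), record, () => true)
    sent.foreach(println)
  }
}
```

Passing the shutdown check as a function keeps the sketch testable; in the actual change the check is a direct call to `ShutdownHookManager.inShutdown()`.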
