Closed
47 changes: 28 additions & 19 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -473,29 +473,38 @@ private[spark] class Executor(
           // the default uncaught exception handler, which will terminate the Executor.
           logError(s"Exception in $taskName (TID $taskId)", t)

-          // Collect latest accumulator values to report back to the driver
-          val accums: Seq[AccumulatorV2[_, _]] =
-            if (task != null) {
-              task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
-              task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
-              task.collectAccumulatorUpdates(taskFailed = true)
-            } else {
-              Seq.empty
-            }
+          // SPARK-20904: Do not report failure to driver if it happened during shutdown. Because
+          // libraries may set up shutdown hooks that race with running tasks during shutdown,
+          // spurious failures may occur and can result in improper accounting in the driver (e.g.
+          // the task failure would not be ignored if the shutdown happened because of preemption,
+          // instead of an app issue).
+          if (!ShutdownHookManager.inShutdown()) {
Contributor

If the shut down is caused by an app issue, do we want to report the task failure to the driver?

Contributor Author

The task will still fail in that case, just with a different error ("Executor lost").

Because the executor shutdown in that case won't be caused by the cluster manager (e.g. preemption), the task failure will still count. So aside from a different error message, everything else behaves the same in that case.

At this point I don't think we have any information on why we're in shutdown, whether it is an app issue, the Spark executor process being killed from the command line, etc.

Yes, a log message would be nice. Maybe, in the else clause of this if, something like logInfo(s"Not reporting failure as we are in the middle of a shutdown").

Contributor Author

Sure, I can add a log, but it's not guaranteed to be printed. During shutdown the JVM can die at any moment (only shutdown hooks run to completion, and this is not one of them)...

Yeah, it isn't guaranteed. I'm thinking that if this happens often enough maybe one executor will print the message, giving a clue to the user. Also it's a de-facto code comment. Yes, any daemon thread will terminate at any time at shutdown - even finishing this block isn't guaranteed. Thanks!
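
The guard discussed in this thread can be sketched in isolation. The block below is a minimal, hypothetical illustration and not the Spark implementation: `ShutdownGuardSketch`, `reportFailure`, and the `send`/`log` callbacks are names invented for this example. It assumes only the documented JVM behavior that `Runtime.addShutdownHook` throws `IllegalStateException` once shutdown has begun, and uses that as a probe.

```scala
object ShutdownGuardSketch {

  /** Probe whether the JVM has started shutting down.
    * Registering a hook after shutdown has begun throws IllegalStateException.
    */
  def inShutdown(): Boolean =
    try {
      val probe = new Thread { override def run(): Unit = () }
      Runtime.getRuntime.addShutdownHook(probe)
      // Remove the probe again so it never actually runs.
      Runtime.getRuntime.removeShutdownHook(probe)
      false
    } catch {
      case _: IllegalStateException => true
    }

  /** Report a task failure unless we are shutting down.
    * `send` and `log` are hypothetical stand-ins for the status update and logger.
    * Returns true if the failure was reported.
    */
  def reportFailure(
      shuttingDown: Boolean,
      taskId: Long,
      send: Long => Unit,
      log: String => Unit): Boolean =
    if (!shuttingDown) {
      send(taskId)
      true
    } else {
      // Best effort only: the JVM may exit before this line is written.
      log("Not reporting error to driver during JVM shutdown.")
      false
    }
}
```

In normal operation `ShutdownGuardSketch.inShutdown()` returns false, so `reportFailure` forwards the failure. The probe cannot tell why the JVM is shutting down, which matches the point made above that the cause is unknown at this stage.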

+            // Collect latest accumulator values to report back to the driver
+            val accums: Seq[AccumulatorV2[_, _]] =
+              if (task != null) {
+                task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
+                task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
+                task.collectAccumulatorUpdates(taskFailed = true)
+              } else {
+                Seq.empty
+              }

-          val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+            val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))

-          val serializedTaskEndReason = {
-            try {
-              ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
-            } catch {
-              case _: NotSerializableException =>
-                // t is not serializable so just send the stacktrace
-                ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
+            val serializedTaskEndReason = {
+              try {
+                ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
+              } catch {
+                case _: NotSerializableException =>
+                  // t is not serializable so just send the stacktrace
+                  ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
+              }
             }
+            setTaskFinishedAndClearInterruptStatus()
+            execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
+          } else {
+            logInfo("Not reporting error to driver during JVM shutdown.")
           }
-          setTaskFinishedAndClearInterruptStatus()
-          execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)

           // Don't forcibly exit unless the exception was inherently fatal, to avoid
           // stopping other tasks unnecessarily.