From 0625dc3187c9c8fa8d507adc0da75747c30f0ebc Mon Sep 17 00:00:00 2001 From: Charles Lewis Date: Wed, 22 Mar 2017 13:33:55 -0700 Subject: [PATCH 1/3] report metrics for killed tasks --- .../org/apache/spark/TaskEndReason.scala | 14 ++++++-- .../org/apache/spark/executor/Executor.scala | 33 +++++++++++++++++-- .../apache/spark/scheduler/DAGScheduler.scala | 10 ++++-- .../spark/scheduler/TaskSetManager.scala | 8 ++++- .../org/apache/spark/util/JsonProtocol.scala | 7 +++- 5 files changed, 62 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index a76283e33fa65..1d14fa1832614 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -212,9 +212,19 @@ case object TaskResultLost extends TaskFailedReason { * Task was killed intentionally and needs to be rescheduled. */ @DeveloperApi -case class TaskKilled(reason: String) extends TaskFailedReason { - override def toErrorString: String = s"TaskKilled ($reason)" +case class TaskKilled( + reason: String + accumUpdates: Seq[AccumulableInfo] = Seq.empty, + private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil) + extends TaskFailedReason { + + override def toErrorString: String = "TaskKilled ($reason)" override def countTowardsTaskFailures: Boolean = false + + private[spark] def withAccums(accums: Seq[AccumulatorV2[_, _]]): TaskKilled = { + this.accums = accums + this + } } /** diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 99b1608010ddb..46f46c683e89b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -429,15 +429,42 @@ private[spark] class Executor( case t: TaskKilledException => logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}") + + // Collect latest accumulator values to report back to the driver + val accums: Seq[AccumulatorV2[_, _]] = + if (task != null) { + task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart) + task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) + task.collectAccumulatorUpdates(taskFailed = true) + } else { + Seq.empty + } + val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None)) + setTaskFinishedAndClearInterruptStatus() - execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason))) + + val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates).withAccums(accums)) + execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK) case _: InterruptedException if task.reasonIfKilled.isDefined => val killReason = task.reasonIfKilled.getOrElse("unknown reason") logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason") + + // Collect latest accumulator values to report back to the driver + val accums: Seq[AccumulatorV2[_, _]] = + if (task != null) { + task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart) + task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) + task.collectAccumulatorUpdates(taskFailed = true) + } else { + Seq.empty + } + val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None)) + setTaskFinishedAndClearInterruptStatus() - execBackend.statusUpdate( - taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason))) + + val serializedTK = ser.serialize(TaskKilled(killReason, 
accUpdates).withAccums(accums)) + execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK) case CausedBy(cDE: CommitDeniedException) => val reason = cDE.toTaskFailedReason diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 09717316833a7..c951a31e9a5b8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1355,14 +1355,18 @@ class DAGScheduler( case commitDenied: TaskCommitDenied => // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits - case exceptionFailure: ExceptionFailure => - // Tasks failed with exceptions might still have accumulator updates. + case _: ExceptionFailure => + // Tasks killed or failed with exceptions might still have accumulator updates. + updateAccumulators(event) + + case _: TaskKilled => + // Tasks killed or failed with exceptions might still have accumulator updates. updateAccumulators(event) case TaskResultLost => // Do nothing here; the TaskScheduler handles these failures and resubmits the task. - case _: ExecutorLostFailure | _: TaskKilled | UnknownReason => + case _: ExecutorLostFailure | UnknownReason => // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler // will abort the job. } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a177aab5f95de..7331b0f0132b1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -812,13 +812,19 @@ private[spark] class TaskSetManager( } ef.exception + case tk: TaskKilled => + // TaskKilled might have accumulator updates + accumUpdates = tk.accums + logWarning(failureReason) + None + case e: ExecutorLostFailure if !e.exitCausedByApp => logInfo(s"Task $tid failed because while it was being computed, its executor " + "exited for a reason unrelated to the task. 
Not counting this failure towards the " + "maximum number of failures for the task.") None - case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others + case e: TaskFailedReason => // TaskResultLost and others logWarning(failureReason) None } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 2cb88919c8c83..2f3269a6f9ab8 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -391,7 +391,9 @@ private[spark] object JsonProtocol { ("Exit Caused By App" -> exitCausedByApp) ~ ("Loss Reason" -> reason.map(_.toString)) case taskKilled: TaskKilled => + val accumUpdates = JArray(taskKilled.accumUpdates.map(accumulableInfoToJson).toList) ("Kill Reason" -> taskKilled.reason) + ("Accumulator Updates" -> accumUpdates) case _ => Utils.emptyJson } ("Reason" -> reason) ~ json @@ -882,7 +884,10 @@ private[spark] object JsonProtocol { case `taskKilled` => val killReason = Utils.jsonOption(json \ "Kill Reason") .map(_.extract[String]).getOrElse("unknown reason") - TaskKilled(killReason) + val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates") + .map(_.extract[List[JValue]].map(accumulableInfoFromJson)) + .getOrElse(Seq[AccumulableInfo]()) + TaskKilled(killReason, accumUpdates) case `taskCommitDenied` => // Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON // de/serialization logic was not added until 1.5.1. To provide backward compatibility From ee883b2f3da10a4e4a48f4a98910ccadceac461c Mon Sep 17 00:00:00 2001 From: Charles Lewis Date: Fri, 24 Mar 2017 12:06:46 -0700 Subject: [PATCH 2/3] add task killed to exception accum test --- .../spark/scheduler/DAGSchedulerSuite.scala | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index a10941b579fe2..1bd6f1b959092 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1760,7 +1760,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } - test("accumulators are updated on exception failures") { + test("accumulators are updated on exception failures and task killed") { val acc1 = AccumulatorSuite.createLongAccum("ingenieur") val acc2 = AccumulatorSuite.createLongAccum("boulanger") val acc3 = AccumulatorSuite.createLongAccum("agriculteur") @@ -1776,15 +1776,25 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou val accUpdate3 = new LongAccumulator accUpdate3.metadata = acc3.metadata accUpdate3.setValue(18) - val accumUpdates = Seq(accUpdate1, accUpdate2, accUpdate3) - val accumInfo = accumUpdates.map(AccumulatorSuite.makeInfo) + + val accumUpdates1 = Seq(accUpdate1, accUpdate2) + val accumInfo1 = accumUpdates1.map(AccumulatorSuite.makeInfo) val exceptionFailure = new ExceptionFailure( new SparkException("fondue?"), - accumInfo).copy(accums = accumUpdates) + accumInfo1).copy(accums = accumUpdates1) submit(new MyRDD(sc, 1, Nil), Array(0)) runEvent(makeCompletionEvent(taskSets.head.tasks.head, exceptionFailure, "result")) + assert(AccumulatorContext.get(acc1.id).get.value === 15L) assert(AccumulatorContext.get(acc2.id).get.value === 13L) + + val accumUpdates2 = 
Seq(accUpdate3) + val accumInfo2 = accumUpdates2.map(AccumulatorSuite.makeInfo) + + val taskKilled = new TaskKilled( + accumInfo2).copy(accums = accumUpdates2) + runEvent(makeCompletionEvent(taskSets.head.tasks.head, taskKilled, "result")) + assert(AccumulatorContext.get(acc3.id).get.value === 18L) } @@ -2323,6 +2333,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou val accumUpdates = reason match { case Success => task.metrics.accumulators() case ef: ExceptionFailure => ef.accums + case tk: TaskKilled => tk.accums case _ => Seq.empty } CompletionEvent(task, reason, result, accumUpdates ++ extraAccumUpdates, taskInfo) From 25ffbf49b2779d4fa795d754ee20fbe3542dd57d Mon Sep 17 00:00:00 2001 From: Charles Lewis Date: Fri, 24 Mar 2017 16:20:59 -0700 Subject: [PATCH 3/3] extra fixes for task killed reason merge --- core/src/main/scala/org/apache/spark/TaskEndReason.scala | 2 +- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 2 +- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 1d14fa1832614..8e1327e01ada9 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -213,7 +213,7 @@ case object TaskResultLost extends TaskFailedReason { */ @DeveloperApi case class TaskKilled( - reason: String + reason: String, accumUpdates: Seq[AccumulableInfo] = Seq.empty, private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil) extends TaskFailedReason { diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 2f3269a6f9ab8..3bf77a33adc66 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -392,7 +392,7 @@ private[spark] object JsonProtocol { ("Loss Reason" -> reason.map(_.toString)) case taskKilled: TaskKilled => val accumUpdates = JArray(taskKilled.accumUpdates.map(accumulableInfoToJson).toList) - ("Kill Reason" -> taskKilled.reason) + ("Kill Reason" -> taskKilled.reason) ~ ("Accumulator Updates" -> accumUpdates) case _ => Utils.emptyJson } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 1bd6f1b959092..3b3f2d5761786 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1792,6 +1792,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou val accumInfo2 = accumUpdates2.map(AccumulatorSuite.makeInfo) val taskKilled = new TaskKilled( + "test", accumInfo2).copy(accums = accumUpdates2) runEvent(makeCompletionEvent(taskSets.head.tasks.head, taskKilled, "result"))
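
Usage sketch (illustrative, not part of the patches above): once TaskKilled carries accumUpdates, driver-side code can observe the final metrics of killed tasks through the normal listener path, and event logs gain the "Accumulator Updates" array written by JsonProtocol. The listener below is a minimal sketch assuming a Spark build that includes these patches; the class name, the println logging, and the commented-out registration call are illustrative choices, while SparkListener.onTaskEnd, SparkListenerTaskEnd.reason, and TaskKilled.accumUpdates come from the code above.

import org.apache.spark.TaskKilled
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative listener (not part of this patch series): prints the accumulator
// updates that killed tasks now attach to their TaskKilled end reason.
class KilledTaskMetricsListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    taskEnd.reason match {
      case tk: TaskKilled =>
        // accumUpdates is the field added in PATCH 1/3; it carries the task's
        // final metrics (executor run time, GC time, user accumulators, ...).
        tk.accumUpdates.foreach { info =>
          println(s"task killed (${tk.reason}): " +
            s"${info.name.getOrElse("unnamed")} update=${info.update.getOrElse("n/a")}")
        }
      case _ => // other task end reasons are unaffected by this change
    }
  }
}

// Registration from driver code, where sc is an existing SparkContext:
// sc.addSparkListener(new KilledTaskMetricsListener)

This mirrors the existing ExceptionFailure pattern (accumUpdates for the listener/JSON path, the private accums plus withAccums for the in-process scheduler path), which is what lets TaskSetManager and DAGScheduler reuse the same updateAccumulators flow for killed tasks.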