From 25e455e711b978cd331ee0f484f70fde31307634 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 22 Nov 2016 18:08:07 -0800 Subject: [PATCH 01/19] Add failing regression test. --- .../scheduler/TaskSchedulerImplSuite.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index a09a602d1368d..405e0d37ca420 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -274,4 +274,30 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L assert("executor1" === taskDescriptions3(0).executorId) } + test("if an executor is lost then state for tasks running on that executor is cleaned up") { + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) + taskScheduler.initialize(new FakeSchedulerBackend) + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. + new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} + override def executorAdded(execId: String, host: String) {} + } + + val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1)) + val attempt1 = FakeTask.createTaskSet(1) + + // submit attempt 1, offer resources, task gets scheduled + taskScheduler.submitTasks(attempt1) + val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten + assert(1 === taskDescriptions.length) + + // mark executor0 as dead + taskScheduler.executorLost("executor0", SlaveLost()) + + // Check that state associated with the lost task attempt is cleaned up: + assert(taskScheduler.taskIdToExecutorId.isEmpty) + assert(taskScheduler.taskIdToTaskSetManager.isEmpty) + assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty) + } } From 69feae3591adc9fe88aff8c190d0d95f14cb0ced Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 22 Nov 2016 19:06:02 -0800 Subject: [PATCH 02/19] Initial fix. 
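The heart of the fix: track the IDs of the tasks running on each executor instead of a bare count, so that losing an executor lets us walk its task IDs and purge the task-keyed bookkeeping maps as well. A minimal self-contained sketch of the idea (the map names mirror this patch, but the surrounding object is illustrative, not Spark's TaskSchedulerImpl):

    import scala.collection.mutable.{HashMap, HashSet}

    object RunningTaskBookkeeping {
      // With only a count per executor we could not tell *which* task-keyed
      // entries belonged to a dead executor; a set of task IDs makes it possible.
      private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
      private val taskIdToExecutorId = new HashMap[Long, String]

      def taskStarted(execId: String, tid: Long): Unit = {
        executorIdToRunningTaskIds.getOrElseUpdate(execId, HashSet[Long]()).add(tid)
        taskIdToExecutorId(tid) = execId
      }

      // On executor loss, the recorded task IDs let us clean the task-keyed map too.
      def executorLost(execId: String): Unit = {
        executorIdToRunningTaskIds.remove(execId).foreach { taskIds =>
          taskIds.foreach(taskIdToExecutorId.remove)
        }
      }

      def main(args: Array[String]): Unit = {
        taskStarted("executor0", 0L)
        executorLost("executor0")
        assert(taskIdToExecutorId.isEmpty) // no leaked per-task state
      }
    }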
--- .../spark/scheduler/TaskSchedulerImpl.scala | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d22321b88fb8e..88b94d90510ed 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -89,9 +89,11 @@ private[spark] class TaskSchedulerImpl( val nextTaskId = new AtomicLong(0) // Number of tasks running on each executor - private val executorIdToTaskCount = new HashMap[String, Int] + private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]] - def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap + def runningTasksByExecutors(): Map[String, Int] = synchronized { + executorIdToRunningTaskIds.toMap.mapValues(_.size) + } // The set of executors we have on each host; this is used to compute hostsAlive, which // in turn is used to decide when we can attain data locality on a given host @@ -259,7 +261,7 @@ private[spark] class TaskSchedulerImpl( val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId - executorIdToTaskCount(execId) += 1 + executorIdToRunningTaskIds(execId).add(tid) executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) @@ -288,7 +290,7 @@ private[spark] class TaskSchedulerImpl( var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host - executorIdToTaskCount.getOrElseUpdate(o.executorId, 0) + executorIdToRunningTaskIds.getOrElseUpdate(o.executorId, HashSet[Long]()) if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() executorAdded(o.executorId, o.host) @@ -339,7 +341,7 @@ private[spark] class TaskSchedulerImpl( // We lost this entire executor, so remember that it's gone val execId = taskIdToExecutorId(tid) - if (executorIdToTaskCount.contains(execId)) { + if (executorIdToRunningTaskIds.contains(execId)) { reason = Some( SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) removeExecutor(execId, reason.get) @@ -351,9 +353,7 @@ private[spark] class TaskSchedulerImpl( if (TaskState.isFinished(state)) { taskIdToTaskSetManager.remove(tid) taskIdToExecutorId.remove(tid).foreach { execId => - if (executorIdToTaskCount.contains(execId)) { - executorIdToTaskCount(execId) -= 1 - } + executorIdToRunningTaskIds.remove(execId) } } if (state == TaskState.FINISHED) { @@ -477,7 +477,7 @@ private[spark] class TaskSchedulerImpl( var failedExecutor: Option[String] = None synchronized { - if (executorIdToTaskCount.contains(executorId)) { + if (executorIdToRunningTaskIds.contains(executorId)) { val hostPort = executorIdToHost(executorId) logExecutorLoss(executorId, hostPort, reason) removeExecutor(executorId, reason) @@ -525,7 +525,12 @@ private[spark] class TaskSchedulerImpl( * of any running tasks, since the loss reason defines whether we'll fail those tasks. 
*/ private def removeExecutor(executorId: String, reason: ExecutorLossReason) { - executorIdToTaskCount -= executorId + executorIdToRunningTaskIds.remove(executorId).foreach { taskIds => + taskIds.foreach { tid => + taskIdToExecutorId.remove(tid) + taskIdToTaskSetManager.remove(tid) + } + } val host = executorIdToHost(executorId) val execs = executorsByHost.getOrElse(host, new HashSet) @@ -563,11 +568,11 @@ private[spark] class TaskSchedulerImpl( } def isExecutorAlive(execId: String): Boolean = synchronized { - executorIdToTaskCount.contains(execId) + executorIdToRunningTaskIds.contains(execId) } def isExecutorBusy(execId: String): Boolean = synchronized { - executorIdToTaskCount.getOrElse(execId, -1) > 0 + executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty) } // By default, rack is unknown From e99cc8ffad9c47976d5743502852cc66f59452d3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 22 Nov 2016 21:31:42 -0800 Subject: [PATCH 03/19] Update reflection in StandaloneDynamicAllocationSuite test. --- .../spark/deploy/StandaloneDynamicAllocationSuite.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 814027076d6fe..3027ff99ad370 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -433,10 +433,11 @@ class StandaloneDynamicAllocationSuite assert(executors.size === 2) // simulate running a task on the executor - val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount) + val getMap = + PrivateMethod[mutable.HashMap[String, mutable.HashSet[Long]]]('executorIdToRunningTaskIds) val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl] - val executorIdToTaskCount = taskScheduler invokePrivate getMap() - executorIdToTaskCount(executors.head) = 1 + val executorIdToRunningTaskIds = taskScheduler invokePrivate getMap() + executorIdToRunningTaskIds(executors.head) = mutable.HashSet(1L) // kill the busy executor without force; this should fail assert(!killExecutor(sc, executors.head, force = false)) apps = getApplications() From 233cfa6d320100ee5759d42a13fdbf9c30cf620e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 23 Nov 2016 00:29:08 -0800 Subject: [PATCH 04/19] Fix bug in statusUpdate() (which is under-tested). 
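The bug: the initial fix removed the executor's entire entry from executorIdToRunningTaskIds whenever any single task finished, so one completed task made a still-busy executor look idle and forgot its other running tasks. The fix removes only the finished task ID from the executor's set. A small sketch of the difference (simplified to bare collections, not the Spark code itself):

    import scala.collection.mutable.{HashMap, HashSet}

    object StatusUpdateCleanup {
      def main(args: Array[String]): Unit = {
        // executor0 is running tasks 1 and 2; task 1 finishes.
        val buggy = HashMap("executor0" -> HashSet(1L, 2L))
        buggy.remove("executor0")              // old code: drops the whole entry
        assert(buggy.get("executor0").isEmpty) // task 2 is forgotten

        val fixed = HashMap("executor0" -> HashSet(1L, 2L))
        if (fixed.contains("executor0")) {
          fixed("executor0").remove(1L)        // new code: drop just the finished tid
        }
        assert(fixed("executor0") == HashSet(2L)) // task 2 still tracked
      }
    }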
--- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 88b94d90510ed..57681812c7066 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -353,7 +353,9 @@ private[spark] class TaskSchedulerImpl( if (TaskState.isFinished(state)) { taskIdToTaskSetManager.remove(tid) taskIdToExecutorId.remove(tid).foreach { execId => - executorIdToRunningTaskIds.remove(execId) + if (executorIdToRunningTaskIds.contains(execId)) { + executorIdToRunningTaskIds(execId).remove(tid) + } } } if (state == TaskState.FINISHED) { From e0308fe62e2c9e7cbc8bbd313da08bb4e861b755 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 23 Nov 2016 00:40:53 -0800 Subject: [PATCH 05/19] Update to avoid changing Mesos fine-grained mode behavior. --- .../spark/scheduler/TaskSchedulerImpl.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 57681812c7066..9cc64bbe1529f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -337,18 +337,18 @@ private[spark] class TaskSchedulerImpl( var reason: Option[ExecutorLossReason] = None synchronized { try { - if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) { - // We lost this entire executor, so remember that it's gone - val execId = taskIdToExecutorId(tid) - - if (executorIdToRunningTaskIds.contains(execId)) { + taskIdToTaskSetManager.get(tid) match { + case Some(taskSet) if state == TaskState.LOST => + // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode, + // where each executor corresponds to a single task, so mark the executor as failed. + val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException( + "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)")) reason = Some( SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) removeExecutor(execId, reason.get) failedExecutor = Some(execId) - } - } - taskIdToTaskSetManager.get(tid) match { + taskSet.removeRunningTask(tid) + taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) case Some(taskSet) => if (TaskState.isFinished(state)) { taskIdToTaskSetManager.remove(tid) @@ -361,7 +361,7 @@ private[spark] class TaskSchedulerImpl( if (state == TaskState.FINISHED) { taskSet.removeRunningTask(tid) taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) - } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { + } else if (Set(TaskState.FAILED, TaskState.KILLED).contains(state)) { taskSet.removeRunningTask(tid) taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) } From e26b7b7bd2457d4ff9c242242a2c8c538f89b876 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 23 Nov 2016 00:49:09 -0800 Subject: [PATCH 06/19] Update outdated comment. 
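Background on the TaskState.LOST special case introduced in the previous patch (which the comment updated below sits alongside): LOST is only sent by the deprecated Mesos fine-grained mode, where each executor runs exactly one task, so a lost task implies a lost executor; other terminal states flow through the ordinary cleanup path. A condensed sketch of that dispatch (TaskState is stubbed here with just the states this sketch needs; Spark's real enum has more values):

    object LostStateDispatch {
      // Stub with just the states this sketch needs; Spark's TaskState has more.
      object TaskState extends Enumeration {
        val RUNNING, FINISHED, FAILED, KILLED, LOST = Value
        def isFinished(state: Value): Boolean =
          Set(FINISHED, FAILED, KILLED, LOST).contains(state)
      }

      def describe(state: TaskState.Value): String = {
        if (state == TaskState.LOST) {
          // Fine-grained Mesos only: one task per executor, so fail the executor too.
          "mark executor as lost, then handle the task failure"
        } else if (TaskState.isFinished(state)) {
          "clean up per-task state and enqueue the result"
        } else {
          "task still running; nothing to clean up"
        }
      }

      def main(args: Array[String]): Unit = {
        assert(describe(TaskState.LOST).startsWith("mark executor"))
        assert(describe(TaskState.RUNNING).endsWith("nothing to clean up"))
      }
    }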
--- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 9cc64bbe1529f..ca9323dc8e602 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -88,7 +88,7 @@ private[spark] class TaskSchedulerImpl( // Incrementing task IDs val nextTaskId = new AtomicLong(0) - // Number of tasks running on each executor + // IDs of the tasks running on each executor private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]] def runningTasksByExecutors(): Map[String, Int] = synchronized { From cd04c1a45f0198d2674a781dfa9ff15329732917 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 23 Nov 2016 00:59:39 -0800 Subject: [PATCH 07/19] Add debug message to log record of state cleanup. --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index ca9323dc8e602..b2ef67ebb9206 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -528,6 +528,8 @@ private[spark] class TaskSchedulerImpl( */ private def removeExecutor(executorId: String, reason: ExecutorLossReason) { executorIdToRunningTaskIds.remove(executorId).foreach { taskIds => + logDebug("Cleaning up TaskScheduler state for tasks " + + s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId") taskIds.foreach { tid => taskIdToExecutorId.remove(tid) taskIdToTaskSetManager.remove(tid) From 8c5e9277b5a9fea54053b5142b4f4f9ec03829dc Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 23 Nov 2016 16:17:35 -0800 Subject: [PATCH 08/19] Revert synchronized change so it can be added in a separate followup PR. --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index b2ef67ebb9206..fd79c1a894893 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -91,9 +91,7 @@ private[spark] class TaskSchedulerImpl( // IDs of the tasks running on each executor private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]] - def runningTasksByExecutors(): Map[String, Int] = synchronized { - executorIdToRunningTaskIds.toMap.mapValues(_.size) - } + def runningTasksByExecutors: Map[String, Int] = executorIdToRunningTaskIds.toMap.mapValues(_.size) // The set of executors we have on each host; this is used to compute hostsAlive, which // in turn is used to decide when we can attain data locality on a given host From af3d50eb7806c78d0ec4791833feed6b9c6cabc8 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 23 Nov 2016 16:22:13 -0800 Subject: [PATCH 09/19] Split synchronized fix into separate PR. 
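A Scala-collections subtlety in the runningTasksByExecutors() body kept by this revert (an observation, not something this series changes): mapValues returns a lazy view in Scala 2.11/2.12, and toMap copies only the key/value-reference pairs, so the result is stable against removed keys but still reflects later mutation of the shared HashSet values. A sketch:

    import scala.collection.mutable

    object MapValuesLaziness {
      def main(args: Array[String]): Unit = {
        val live = mutable.HashMap("executor0" -> mutable.HashSet(1L))

        // toMap snapshots the key -> set-reference pairs; mapValues(_.size)
        // is evaluated lazily against that snapshot on each lookup.
        val counts = live.toMap.mapValues(_.size)

        live("executor0").add(2L)         // the sets are shared references...
        assert(counts("executor0") == 2)  // ...so a later add is visible

        live.remove("executor0")          // but the snapshot's keys are fixed...
        assert(counts("executor0") == 2)  // ...so removing the live key changes nothing
      }
    }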
--- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index fd79c1a894893..2e2286ba9596d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -91,7 +91,9 @@ private[spark] class TaskSchedulerImpl( // IDs of the tasks running on each executor private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]] - def runningTasksByExecutors: Map[String, Int] = executorIdToRunningTaskIds.toMap.mapValues(_.size) + def runningTasksByExecutors(): Map[String, Int] = { + executorIdToRunningTaskIds.toMap.mapValues(_.size) + } // The set of executors we have on each host; this is used to compute hostsAlive, which // in turn is used to decide when we can attain data locality on a given host From 0177d22adba320a40484a8436904ed79f4d47fc2 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 23 Nov 2016 16:26:42 -0800 Subject: [PATCH 10/19] Combine both Some(taskSet) cases. Suggested at https://github.com/apache/spark/pull/15986/files#r89411506 --- .../spark/scheduler/TaskSchedulerImpl.scala | 48 ++++++++++--------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 2e2286ba9596d..1835ff0b64bf6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -338,32 +338,34 @@ private[spark] class TaskSchedulerImpl( synchronized { try { taskIdToTaskSetManager.get(tid) match { - case Some(taskSet) if state == TaskState.LOST => - // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode, - // where each executor corresponds to a single task, so mark the executor as failed. - val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException( - "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)")) - reason = Some( - SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) - removeExecutor(execId, reason.get) - failedExecutor = Some(execId) - taskSet.removeRunningTask(tid) - taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) case Some(taskSet) => - if (TaskState.isFinished(state)) { - taskIdToTaskSetManager.remove(tid) - taskIdToExecutorId.remove(tid).foreach { execId => - if (executorIdToRunningTaskIds.contains(execId)) { - executorIdToRunningTaskIds(execId).remove(tid) - } - } - } - if (state == TaskState.FINISHED) { - taskSet.removeRunningTask(tid) - taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) - } else if (Set(TaskState.FAILED, TaskState.KILLED).contains(state)) { + if (state == TaskState.LOST) { + // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode, + // where each executor corresponds to a single task, so mark the executor as failed. 
+ val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException( + "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)")) + reason = Some( + SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) + removeExecutor(execId, reason.get) + failedExecutor = Some(execId) taskSet.removeRunningTask(tid) taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) + } else { + if (TaskState.isFinished(state)) { + taskIdToTaskSetManager.remove(tid) + taskIdToExecutorId.remove(tid).foreach { execId => + if (executorIdToRunningTaskIds.contains(execId)) { + executorIdToRunningTaskIds(execId).remove(tid) + } + } + } + if (state == TaskState.FINISHED) { + taskSet.removeRunningTask(tid) + taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) + } else if (Set(TaskState.FAILED, TaskState.KILLED).contains(state)) { + taskSet.removeRunningTask(tid) + taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) + } } case None => logError( From efba89393849f993aafb733528d333198717340f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 23 Nov 2016 16:30:35 -0800 Subject: [PATCH 11/19] Re-introduce executorIdToRunningTaskIds.contains(execId) check. --- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 1835ff0b64bf6..52bdcdae3a99f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -344,10 +344,12 @@ private[spark] class TaskSchedulerImpl( // where each executor corresponds to a single task, so mark the executor as failed. val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException( "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)")) - reason = Some( - SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) - removeExecutor(execId, reason.get) - failedExecutor = Some(execId) + if (executorIdToRunningTaskIds.contains(execId)) { + reason = Some( + SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) + removeExecutor(execId, reason.get) + failedExecutor = Some(execId) + } taskSet.removeRunningTask(tid) taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) } else { From 49949d21ec9a2617f8442fd65d1f33b09c6f429f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 23 Nov 2016 16:46:48 -0800 Subject: [PATCH 12/19] Refactor shared code into cleanupTaskState() method. 
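The getOrElse(..., throw new IllegalStateException(...)) in the LOST branch documents the invariant this refactoring also relies on: a task ID is in taskIdToTaskSetManager if and only if it is in taskIdToExecutorId, because both entries are written together when a task is scheduled and removed together during cleanup. An illustrative check of that lockstep property (hypothetical stand-ins, not part of the patch):

    import scala.collection.mutable.HashMap

    object TaskMapInvariant {
      // Hypothetical stand-ins for the two task-keyed maps in TaskSchedulerImpl.
      private val taskIdToTaskSetManager = new HashMap[Long, String]
      private val taskIdToExecutorId = new HashMap[Long, String]

      // Entries are added and removed in lockstep, so the key sets must stay equal.
      private def checkInvariant(): Unit = {
        require(taskIdToTaskSetManager.keySet == taskIdToExecutorId.keySet,
          "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)")
      }

      def main(args: Array[String]): Unit = {
        taskIdToTaskSetManager(1L) = "TaskSet 0.0" // both writes happen at scheduling time
        taskIdToExecutorId(1L) = "executor0"
        checkInvariant()

        taskIdToTaskSetManager.remove(1L)          // both removals happen during cleanup
        taskIdToExecutorId.remove(1L)
        checkInvariant()
      }
    }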
--- .../spark/scheduler/TaskSchedulerImpl.scala | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 52bdcdae3a99f..9a163f6e5553a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -354,12 +354,7 @@ private[spark] class TaskSchedulerImpl( taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) } else { if (TaskState.isFinished(state)) { - taskIdToTaskSetManager.remove(tid) - taskIdToExecutorId.remove(tid).foreach { execId => - if (executorIdToRunningTaskIds.contains(execId)) { - executorIdToRunningTaskIds(execId).remove(tid) - } - } + cleanupTaskState(tid) } if (state == TaskState.FINISHED) { taskSet.removeRunningTask(tid) @@ -525,19 +520,30 @@ private[spark] class TaskSchedulerImpl( logError(s"Lost executor $executorId on $hostPort: $reason") } + /** + * Cleans up the TaskScheduler's state for tracking the given task. + */ + private def cleanupTaskState(tid: Long): Unit = { + taskIdToTaskSetManager.remove(tid) + taskIdToExecutorId.remove(tid).foreach { executorId => + executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) } + } + } + /** * Remove an executor from all our data structures and mark it as lost. If the executor's loss * reason is not yet known, do not yet remove its association with its host nor update the status * of any running tasks, since the loss reason defines whether we'll fail those tasks. */ private def removeExecutor(executorId: String, reason: ExecutorLossReason) { + // The tasks on the lost executor may not send any more status updates (because the executor + // has been lost), so they should be cleaned up here. executorIdToRunningTaskIds.remove(executorId).foreach { taskIds => logDebug("Cleaning up TaskScheduler state for tasks " + s"${taskIds.mkString("[", ",", "]")} on failed executor $executorId") - taskIds.foreach { tid => - taskIdToExecutorId.remove(tid) - taskIdToTaskSetManager.remove(tid) - } + // We do not notify the TaskSetManager of the task failures because that will + // happen below in the rootPool.executorLost() call. + taskIds.foreach(cleanupTaskState) } val host = executorIdToHost(executorId) From df4dc2d8dc66cf09f9d3c21db5cf331286851249 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 23 Nov 2016 16:49:23 -0800 Subject: [PATCH 13/19] Update "ignoring update" log message. 
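The added clause covers a path that the executor-loss cleanup makes possible: once removeExecutor() has purged a task's state, a late or duplicate status update for that task finds no entry in taskIdToTaskSetManager and falls into the None branch. A toy sketch of that outcome (a bare map and println standing in for the scheduler and logError):

    import scala.collection.mutable.HashMap

    object LateStatusUpdate {
      def main(args: Array[String]): Unit = {
        val taskIdToTaskSetManager = HashMap[Long, String]()
        // Suppose removeExecutor() already cleaned up tid 0; now a late update arrives.
        taskIdToTaskSetManager.get(0L) match {
          case Some(taskSet) =>
            println(s"deliver status update to $taskSet")
          case None =>
            println("Ignoring update with state FAILED for TID 0 because its task set " +
              "is gone (this is likely the result of receiving duplicate task finished " +
              "status updates) or its executor has been marked as failed.")
        }
      }
    }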
--- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 9a163f6e5553a..df41676b3c7dd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -367,7 +367,8 @@ private[spark] class TaskSchedulerImpl( case None => logError( ("Ignoring update with state %s for TID %s because its task set is gone (this is " + - "likely the result of receiving duplicate task finished status updates)") + "likely the result of receiving duplicate task finished status updates) or its " + + "executor has been marked as failed.") .format(state, tid)) } } catch { From 0d5cdb4132aed9d96deac6969b498ac83de4690b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 23 Nov 2016 16:49:56 -0800 Subject: [PATCH 14/19] Remove spurious 'new' --- .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 405e0d37ca420..396996057ba3f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -284,7 +284,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L override def executorAdded(execId: String, host: String) {} } - val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1)) + val e0Offers = Seq(WorkerOffer("executor0", "host0", 1)) val attempt1 = FakeTask.createTaskSet(1) // submit attempt 1, offer resources, task gets scheduled From 26ca80d22f873c0d37cbb3892fc9c4de196f1d12 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 23 Nov 2016 16:56:42 -0800 Subject: [PATCH 15/19] Add JIRA number to test case. --- .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 396996057ba3f..f693bef1d8a5a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -274,7 +274,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L assert("executor1" === taskDescriptions3(0).executorId) } - test("if an executor is lost then state for tasks running on that executor is cleaned up") { + test("if an executor is lost then the state for its running tasks is cleaned up (SPARK-18553)") { sc = new SparkContext("local", "TaskSchedulerImplSuite") val taskScheduler = new TaskSchedulerImpl(sc) taskScheduler.initialize(new FakeSchedulerBackend) From 4e907c9b73e10639ea7dd602823ad8385705e750 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 27 Nov 2016 19:22:18 -0800 Subject: [PATCH 16/19] Add a test case for the TaskState.LOST case. 
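The new test below repeats the scheduler setup of the SPARK-18553 test verbatim. If more tests of this shape accumulate, the boilerplate could be pulled into a suite-level helper along these lines (a hypothetical refactoring inside TaskSchedulerImplSuite, not part of this patch; every line mirrors the setup the two tests currently duplicate):

    // Hypothetical helper inside TaskSchedulerImplSuite.
    private def setupSchedulerWithFakeBackend(): TaskSchedulerImpl = {
      sc = new SparkContext("local", "TaskSchedulerImplSuite")
      val taskScheduler = new TaskSchedulerImpl(sc)
      taskScheduler.initialize(new FakeSchedulerBackend)
      // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
      new DAGScheduler(sc, taskScheduler) {
        override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
        override def executorAdded(execId: String, host: String) {}
      }
      taskScheduler
    }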
--- .../scheduler/TaskSchedulerImplSuite.scala | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index f693bef1d8a5a..d37fdaf2f8ee7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import java.nio.ByteBuffer + import org.apache.spark._ import org.apache.spark.internal.Logging @@ -294,10 +296,50 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L // mark executor0 as dead taskScheduler.executorLost("executor0", SlaveLost()) + assert(!taskScheduler.isExecutorAlive("executor0")) + assert(!taskScheduler.hasExecutorsAliveOnHost("host0")) + assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty) + // Check that state associated with the lost task attempt is cleaned up: assert(taskScheduler.taskIdToExecutorId.isEmpty) assert(taskScheduler.taskIdToTaskSetManager.isEmpty) assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty) } + + test("if a task finishes with TaskState.LOST then mark its executor as dead") { + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) + taskScheduler.initialize(new FakeSchedulerBackend) + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. + new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} + override def executorAdded(execId: String, host: String) {} + } + + val e0Offers = Seq(WorkerOffer("executor0", "host0", 1)) + val attempt1 = FakeTask.createTaskSet(1) + + // submit attempt 1, offer resources, task gets scheduled + taskScheduler.submitTasks(attempt1) + val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten + assert(1 === taskDescriptions.length) + + // Report the task as failed with TaskState.LOST + taskScheduler.statusUpdate( + tid = taskDescriptions.head.taskId, + state = TaskState.LOST, + serializedData = ByteBuffer.allocate(0) + ) + + // Check that state associated with the lost task attempt is cleaned up: + assert(taskScheduler.taskIdToExecutorId.isEmpty) + assert(taskScheduler.taskIdToTaskSetManager.isEmpty) + assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty) + + // Check that the executor has been marked as dead + assert(!taskScheduler.isExecutorAlive("executor0")) + assert(!taskScheduler.hasExecutorsAliveOnHost("host0")) + assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty) + } } From 3392903734bf5f00258f0652c971938846e64bcd Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 27 Nov 2016 19:29:22 -0800 Subject: [PATCH 17/19] Always call cleanupTaskState in statusUpdate. 
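With this change, TaskState.LOST stops being a special exit path: the LOST branch only handles the executor-removal side effect, and every terminal state then flows through the same cleanup and result dispatch. Assembled from the hunks in this patch and the formatting fix that follows, the finished-task handling ends up as (excerpt from statusUpdate(), shown here for readability):

    case Some(taskSet) =>
      if (state == TaskState.LOST) {
        // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling
        // mode, where each executor corresponds to a single task, so mark the executor
        // as failed.
        val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
          "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
        if (executorIdToRunningTaskIds.contains(execId)) {
          reason = Some(
            SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
          removeExecutor(execId, reason.get)
          failedExecutor = Some(execId)
        }
      }
      if (TaskState.isFinished(state)) {
        cleanupTaskState(tid)
        taskSet.removeRunningTask(tid)
        if (state == TaskState.FINISHED) {
          taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
        } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
          taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
        }
      }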
--- .../spark/scheduler/TaskSchedulerImpl.scala | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index df41676b3c7dd..a5f6e290a9227 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -350,20 +350,16 @@ private[spark] class TaskSchedulerImpl( removeExecutor(execId, reason.get) failedExecutor = Some(execId) } + } + if (TaskState.isFinished(state)) { + cleanupTaskState(tid) taskSet.removeRunningTask(tid) - taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) - } else { - if (TaskState.isFinished(state)) { - cleanupTaskState(tid) - } - if (state == TaskState.FINISHED) { - taskSet.removeRunningTask(tid) - taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) - } else if (Set(TaskState.FAILED, TaskState.KILLED).contains(state)) { - taskSet.removeRunningTask(tid) - taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) - } } + if (state == TaskState.FINISHED) { + taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) + } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { + taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) + } case None => logError( ("Ignoring update with state %s for TID %s because its task set is gone (this is " + From 4d6aed92bdf87307a1ebe9b990d17a89041a7089 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 27 Nov 2016 20:21:10 -0800 Subject: [PATCH 18/19] Fix formatting. --- .../org/apache/spark/scheduler/TaskSchedulerImpl.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index a5f6e290a9227..b2ef41e126871 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -354,12 +354,12 @@ private[spark] class TaskSchedulerImpl( if (TaskState.isFinished(state)) { cleanupTaskState(tid) taskSet.removeRunningTask(tid) + if (state == TaskState.FINISHED) { + taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) + } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { + taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) + } } - if (state == TaskState.FINISHED) { - taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) - } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { - taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) - } case None => logError( ("Ignoring update with state %s for TID %s because its task set is gone (this is " + From 9c6ce7e8ceadcdae3ce36a147aac7cf680d5a86f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 27 Nov 2016 20:21:47 -0800 Subject: [PATCH 19/19] reword test case name --- .../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index d37fdaf2f8ee7..19b6fec9aec83 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -307,7 +307,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty) } - test("if a task finishes with TaskState.LOST then mark its executor as dead") { + test("if a task finishes with TaskState.LOST its executor is marked as dead") { sc = new SparkContext("local", "TaskSchedulerImplSuite") val taskScheduler = new TaskSchedulerImpl(sc) taskScheduler.initialize(new FakeSchedulerBackend)
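Net effect of the series: both ways a task can stop, a terminal status update and the loss of its executor, now converge on one helper, so no task-keyed state can outlive its task. The two call paths as they stand at the end of the series, alongside the shared helper (all taken from the patches above):

    // From statusUpdate(): every finished task is cleaned up individually.
    //   if (TaskState.isFinished(state)) { cleanupTaskState(tid); ... }
    //
    // From removeExecutor(): a lost executor cleans up all of its recorded tasks.
    //   executorIdToRunningTaskIds.remove(executorId).foreach { taskIds =>
    //     taskIds.foreach(cleanupTaskState)
    //   }

    private def cleanupTaskState(tid: Long): Unit = {
      taskIdToTaskSetManager.remove(tid)
      taskIdToExecutorId.remove(tid).foreach { executorId =>
        executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
      }
    }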