From 59f9c156c3ad746f84f385bcf277685c9c329286 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Thu, 26 Oct 2017 09:00:34 +0800
Subject: [PATCH] Correctly handle the stage abortion scenario introduced by maximum task failures

---
 .../spark/ExecutorAllocationManager.scala | 54 ++++++++++++-------
 .../ExecutorAllocationManagerSuite.scala  | 37 +++++++++++++
 2 files changed, 71 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 119b426a9af34..d020e389c9af9 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -603,10 +603,12 @@ private[spark] class ExecutorAllocationManager(
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
     private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
-    private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
-    // Number of tasks currently running on the cluster including speculative tasks.
-    // Should be 0 when no stages are active.
-    private var numRunningTasks: Int = _
+
+    // Tracks, for each executor, the number of running tasks in each stage. This is used to
+    // 1) calculate the total number of running tasks, and 2) determine whether any tasks are
+    // still running on a given executor; if not, that executor is marked as idle.
+    private val executorIdToStageAndNumTasks =
+      new mutable.HashMap[String, mutable.HashMap[Int, Int]]
 
     // Number of speculative tasks to be scheduled in each stage
     private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
@@ -656,6 +658,16 @@ private[spark] class ExecutorAllocationManager(
         stageIdToSpeculativeTaskIndices -= stageId
         stageIdToExecutorPlacementHints -= stageId
 
+        // Update the per-executor running task statistics
+        executorIdToStageAndNumTasks.foreach { case (_, stageToNumTasks) =>
+          stageToNumTasks -= stageId
+        }
+        val idleExecutorIds = executorIdToStageAndNumTasks.filter(_._2.isEmpty).keySet
+        idleExecutorIds.foreach { id =>
+          executorIdToStageAndNumTasks.remove(id)
+          allocationManager.onExecutorIdle(id)
+        }
+
         // Update the executor placement hints
         updateExecutorPlacementHints()
 
@@ -663,22 +675,16 @@ private[spark] class ExecutorAllocationManager(
         // This is needed in case the stage is aborted for any reason
         if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
           allocationManager.onSchedulerQueueEmpty()
-          if (numRunningTasks != 0) {
-            logWarning("No stages are running, but numRunningTasks != 0")
-            numRunningTasks = 0
-          }
         }
       }
     }
 
     override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
       val stageId = taskStart.stageId
-      val taskId = taskStart.taskInfo.taskId
       val taskIndex = taskStart.taskInfo.index
       val executorId = taskStart.taskInfo.executorId
       allocationManager.synchronized {
-        numRunningTasks += 1
         // This guards against the race condition in which the `SparkListenerTaskStart`
         // event is posted before the `SparkListenerBlockManagerAdded` event, which is
         // possible because these events are posted in different threads. (see SPARK-4951)
@@ -698,24 +704,30 @@ private[spark] class ExecutorAllocationManager(
         }
 
         // Mark the executor on which this task is scheduled as busy
-        executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
+        val stageToNumTasks = executorIdToStageAndNumTasks.getOrElseUpdate(
+          executorId, new mutable.HashMap[Int, Int])
+        stageToNumTasks(stageId) = stageToNumTasks.getOrElse(stageId, 0) + 1
         allocationManager.onExecutorBusy(executorId)
       }
     }
 
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
       val executorId = taskEnd.taskInfo.executorId
-      val taskId = taskEnd.taskInfo.taskId
       val taskIndex = taskEnd.taskInfo.index
       val stageId = taskEnd.stageId
       allocationManager.synchronized {
-        numRunningTasks -= 1
         // If the executor is no longer running any scheduled tasks, mark it as idle
-        if (executorIdToTaskIds.contains(executorId)) {
-          executorIdToTaskIds(executorId) -= taskId
-          if (executorIdToTaskIds(executorId).isEmpty) {
-            executorIdToTaskIds -= executorId
-            allocationManager.onExecutorIdle(executorId)
+        if (executorIdToStageAndNumTasks.contains(executorId) &&
+          executorIdToStageAndNumTasks(executorId).contains(stageId)) {
+          val taskNum = executorIdToStageAndNumTasks(executorId)(stageId) - 1
+          if (taskNum <= 0) {
+            executorIdToStageAndNumTasks(executorId) -= stageId
+            if (executorIdToStageAndNumTasks(executorId).isEmpty) {
+              executorIdToStageAndNumTasks -= executorId
+              allocationManager.onExecutorIdle(executorId)
+            }
+          } else {
+            executorIdToStageAndNumTasks(executorId)(stageId) = taskNum
          }
        }
@@ -787,7 +799,9 @@ private[spark] class ExecutorAllocationManager(
     /**
      * The number of tasks currently running across all stages.
      */
-    def totalRunningTasks(): Int = numRunningTasks
+    def totalRunningTasks(): Int = {
+      executorIdToStageAndNumTasks.values.map(_.values.sum).sum
+    }
 
     /**
      * Return true if an executor is not currently running a task, and false otherwise.
     * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
     */
     def isExecutorIdle(executorId: String): Boolean = {
-      !executorIdToTaskIds.contains(executorId)
+      !executorIdToStageAndNumTasks.contains(executorId)
     }
 
     /**
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index a91e09b7cb69f..46a96477435b5 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -1029,6 +1029,43 @@ class ExecutorAllocationManagerSuite
     assert(removeTimes(manager) === Map.empty)
   }
 
+  test("correctly handle the stage abortion scenario introduced by task failures") {
+    sc = createSparkContext()
+    val manager = sc.executorAllocationManager.get
+    assert(maxNumExecutorsNeeded(manager) === 0)
+
+    val stageInfo = createStageInfo(0, 3)
+    post(sc.listenerBus, SparkListenerStageSubmitted(stageInfo))
+    assert(maxNumExecutorsNeeded(manager) === 3)
+
+    val taskInfos = Seq(
+      createTaskInfo(1, 1, "executor-1"),
+      createTaskInfo(2, 2, "executor-1"),
+      createTaskInfo(3, 3, "executor-1"))
+
+    taskInfos.foreach(task => post(sc.listenerBus, SparkListenerTaskStart(0, 0, task)))
+    assert(maxNumExecutorsNeeded(manager) === 3)
+
+    // Simulate task 1 failing due to an exception more than 4 times
+    post(sc.listenerBus,
+      SparkListenerTaskEnd(
+        0, 0, null, ExceptionFailure(null, null, null, null, None), taskInfos(0), null))
+    // 1 pending + 2 running
+    assert(maxNumExecutorsNeeded(manager) === 3)
+
+    // The stage is aborted once a task has failed more than 4 times
+    post(sc.listenerBus, SparkListenerStageCompleted(stageInfo))
+    // When the stage completes, all of its tasks should be considered finished
+    assert(maxNumExecutorsNeeded(manager) === 0)
+
+    // Simulate task 2 being killed intentionally because of the stage abortion
+    post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, TaskKilled(""), taskInfos(1), null))
+    assert(maxNumExecutorsNeeded(manager) === 0)
+
+    // The TaskEnd event for task 3 may never be delivered; executor-1 should still become removable
+    assert(removeTimes(manager).keySet === Set("executor-1"))
+  }
+
   private def createSparkContext(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,
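
For orientation outside the diff context, the following is a minimal, self-contained sketch of the bookkeeping approach this patch implements: running tasks are counted per executor and per stage, so completing (or aborting) a stage can clear its counters even when some TaskEnd events are never delivered. This is not the actual ExecutorAllocationManager code; the RunningTaskTracker and RunningTaskTrackerDemo names and their methods are illustrative only.

import scala.collection.mutable

// Illustrative-only sketch of per-executor, per-stage running-task bookkeeping.
class RunningTaskTracker {
  // executorId -> (stageId -> number of running tasks)
  private val executorToStageTasks = new mutable.HashMap[String, mutable.HashMap[Int, Int]]

  def onTaskStart(executorId: String, stageId: Int): Unit = {
    val stageTasks = executorToStageTasks.getOrElseUpdate(executorId, new mutable.HashMap[Int, Int])
    stageTasks(stageId) = stageTasks.getOrElse(stageId, 0) + 1
  }

  def onTaskEnd(executorId: String, stageId: Int): Unit = {
    for (stageTasks <- executorToStageTasks.get(executorId); count <- stageTasks.get(stageId)) {
      if (count <= 1) stageTasks -= stageId else stageTasks(stageId) = count - 1
      if (stageTasks.isEmpty) executorToStageTasks -= executorId // the executor is now idle
    }
  }

  // On stage completion (including abortion), drop the stage's counters everywhere, so tasks
  // whose TaskEnd events never arrive cannot keep an executor marked as busy forever.
  def onStageCompleted(stageId: Int): Unit = {
    executorToStageTasks.values.foreach(_ -= stageId)
    val idleExecutors = executorToStageTasks.filter(_._2.isEmpty).keys.toSeq
    idleExecutors.foreach(executorToStageTasks.remove)
  }

  def totalRunningTasks: Int = executorToStageTasks.values.map(_.values.sum).sum

  def isExecutorIdle(executorId: String): Boolean = !executorToStageTasks.contains(executorId)
}

object RunningTaskTrackerDemo extends App {
  val tracker = new RunningTaskTracker
  tracker.onTaskStart("executor-1", 0)
  tracker.onTaskStart("executor-1", 0)
  tracker.onTaskEnd("executor-1", 0)            // one task ends (for example, by failing)
  tracker.onStageCompleted(0)                   // stage aborted; the other TaskEnd never arrives
  println(tracker.totalRunningTasks)            // 0
  println(tracker.isExecutorIdle("executor-1")) // true
}

The key design choice mirrored here is that the stage-completion event, which is always delivered, acts as the cleanup point, replacing the old single numRunningTasks counter and the per-executor task-id sets that could only be cleared by TaskEnd events.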