From f8fcc3560e087440c7618b33cc892f3feafd4a3a Mon Sep 17 00:00:00 2001
From: Sital Kedia
Date: Wed, 18 Oct 2017 22:24:38 -0700
Subject: [PATCH] [SPARK-22312][CORE] Fix bug in Executor allocation manager in
 running tasks calculation

---
 .../spark/ExecutorAllocationManager.scala     | 30 +++++++++++--------
 .../ExecutorAllocationManagerSuite.scala      | 22 ++++++++++++++
 2 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 119b426a9af3..104ff05bed5c 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -267,6 +267,10 @@ private[spark] class ExecutorAllocationManager(
     (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
   }
 
+  private def totalRunningTasks(): Int = {
+    listener.totalRunningTasks
+  }
+
   /**
    * This is called at a fixed interval to regulate the number of pending executor requests
    * and number of executors running.
@@ -491,7 +495,6 @@ private[spark] class ExecutorAllocationManager(
         s"when it is already pending to be removed!")
       return false
     }
-
     true
   }
 
@@ -602,12 +605,11 @@ private[spark] class ExecutorAllocationManager(
   private class ExecutorAllocationListener extends SparkListener {
 
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
+    // Number of running tasks per stage including speculative tasks.
+    // Should be 0 when no stages are active.
+    private val stageIdToNumRunningTask = new mutable.HashMap[Int, Int]
     private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
     private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
-    // Number of tasks currently running on the cluster including speculative tasks.
-    // Should be 0 when no stages are active.
-    private var numRunningTasks: Int = _
-
     // Number of speculative tasks to be scheduled in each stage
     private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
     // The speculative tasks started in each stage
@@ -625,6 +627,7 @@ private[spark] class ExecutorAllocationManager(
       val numTasks = stageSubmitted.stageInfo.numTasks
       allocationManager.synchronized {
         stageIdToNumTasks(stageId) = numTasks
+        stageIdToNumRunningTask(stageId) = 0
         allocationManager.onSchedulerBacklogged()
 
         // Compute the number of tasks requested by the stage on each host
@@ -651,6 +654,7 @@ private[spark] class ExecutorAllocationManager(
       val stageId = stageCompleted.stageInfo.stageId
       allocationManager.synchronized {
         stageIdToNumTasks -= stageId
+        stageIdToNumRunningTask -= stageId
         stageIdToNumSpeculativeTasks -= stageId
         stageIdToTaskIndices -= stageId
         stageIdToSpeculativeTaskIndices -= stageId
@@ -663,10 +667,6 @@ private[spark] class ExecutorAllocationManager(
         // This is needed in case the stage is aborted for any reason
         if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
           allocationManager.onSchedulerQueueEmpty()
-          if (numRunningTasks != 0) {
-            logWarning("No stages are running, but numRunningTasks != 0")
-            numRunningTasks = 0
-          }
         }
       }
     }
@@ -678,7 +678,9 @@ private[spark] class ExecutorAllocationManager(
       val executorId = taskStart.taskInfo.executorId
 
       allocationManager.synchronized {
-        numRunningTasks += 1
+        if (stageIdToNumRunningTask.contains(stageId)) {
+          stageIdToNumRunningTask(stageId) = stageIdToNumRunningTask(stageId) + 1
+        }
         // This guards against the race condition in which the `SparkListenerTaskStart`
         // event is posted before the `SparkListenerBlockManagerAdded` event, which is
         // possible because these events are posted in different threads. (see SPARK-4951)
@@ -709,7 +711,9 @@ private[spark] class ExecutorAllocationManager(
       val taskIndex = taskEnd.taskInfo.index
       val stageId = taskEnd.stageId
       allocationManager.synchronized {
-        numRunningTasks -= 1
+        if (stageIdToNumRunningTask.contains(stageId)) {
+          stageIdToNumRunningTask(stageId) = stageIdToNumRunningTask(stageId) - 1
+        }
         // If the executor is no longer running any scheduled tasks, mark it as idle
         if (executorIdToTaskIds.contains(executorId)) {
           executorIdToTaskIds(executorId) -= taskId
@@ -787,7 +791,9 @@ private[spark] class ExecutorAllocationManager(
     /**
      * The number of tasks currently running across all stages.
      */
-    def totalRunningTasks(): Int = numRunningTasks
+    def totalRunningTasks(): Int = {
+      stageIdToNumRunningTask.values.sum
+    }
 
     /**
      * Return true if an executor is not currently running a task, and false otherwise.
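
Note: the heart of the fix is replacing the single cluster-wide numRunningTasks counter with a per-stage map. A minimal standalone sketch of the bookkeeping pattern follows; RunningTaskTracker and its method names are illustrative only, not Spark code:

import scala.collection.mutable

// Illustrative sketch, not a Spark class: track running tasks per stage.
class RunningTaskTracker {
  // A stage is present in the map only between submit and complete.
  private val stageIdToNumRunningTask = new mutable.HashMap[Int, Int]

  def onStageSubmitted(stageId: Int): Unit = stageIdToNumRunningTask(stageId) = 0

  def onStageCompleted(stageId: Int): Unit = stageIdToNumRunningTask -= stageId

  // Events for stages that are no longer (or never were) registered are
  // ignored, so a task-end arriving after its stage completed cannot
  // corrupt the count.
  def onTaskStart(stageId: Int): Unit =
    if (stageIdToNumRunningTask.contains(stageId)) stageIdToNumRunningTask(stageId) += 1

  def onTaskEnd(stageId: Int): Unit =
    if (stageIdToNumRunningTask.contains(stageId)) stageIdToNumRunningTask(stageId) -= 1

  def totalRunningTasks: Int = stageIdToNumRunningTask.values.sum
}

For example:

val tracker = new RunningTaskTracker
tracker.onStageSubmitted(0)
tracker.onTaskStart(0)
tracker.onTaskStart(0)
tracker.onStageCompleted(0) // stage torn down while two tasks still "running"
tracker.onTaskEnd(0)        // stale event: ignored
assert(tracker.totalRunningTasks == 0)

With the old global counter, the stage-completion path reset the count to zero (with the "No stages are running" warning) while tasks were still in flight, and their later task-end events then drove it negative; keying the count by stage ID turns those stale events into no-ops, and totalRunningTasks stays the sum over live stages.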
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index a91e09b7cb69..8520aaf64c29 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -227,6 +227,23 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsToAdd(manager) === 1)
   }
 
+  test("Ignore task end events from completed stages") {
+    sc = createSparkContext(0, 10, 0)
+    val manager = sc.executorAllocationManager.get
+    val stage = createStageInfo(0, 5)
+    post(sc.listenerBus, SparkListenerStageSubmitted(stage))
+    val taskInfo1 = createTaskInfo(0, 0, "executor-1")
+    val taskInfo2 = createTaskInfo(1, 1, "executor-1")
+    post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo1))
+    post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo2))
+
+    post(sc.listenerBus, SparkListenerStageCompleted(stage))
+
+    post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, Success, taskInfo1, null))
+    post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, taskInfo2, null))
+    assert(totalRunningTasks(manager) === 0)
+  }
+
   test("cancel pending executors when no longer needed") {
     sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get
@@ -1107,6 +1124,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks)
   private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount)
   private val _onSpeculativeTaskSubmitted = PrivateMethod[Unit]('onSpeculativeTaskSubmitted)
+  private val _totalRunningTasks = PrivateMethod[Int]('totalRunningTasks)
 
   private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
     manager invokePrivate _numExecutorsToAdd()
@@ -1190,6 +1208,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
     manager invokePrivate _localityAwareTasks()
   }
 
+  private def totalRunningTasks(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _totalRunningTasks()
+  }
+
   private def hostToLocalTaskCount(manager: ExecutorAllocationManager): Map[String, Int] = {
     manager invokePrivate _hostToLocalTaskCount()
   }
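
Note: the suite reads the private totalRunningTasks through ScalaTest's PrivateMethodTester, the same pattern the helper object already uses for its other accessors. A minimal sketch of that pattern, assuming a hypothetical Counter class with a private method bump:

import org.scalatest.PrivateMethodTester._

// Hypothetical class for illustration; `bump` stands in for any private method.
class Counter {
  private def bump(n: Int): Int = n + 1
}

object Demo extends App {
  // PrivateMethod[R]('name) names a private method returning R;
  // invokePrivate calls it reflectively with the given arguments.
  val bump = PrivateMethod[Int]('bump)
  println(new Counter invokePrivate bump(41)) // prints 42
}

This keeps totalRunningTasks private inside ExecutorAllocationManager while still letting the new test assert that task-end events from completed (or unknown) stages leave the running-task count at zero.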