diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5730a87f960a0..cedcbcacd9c6e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -66,6 +66,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors we have requested the cluster manager to kill that have not died yet
   private val executorsPendingToRemove = new HashSet[String]
 
+  // Number of killed executors whose replacements have not registered yet
+  private var numReplacingExecutors = 0
+
   // A map to store hostname with its possible task number running on it
   protected var hostToLocalTaskCount: Map[String, Int] = Map.empty
 
@@ -147,6 +150,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             numPendingExecutors -= 1
             logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
           }
+          if (numReplacingExecutors > 0) {
+            numReplacingExecutors -= 1
+            logDebug(s"Decremented number of executors being replaced " +
+              s"($numReplacingExecutors left)")
+          }
         }
         // Note: some tests expect the reply to come after we put the executor in the map
         context.reply(RegisteredExecutor)
@@ -431,7 +439,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // take into account executors that are pending to be added or removed.
     if (!replace) {
       doRequestTotalExecutors(
-        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size +
+          numReplacingExecutors)
+    } else {
+      numReplacingExecutors += knownExecutors.size
     }
 
     doKillExecutors(executorsToKill)
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 1f2a0f0d309ce..85ef4e324cbe5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -303,6 +303,32 @@ class StandaloneDynamicAllocationSuite
     assert(master.apps.head.getExecutorLimit === 1)
   }
 
+  test("the pending replacement executors should not be lost (SPARK-10515)") {
+    sc = new SparkContext(appConf)
+    val appId = sc.applicationId
+    assert(master.apps.size === 1)
+    assert(master.apps.head.id === appId)
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    // sync executors between the Master and the driver, needed because
+    // the driver refuses to kill executors it does not know about
+    syncExecutors(sc)
+    val executors = getExecutorIds(sc)
+    assert(executors.size === 2)
+
+    // kill an executor and replace it
+    assert(sc.killAndReplaceExecutor(executors.head))
+    assert(master.apps.head.executors.size === 2)
+
+    assert(sc.killExecutor(executors.head))
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.getExecutorLimit === 2)
+
+    assert(sc.killExecutor(executors(1)))
+    assert(master.apps.head.executors.size === 1)
+    assert(master.apps.head.getExecutorLimit === 1)
+  }
+
   // ===============================
   // | Utility methods for testing |
   // ===============================
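
For reviewers, the sketch below is a minimal, self-contained illustration of the bookkeeping this patch introduces. It uses hypothetical names (State, kill, register), not Spark's actual classes: executors killed with replace = true are counted in a replacing counter so that a later non-replacing kill does not lower the total reported to the cluster manager, and a newly registered executor first satisfies an outstanding replacement.

// A minimal sketch of the accounting above, using hypothetical names rather than
// Spark's actual classes. It only models the three pieces the patch touches:
// killing with/without replacement and executor registration.
object ReplacementAccountingSketch {

  final case class State(
      existing: Set[String],        // executors currently registered with the driver
      pending: Int,                 // executors requested but not yet registered
      pendingToRemove: Set[String], // executors asked to be killed that have not died yet
      replacing: Int) {             // killed executors whose replacements have not registered

    // Mirrors the patched formula in killExecutors():
    // existing + pending - pendingToRemove + replacing
    def requestedTotal: Int =
      existing.size + pending - pendingToRemove.size + replacing
  }

  def kill(state: State, ids: Seq[String], replace: Boolean): State = {
    val known = ids.filter(state.existing.contains)
    val next = state.copy(pendingToRemove = state.pendingToRemove ++ known)
    if (replace) {
      // Replacement kill: do not adjust the total, just remember how many we expect back.
      next.copy(replacing = next.replacing + known.size)
    } else {
      // Non-replacement kill: adjust the total while keeping room for pending replacements.
      println(s"doRequestTotalExecutors(${next.requestedTotal})")
      next
    }
  }

  def register(state: State, id: String): State =
    // A newly registered executor first satisfies an outstanding replacement, if any.
    state.copy(existing = state.existing + id, replacing = math.max(0, state.replacing - 1))

  def main(args: Array[String]): Unit = {
    var s = State(existing = Set("A", "B"), pending = 0, pendingToRemove = Set.empty, replacing = 0)
    s = kill(s, Seq("A"), replace = true)   // like killAndReplaceExecutor in the new test
    s = kill(s, Seq("A"), replace = false)  // requests a total of 2, not 1, because replacing = 1
    assert(s.requestedTotal == 2)
    s = register(s, "C")                    // the replacement registers; replacing drops back to 0
    assert(s.requestedTotal == 2)           // "A" is still pending removal, so the total holds at 2
  }
}

Without the counter, the second kill above would request a total of 1 rather than 2, dropping the replacement requested by killAndReplaceExecutor; that is the behavior the new StandaloneDynamicAllocationSuite test guards against.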