From a03023de6a3715bb8b86e082eed7ff1190db9014 Mon Sep 17 00:00:00 2001
From: KaiXinXiaoLei
Date: Wed, 9 Sep 2015 21:31:20 +0800
Subject: [PATCH 01/10] delete

---
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5730a87f960a0..27af3fd7af07a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -426,14 +426,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) }
     executorsPendingToRemove ++= executorsToKill

-    // If we do not wish to replace the executors we kill, sync the target number of executors
-    // with the cluster manager to avoid allocating new ones. When computing the new target,
-    // take into account executors that are pending to be added or removed.
-    if (!replace) {
-      doRequestTotalExecutors(
-        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
-    }
-
     doKillExecutors(executorsToKill)
   }

From 773a11ed8460c5a60b3e498d2089298bfac112bb Mon Sep 17 00:00:00 2001
From: KaiXinXiaoLei
Date: Sat, 12 Sep 2015 15:05:10 +0800
Subject: [PATCH 02/10] change file

---
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 27af3fd7af07a..887be71c4acac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -426,6 +426,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) }
     executorsPendingToRemove ++= executorsToKill

+    // If we do not wish to replace the executors we kill, sync the target number of executors
+    // with the cluster manager to avoid allocating new ones. When computing the new target,
+    // take into account executors that are pending to be added or removed.
+    if (!replace) {
+      doRequestTotalExecutors(
+        numExistingExecutors + numPendingExecutors - knownExecutors.size)
+    }
+
     doKillExecutors(executorsToKill)
   }

From 558cd04764d715b114e16d09a81411baa7de174a Mon Sep 17 00:00:00 2001
From: KaiXinXiaoLei
Date: Sat, 12 Sep 2015 17:57:59 +0800
Subject: [PATCH 03/10] Recovery

---
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 887be71c4acac..27af3fd7af07a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -426,14 +426,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) }
     executorsPendingToRemove ++= executorsToKill

-    // If we do not wish to replace the executors we kill, sync the target number of executors
-    // with the cluster manager to avoid allocating new ones. When computing the new target,
-    // take into account executors that are pending to be added or removed.
-    if (!replace) {
-      doRequestTotalExecutors(
-        numExistingExecutors + numPendingExecutors - knownExecutors.size)
-    }
-
     doKillExecutors(executorsToKill)
   }

From 5882a10c23cf63a6dde6e9dc1b0a02e7156ebae1 Mon Sep 17 00:00:00 2001
From: KaiXinXiaoLei
Date: Thu, 17 Sep 2015 16:30:02 +0800
Subject: [PATCH 04/10] add numReplacingExecutors

---
 .../CoarseGrainedSchedulerBackend.scala | 27 +++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 27af3fd7af07a..366f6236f7f4b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -65,6 +65,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

   // Executors we have requested the cluster manager to kill that have not died yet
   private val executorsPendingToRemove = new HashSet[String]
+
+  // Executors we have requested the cluster manager to replace with new ones that have killed
+  private val executorsToReplace = new HashSet[String]
+
+  // Number of executors requested from the cluster manager that have not replaced yet
+  private var numReplacingExecutors = 0

   // A map to store hostname with its possible task number running on it
   protected var hostToLocalTaskCount: Map[String, Int] = Map.empty
@@ -147,6 +153,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             numPendingExecutors -= 1
             logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
           }
+          if (numReplacingExecutors > 0) {
+            numReplacingExecutors -= 1
+            logError(s"Decremented number of replaceing executors ($numReplacingExecutors left)")
+          }
         }
         // Note: some tests expect the reply to come after we put the executor in the map
         context.reply(RegisteredExecutor)
@@ -236,6 +246,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           addressToExecutorId -= executorInfo.executorAddress
           executorDataMap -= executorId
           executorsPendingToRemove -= executorId
+          if (executorsToReplace.contains(executorId)) {
+            executorsToReplace -= executorId
+            if (numReplacingExecutors > 0) {
+              numReplacingExecutors -= 1
+            }
+          }
         }
         totalCoreCount.addAndGet(-executorInfo.totalCores)
         totalRegisteredExecutors.addAndGet(-1)
@@ -426,6 +442,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) }
     executorsPendingToRemove ++= executorsToKill

+    // If we do not wish to replace the executors we kill, sync the target number of executors
+    // with the cluster manager to avoid allocating new ones. When computing the new target,
+    // take into account executors that are pending to be added or removed.
+    if (!replace) {
+      doRequestTotalExecutors(
+        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size + numReplacingExecutors)
+    } else {
+      executorsToReplace ++= knownExecutors
+      numReplacingExecutors += knownExecutors.size
+    }
+
     doKillExecutors(executorsToKill)
   }

From cf56d212dd4f133fa0e27a0f748e5524ceab9534 Mon Sep 17 00:00:00 2001
From: KaiXinXiaoLei
Date: Thu, 17 Sep 2015 16:31:10 +0800
Subject: [PATCH 05/10] change logError to logDebug

---
 .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 366f6236f7f4b..0a3964300fc4c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -155,7 +155,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           }
           if (numReplacingExecutors > 0) {
             numReplacingExecutors -= 1
-            logError(s"Decremented number of replaceing executors ($numReplacingExecutors left)")
+            logDebug(s"Decremented number of replaceing executors ($numReplacingExecutors left)")
           }
         }
         // Note: some tests expect the reply to come after we put the executor in the map

From 272242573f5a812ef48ebb7ec4301da38124b961 Mon Sep 17 00:00:00 2001
From: KaiXinXiaoLei
Date: Thu, 17 Sep 2015 20:18:01 +0800
Subject: [PATCH 06/10] scala style, thanks

---
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0a3964300fc4c..250fb49ef7b92 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -65,7 +65,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

   // Executors we have requested the cluster manager to kill that have not died yet
   private val executorsPendingToRemove = new HashSet[String]
-  
+
   // Executors we have requested the cluster manager to replace with new ones that have killed
   private val executorsToReplace = new HashSet[String]

@@ -447,7 +447,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // take into account executors that are pending to be added or removed.
     if (!replace) {
       doRequestTotalExecutors(
-        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size + numReplacingExecutors)
+        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size +
+          numReplacingExecutors)
     } else {
       executorsToReplace ++= knownExecutors
       numReplacingExecutors += knownExecutors.size

From 7e0c199e5c0f78273d39e333e407a7baef41a5fc Mon Sep 17 00:00:00 2001
From: KaiXinXiaoLei
Date: Tue, 22 Sep 2015 23:07:00 +0800
Subject: [PATCH 07/10] delete not need code

---
 .../cluster/CoarseGrainedSchedulerBackend.scala | 10 ----------
 1 file changed, 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 250fb49ef7b92..c0c2981e24806 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -66,9 +66,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors we have requested the cluster manager to kill that have not died yet
   private val executorsPendingToRemove = new HashSet[String]

-  // Executors we have requested the cluster manager to replace with new ones that have killed
-  private val executorsToReplace = new HashSet[String]
-
   // Number of executors requested from the cluster manager that have not replaced yet
   private var numReplacingExecutors = 0

@@ -246,12 +243,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           addressToExecutorId -= executorInfo.executorAddress
           executorDataMap -= executorId
           executorsPendingToRemove -= executorId
-          if (executorsToReplace.contains(executorId)) {
-            executorsToReplace -= executorId
-            if (numReplacingExecutors > 0) {
-              numReplacingExecutors -= 1
-            }
-          }
         }
         totalCoreCount.addAndGet(-executorInfo.totalCores)
         totalRegisteredExecutors.addAndGet(-1)
@@ -450,7 +441,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size +
           numReplacingExecutors)
     } else {
-      executorsToReplace ++= knownExecutors
       numReplacingExecutors += knownExecutors.size
     }

From d738641590f62adacd19aaaa8cf5f8b61150372a Mon Sep 17 00:00:00 2001
From: KaiXinXiaoLei
Date: Wed, 23 Sep 2015 11:35:32 +0800
Subject: [PATCH 08/10] add unit test

---
 .../StandaloneDynamicAllocationSuite.scala | 26 +++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 1f2a0f0d309ce..85ef4e324cbe5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -303,6 +303,32 @@ class StandaloneDynamicAllocationSuite
     assert(master.apps.head.getExecutorLimit === 1)
   }

+  test("the pending replacement executors should not be lost (SPARK-10515)") {
+    sc = new SparkContext(appConf)
+    val appId = sc.applicationId
+    assert(master.apps.size === 1)
+    assert(master.apps.head.id === appId)
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    // sync executors between the Master and the driver, needed because
+    // the driver refuses to kill executors it does not know about
+    syncExecutors(sc)
+    val executors = getExecutorIds(sc)
+    assert(executors.size === 2)
+
+    // kill executor,and replace it
+    assert(sc.killAndReplaceExecutor(executors.head))
+    assert(master.apps.head.executors.size === 2)
+
+    assert(sc.killExecutor(executors.head))
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.getExecutorLimit === 2)
+
+    assert(sc.killExecutor(executors(1)))
+    assert(master.apps.head.executors.size === 1)
+    assert(master.apps.head.getExecutorLimit === 1)
+  }
+
   // ===============================
   // | Utility methods for testing |
   // ===============================

From 71a59a3c2c256f5e448e3678c9128c28dff9f317 Mon Sep 17 00:00:00 2001
From: KaiXinXiaoLei
Date: Wed, 23 Sep 2015 11:39:13 +0800
Subject: [PATCH 09/10] change state

---
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index c0c2981e24806..532a503f928f9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -66,7 +66,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors we have requested the cluster manager to kill that have not died yet
   private val executorsPendingToRemove = new HashSet[String]

-  // Number of executors requested from the cluster manager that have not replaced yet
+  // Number of executors requested from the cluster manager that have not been replaced yet
   private var numReplacingExecutors = 0

   // A map to store hostname with its possible task number running on it
@@ -152,7 +152,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           }
           if (numReplacingExecutors > 0) {
             numReplacingExecutors -= 1
-            logDebug(s"Decremented number of replaceing executors ($numReplacingExecutors left)")
+            logDebug(s"Decremented number of executors being replaced executors
+              ($numReplacingExecutors left)")
           }
         }
         // Note: some tests expect the reply to come after we put the executor in the map

From 0041fde08d903102f27dff8adca72997736c7387 Mon Sep 17 00:00:00 2001
From: KaiXinXiaoLei
Date: Wed, 23 Sep 2015 12:01:09 +0800
Subject: [PATCH 10/10] scala style, thanks

---
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 532a503f928f9..cedcbcacd9c6e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -152,8 +152,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           }
           if (numReplacingExecutors > 0) {
             numReplacingExecutors -= 1
-            logDebug(s"Decremented number of executors being replaced executors
-              ($numReplacingExecutors left)")
+            logDebug(s"Decremented number of executors being replaced executors " +
+              s"($numReplacingExecutors left)")
           }
         }
         // Note: some tests expect the reply to come after we put the executor in the map
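
The net effect of the series is that CoarseGrainedSchedulerBackend keeps a numReplacingExecutors counter: killExecutors with replace = true increments it instead of lowering the executor target, a later executor registration decrements it, and killExecutors with replace = false adds it back into the total passed to doRequestTotalExecutors, so pending replacements are not lost. Below is a minimal, self-contained Scala sketch of that bookkeeping only; the class name, the simplified fields, and the stubbed doRequestTotalExecutors/doKillExecutors are illustrative stand-ins, not Spark's actual RPC-backed implementation.

import scala.collection.mutable.HashSet

// Illustrative sketch of the target/replacement bookkeeping described in this patch series.
// Not Spark code: fields and the two do* stubs are simplified stand-ins.
class ExecutorTargetSketch {
  private val executorsPendingToRemove = new HashSet[String]
  private var numPendingExecutors = 0
  // Number of executors requested from the cluster manager that have not been replaced yet
  private var numReplacingExecutors = 0
  private var numExistingExecutors = 2 // pretend two executors are currently registered

  // Stand-ins for the cluster-manager RPCs used by the real backend.
  private def doRequestTotalExecutors(total: Int): Boolean = { println(s"request total=$total"); true }
  private def doKillExecutors(ids: Seq[String]): Boolean = { println(s"kill=${ids.mkString(",")}"); true }

  def killExecutors(knownExecutors: Seq[String], replace: Boolean): Boolean = synchronized {
    val executorsToKill = knownExecutors.filter(id => !executorsPendingToRemove.contains(id))
    executorsPendingToRemove ++= executorsToKill

    if (!replace) {
      // Shrink the target, but keep slots reserved for executors still awaiting replacement.
      doRequestTotalExecutors(
        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size +
          numReplacingExecutors)
    } else {
      // Replacements leave the target untouched; just remember how many are outstanding.
      numReplacingExecutors += knownExecutors.size
    }

    doKillExecutors(executorsToKill)
  }

  // Called when a new executor registers: one pending replacement has now arrived.
  def onExecutorRegistered(): Unit = synchronized {
    numExistingExecutors += 1
    if (numReplacingExecutors > 0) {
      numReplacingExecutors -= 1
    }
  }
}

In this sketch, calling killExecutors(Seq("1"), replace = true) followed by a later onExecutorRegistered() leaves the requested total unchanged, which mirrors what the new StandaloneDynamicAllocationSuite test asserts through master.apps.head.getExecutorLimit.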