From e559482ae4404320a1a242d0aa082f00611e02bb Mon Sep 17 00:00:00 2001
From: huleilei
Date: Wed, 30 Sep 2015 13:16:27 +0800
Subject: [PATCH 1/6] add unit test

---
 .../StandaloneDynamicAllocationSuite.scala         | 29 +++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 2e2fa22eb4772..d7b473424ac38 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -369,6 +369,35 @@ class StandaloneDynamicAllocationSuite
     assert(apps.head.getExecutorLimit === 1)
   }
 
+test("the pending replacement executors should not be lost (SPARK-10515)") {
+  sc = new SparkContext(appConf)
+  val appId = sc.applicationId
+  eventually(timeout(10.seconds), interval(10.millis)) {
+    val apps = getApplications()
+    assert(apps.size === 1)
+    assert(apps.head.id === appId)
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === Int.MaxValue)
+  }
+  // sync executors between the Master and the driver, needed because
+  // the driver refuses to kill executors it does not know about
+  syncExecutors(sc)
+  val executors = getExecutorIds(sc)
+  assert(executors.size === 2)
+
+  // kill executor,and replace it
+  assert(sc.killAndReplaceExecutor(executors.head))
+  assert(master.apps.head.executors.size === 2)
+
+  assert(sc.killExecutor(executors.head))
+  assert(master.apps.head.executors.size === 2)
+  assert(master.apps.head.getExecutorLimit === 2)
+
+  assert(sc.killExecutor(executors(1)))
+  assert(master.apps.head.executors.size === 1)
+  assert(master.apps.head.getExecutorLimit === 1)
+  }
+
   // ===============================
   // | Utility methods for testing |
   // ===============================

From 1bdde8e88ed36194e020e773db415e65181357a9 Mon Sep 17 00:00:00 2001
From: huleilei
Date: Wed, 30 Sep 2015 14:35:51 +0800
Subject: [PATCH 2/6] add numPendingExecutors

---
 .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 18771f79b44bb..55a564b5c8eac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -438,6 +438,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     if (!replace) {
       doRequestTotalExecutors(
         numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+    } else {
+      numPendingExecutors += knownExecutors.size
     }
 
     doKillExecutors(executorsToKill)
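
Note on PATCH 2/6: the two added lines are the actual SPARK-10515 fix. When killExecutors is called with replace = true, the backend no longer lowers the total-executor target; instead it bumps numPendingExecutors so that a later recomputation of the target (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size) still counts the promised replacements. The standalone Scala sketch below only illustrates that bookkeeping; the class name, constructor, and stubbed helpers are invented for the example, while the counter names and the target formula are taken from the diff above.

import scala.collection.mutable

// Toy model of the driver-side executor bookkeeping touched by PATCH 2/6.
// The names mirror CoarseGrainedSchedulerBackend, but this class is illustrative only.
class ExecutorBookkeepingSketch(initialExecutors: Set[String]) {

  // Executors that have registered with the driver.
  private val registeredExecutors = mutable.Set(initialExecutors.toSeq: _*)
  // Executors requested from the cluster manager that have not registered yet.
  private var numPendingExecutors = 0
  // Executors we asked to kill whose loss has not been confirmed yet.
  private val executorsPendingToRemove = mutable.Set.empty[String]
  // Last total-executor target sent to the cluster manager (None until a target is sent).
  var lastRequestedTotal: Option[Int] = None

  private def numExistingExecutors: Int = registeredExecutors.size

  private def doRequestTotalExecutors(total: Int): Unit = { lastRequestedTotal = Some(total) }
  private def doKillExecutors(ids: Seq[String]): Unit = { /* forward to the cluster manager */ }

  /** Kill the given executors; with replace = true the overall target stays unchanged. */
  def killExecutors(executorIds: Seq[String], replace: Boolean): Boolean = {
    val knownExecutors = executorIds.filter(registeredExecutors.contains)
    val executorsToKill = knownExecutors.filterNot(executorsPendingToRemove.contains)
    executorsPendingToRemove ++= executorsToKill

    if (!replace) {
      // Shrink the target so the cluster manager does not backfill the killed executors.
      doRequestTotalExecutors(
        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
    } else {
      // The SPARK-10515 fix: remember that replacements are owed, so a later
      // recomputation of the target does not drop them.
      numPendingExecutors += knownExecutors.size
    }

    doKillExecutors(executorsToKill)
    executorsToKill.nonEmpty
  }
}

Without the added else branch, numPendingExecutors stays at zero, so the next recomputed target comes out one lower per replaced executor and the pending replacement is silently dropped, which is the behavior the new test guards against.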
From e382315e9905b735837bc37a63acd16b72242ada Mon Sep 17 00:00:00 2001
From: KaiXinXiaoLei
Date: Thu, 8 Oct 2015 14:51:11 +0800
Subject: [PATCH 3/6] indentation style, thanks

---
 .../StandaloneDynamicAllocationSuite.scala         | 48 +++++++++----------
 1 file changed, 24 insertions(+), 24 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index d7b473424ac38..3b62b725f3471 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -369,33 +369,33 @@ class StandaloneDynamicAllocationSuite
     assert(apps.head.getExecutorLimit === 1)
   }
 
-test("the pending replacement executors should not be lost (SPARK-10515)") {
-  sc = new SparkContext(appConf)
-  val appId = sc.applicationId
-  eventually(timeout(10.seconds), interval(10.millis)) {
-    val apps = getApplications()
-    assert(apps.size === 1)
-    assert(apps.head.id === appId)
-    assert(apps.head.executors.size === 2)
-    assert(apps.head.getExecutorLimit === Int.MaxValue)
-  }
-  // sync executors between the Master and the driver, needed because
-  // the driver refuses to kill executors it does not know about
-  syncExecutors(sc)
-  val executors = getExecutorIds(sc)
-  assert(executors.size === 2)
+  test("the pending replacement executors should not be lost (SPARK-10515)") {
+    sc = new SparkContext(appConf)
+    val appId = sc.applicationId
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
+    // sync executors between the Master and the driver, needed because
+    // the driver refuses to kill executors it does not know about
+    syncExecutors(sc)
+    val executors = getExecutorIds(sc)
+    assert(executors.size === 2)
 
-  // kill executor,and replace it
-  assert(sc.killAndReplaceExecutor(executors.head))
-  assert(master.apps.head.executors.size === 2)
+    // kill executor,and replace it
+    assert(sc.killAndReplaceExecutor(executors.head))
+    assert(master.apps.head.executors.size === 2)
 
-  assert(sc.killExecutor(executors.head))
-  assert(master.apps.head.executors.size === 2)
-  assert(master.apps.head.getExecutorLimit === 2)
+    assert(sc.killExecutor(executors.head))
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.getExecutorLimit === 2)
 
-  assert(sc.killExecutor(executors(1)))
-  assert(master.apps.head.executors.size === 1)
-  assert(master.apps.head.getExecutorLimit === 1)
+    assert(sc.killExecutor(executors(1)))
+    assert(master.apps.head.executors.size === 1)
+    assert(master.apps.head.getExecutorLimit === 1)
   }
 
   // ===============================

From b7b42ccbd8364fedbd652e6763fd3c566539bf93 Mon Sep 17 00:00:00 2001
From: KaiXinXiaoLei
Date: Fri, 9 Oct 2015 12:48:49 +0800
Subject: [PATCH 4/6] change test

---
 .../StandaloneDynamicAllocationSuite.scala         | 23 +++++++++++--------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 3b62b725f3471..c3620a4773fed 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -385,17 +385,20 @@ class StandaloneDynamicAllocationSuite
     val executors = getExecutorIds(sc)
     assert(executors.size === 2)
 
-    // kill executor,and replace it
+    // kill executor 1, and replace it
     assert(sc.killAndReplaceExecutor(executors.head))
-    assert(master.apps.head.executors.size === 2)
-
-    assert(sc.killExecutor(executors.head))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 2)
-
-    assert(sc.killExecutor(executors(1)))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    // kill executor 1
+    assert(killNExecutors(sc, 1))
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 2)
+    // kill all executors
+    assert(killAllExecutors(sc)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
   }
 
   // ===============================
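
Note on PATCH 4/6: the test now reads state through getApplications() and pins the expected executor limits, 2 after the first plain kill (one live executor plus the still-pending replacement) and 1 after the second. Those numbers follow from the target formula added in PATCH 2/6. The snippet below merely replays that arithmetic; the counter values plugged in for pending and pending-to-remove executors are illustrative assumptions about the driver-side state at each step, not values read from the real backend.

// Replays the total-executor target from PATCH 2/6 with illustrative counts.
object ExecutorLimitTrace extends App {
  // Target sent to the Master when executors are killed without replacement.
  def target(existing: Int, pending: Int, pendingToRemove: Int): Int =
    existing + pending - pendingToRemove

  // killAndReplaceExecutor: replace = true, so no new target is sent, the Master's
  // executor limit stays at Int.MaxValue, and one replacement is now pending.

  // First plain killExecutor: the pending replacement still counts toward the target.
  println(target(existing = 2, pending = 1, pendingToRemove = 1)) // 2, as the test expects
  // Without the SPARK-10515 fix the replacement would already be forgotten (pending = 0).
  println(target(existing = 2, pending = 0, pendingToRemove = 1)) // 1, the replacement is lost
  // Second plain killExecutor lowers the limit to a single executor.
  println(target(existing = 2, pending = 1, pendingToRemove = 2)) // 1, as the test expects
}

PATCH 5/6 below keeps exactly these expected values and only swaps the killNExecutors/killAllExecutors calls back to the existing sc.killExecutor API.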
From cb69dc5b1795e49a1f10db2db0e7fe8a5ba6e5cc Mon Sep 17 00:00:00 2001
From: KaiXinXiaoLei
Date: Fri, 9 Oct 2015 17:00:00 +0800
Subject: [PATCH 5/6] change test

---
 .../spark/deploy/StandaloneDynamicAllocationSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index c3620a4773fed..82f5fdfc6d264 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -390,12 +390,12 @@ class StandaloneDynamicAllocationSuite
     var apps = getApplications()
     assert(apps.head.executors.size === 2)
     // kill executor 1
-    assert(killNExecutors(sc, 1))
+    assert(sc.killExecutor(executors.head))
     apps = getApplications()
     assert(apps.head.executors.size === 2)
     assert(apps.head.getExecutorLimit === 2)
-    // kill all executors
-    assert(killAllExecutors(sc)
+    // kill executor 2
+    assert(sc.killExecutor(executors(1)))
     apps = getApplications()
     assert(apps.head.executors.size === 1)
     assert(apps.head.getExecutorLimit === 1)

From da13040aca20f0c739be8958675f5b39dac4d82c Mon Sep 17 00:00:00 2001
From: KaiXinXiaoLei
Date: Tue, 13 Oct 2015 15:02:40 +0800
Subject: [PATCH 6/6] Make sure the total number of executors is not changed

Kill an executor and a new executor should replace it. Make sure the total
number of executors is not changed.
---
 .../spark/deploy/StandaloneDynamicAllocationSuite.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 82f5fdfc6d264..d145e78834b1b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -384,11 +384,14 @@ class StandaloneDynamicAllocationSuite
     syncExecutors(sc)
     val executors = getExecutorIds(sc)
     assert(executors.size === 2)
-    // kill executor 1, and replace it
     assert(sc.killAndReplaceExecutor(executors.head))
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.head.executors.size === 2)
+    }
+
     var apps = getApplications()
-    assert(apps.head.executors.size === 2)
     // kill executor 1
     assert(sc.killExecutor(executors.head))
     apps = getApplications()