Skip to content

Commit 2d00012

Browse files
KaiXinXiaoLei authored and Andrew Or committed
[SPARK-10515] When killing executor, the pending replacement executors should not be lost
If the heartbeat receiver kills executors (and new ones are not yet registered to replace them), an idle timeout of other executors causes the pending replacements to be lost (it changes the total number of executors requested by the driver), so new executors will not be requested to replace the killed ones. For example, suppose executorsPendingToRemove=Set(1), and executor 2 reaches its idle timeout before a new executor has been requested to replace executor 1. The driver then kills executor 2 and sends RequestExecutors to the AM. But now executorsPendingToRemove=Set(1,2), so the AM does not allocate an executor to replace executor 1. See: #8668 Author: KaiXinXiaoLei <[email protected]> Author: huleilei <[email protected]> Closes #8945 from KaiXinXiaoLei/pendingexecutor.
1 parent 723aa75 commit 2d00012

File tree

2 files changed

+37
-0
lines changed

2 files changed

+37
-0
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
438438
if (!replace) {
439439
doRequestTotalExecutors(
440440
numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
441+
} else {
442+
numPendingExecutors += knownExecutors.size
441443
}
442444

443445
doKillExecutors(executorsToKill)

core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,41 @@ class StandaloneDynamicAllocationSuite
369369
assert(apps.head.getExecutorLimit === 1)
370370
}
371371

372+
test("the pending replacement executors should not be lost (SPARK-10515)") {
373+
sc = new SparkContext(appConf)
374+
val appId = sc.applicationId
375+
eventually(timeout(10.seconds), interval(10.millis)) {
376+
val apps = getApplications()
377+
assert(apps.size === 1)
378+
assert(apps.head.id === appId)
379+
assert(apps.head.executors.size === 2)
380+
assert(apps.head.getExecutorLimit === Int.MaxValue)
381+
}
382+
// sync executors between the Master and the driver, needed because
383+
// the driver refuses to kill executors it does not know about
384+
syncExecutors(sc)
385+
val executors = getExecutorIds(sc)
386+
assert(executors.size === 2)
387+
// kill executor 1, and replace it
388+
assert(sc.killAndReplaceExecutor(executors.head))
389+
eventually(timeout(10.seconds), interval(10.millis)) {
390+
val apps = getApplications()
391+
assert(apps.head.executors.size === 2)
392+
}
393+
394+
var apps = getApplications()
395+
// kill executor 1
396+
assert(sc.killExecutor(executors.head))
397+
apps = getApplications()
398+
assert(apps.head.executors.size === 2)
399+
assert(apps.head.getExecutorLimit === 2)
400+
// kill executor 2
401+
assert(sc.killExecutor(executors(1)))
402+
apps = getApplications()
403+
assert(apps.head.executors.size === 1)
404+
assert(apps.head.getExecutorLimit === 1)
405+
}
406+
372407
// ===============================
373408
// | Utility methods for testing |
374409
// ===============================

0 commit comments

Comments
 (0)