Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,10 @@ private[spark] class ExecutorAllocationManager(
executors.foreach { executorIdToBeRemoved =>
if (newExecutorTotal - 1 < minNumExecutors) {
logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
s"$newExecutorTotal executor(s) left (limit $minNumExecutors)")
s"$newExecutorTotal executor(s) left (minimum number of executor limit $minNumExecutors)")
} else if (newExecutorTotal - 1 < numExecutorsTarget) {
logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
s"$newExecutorTotal executor(s) left (number of executor target $numExecutorsTarget)")
} else if (canBeKilled(executorIdToBeRemoved)) {
executorIdsToBeRemoved += executorIdToBeRemoved
newExecutorTotal -= 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,47 @@ class ExecutorAllocationManagerSuite
assert(executorsPendingToRemove(manager).isEmpty)
}

test ("Removing with various numExecutorsTarget condition") {
sc = createSparkContext(5, 12, 5)
val manager = sc.executorAllocationManager.get

sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 8)))

// Remove when numExecutorsTarget is the same as the current number of executors
assert(addExecutors(manager) === 1)
assert(addExecutors(manager) === 2)
(1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach {
info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) }
assert(executorIds(manager).size === 8)
assert(numExecutorsTarget(manager) === 8)
assert(maxNumExecutorsNeeded(manager) == 8)
assert(!removeExecutor(manager, "1")) // won't work since numExecutorsTarget == numExecutors

// Remove executors when numExecutorsTarget is lower than current number of executors
(1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach {
info => sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, Success, info, null)) }
adjustRequestedExecutors(manager)
assert(executorIds(manager).size === 8)
assert(numExecutorsTarget(manager) === 5)
assert(maxNumExecutorsNeeded(manager) == 5)
assert(removeExecutor(manager, "1"))
assert(removeExecutors(manager, Seq("2", "3"))=== Seq("2", "3"))
onExecutorRemoved(manager, "1")
onExecutorRemoved(manager, "2")
onExecutorRemoved(manager, "3")

// numExecutorsTarget is lower than minNumExecutors
sc.listenerBus.postToAll(
SparkListenerTaskEnd(0, 0, null, Success, createTaskInfo(4, 4, "4"), null))
assert(executorIds(manager).size === 5)
assert(numExecutorsTarget(manager) === 5)
assert(maxNumExecutorsNeeded(manager) == 4)
assert(!removeExecutor(manager, "4")) // lower limit
assert(addExecutors(manager) === 0) // upper limit
}

test ("interleaving add and remove") {
sc = createSparkContext(5, 10, 5)
sc = createSparkContext(5, 12, 5)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))

Expand All @@ -331,52 +370,59 @@ class ExecutorAllocationManagerSuite
onExecutorAdded(manager, "7")
onExecutorAdded(manager, "8")
assert(executorIds(manager).size === 8)
assert(numExecutorsTarget(manager) === 8)

// Remove until limit
assert(removeExecutor(manager, "1"))
assert(removeExecutors(manager, Seq("2", "3")) === Seq("2", "3"))
assert(!removeExecutor(manager, "4")) // lower limit reached
assert(!removeExecutor(manager, "5"))
onExecutorRemoved(manager, "1")
onExecutorRemoved(manager, "2")
onExecutorRemoved(manager, "3")
assert(executorIds(manager).size === 5)

// Add until limit
assert(addExecutors(manager) === 2) // upper limit reached
assert(addExecutors(manager) === 0)
assert(!removeExecutor(manager, "4")) // still at lower limit
assert((manager, Seq("5")) !== Seq("5"))
// Remove when numTargetExecutors is equal to the current number of executors
assert(!removeExecutor(manager, "1"))
assert(removeExecutors(manager, Seq("2", "3")) !== Seq("2", "3"))

// Remove until limit
onExecutorAdded(manager, "9")
onExecutorAdded(manager, "10")
onExecutorAdded(manager, "11")
onExecutorAdded(manager, "12")
onExecutorAdded(manager, "13")
assert(executorIds(manager).size === 10)
assert(executorIds(manager).size === 12)
assert(numExecutorsTarget(manager) === 8)

// Remove succeeds again, now that we are no longer at the lower limit
assert(removeExecutors(manager, Seq("4", "5", "6")) === Seq("4", "5", "6"))
assert(removeExecutor(manager, "7"))
assert(executorIds(manager).size === 10)
assert(addExecutors(manager) === 0)
assert(removeExecutor(manager, "1"))
assert(removeExecutors(manager, Seq("2", "3", "4")) === Seq("2", "3", "4"))
assert(!removeExecutor(manager, "5")) // lower limit reached
assert(!removeExecutor(manager, "6"))
onExecutorRemoved(manager, "1")
onExecutorRemoved(manager, "2")
onExecutorRemoved(manager, "3")
onExecutorRemoved(manager, "4")
onExecutorRemoved(manager, "5")
assert(executorIds(manager).size === 8)

// Number of executors pending restarts at 1
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 0)
assert(executorIds(manager).size === 8)
onExecutorRemoved(manager, "6")
onExecutorRemoved(manager, "7")
// Add until limit
assert(!removeExecutor(manager, "7")) // still at lower limit
assert((manager, Seq("8")) !== Seq("8"))
onExecutorAdded(manager, "13")
onExecutorAdded(manager, "14")
onExecutorAdded(manager, "15")
assert(executorIds(manager).size === 8)
assert(addExecutors(manager) === 0) // still at upper limit
onExecutorAdded(manager, "16")
assert(executorIds(manager).size === 12)

// Remove succeeds again, now that we are no longer at the lower limit
assert(removeExecutors(manager, Seq("5", "6", "7")) === Seq("5", "6", "7"))
assert(removeExecutor(manager, "8"))
assert(executorIds(manager).size === 12)
onExecutorRemoved(manager, "5")
onExecutorRemoved(manager, "6")
assert(executorIds(manager).size === 10)
assert(numExecutorsToAdd(manager) === 4)
onExecutorRemoved(manager, "9")
onExecutorRemoved(manager, "10")
assert(addExecutors(manager) === 4) // at upper limit
onExecutorAdded(manager, "17")
onExecutorAdded(manager, "18")
assert(executorIds(manager).size === 10)
assert(numExecutorsTarget(manager) === 10)
assert(addExecutors(manager) === 0) // still at upper limit
onExecutorAdded(manager, "19")
onExecutorAdded(manager, "20")
assert(executorIds(manager).size === 12)
assert(numExecutorsTarget(manager) === 12)
}

test("starting/canceling add timer") {
Expand Down Expand Up @@ -915,12 +961,17 @@ class ExecutorAllocationManagerSuite
onExecutorAdded(manager, "third")
onExecutorAdded(manager, "fourth")
onExecutorAdded(manager, "fifth")
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
onExecutorAdded(manager, "sixth")
onExecutorAdded(manager, "seventh")
onExecutorAdded(manager, "eighth")
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth",
"sixth", "seventh", "eighth"))

removeExecutor(manager, "first")
removeExecutors(manager, Seq("second", "third"))
assert(executorsPendingToRemove(manager) === Set("first", "second", "third"))
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth",
"sixth", "seventh", "eighth"))


// Cluster manager lost will make all the live executors lost, so here simulate this behavior
Expand Down