Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Executors we have requested the cluster manager to kill that have not died yet
private val executorsPendingToRemove = new HashSet[String]

// Number of executors requested from the cluster manager that have not been replaced yet
private var numReplacingExecutors = 0

// A map to store hostname with its possible task number running on it
protected var hostToLocalTaskCount: Map[String, Int] = Map.empty

Expand Down Expand Up @@ -147,6 +150,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
if (numReplacingExecutors > 0) {
numReplacingExecutors -= 1
logDebug(s"Decremented number of executors being replaced executors " +
s"($numReplacingExecutors left)")
}
}
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(RegisteredExecutor)
Expand Down Expand Up @@ -431,7 +439,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// take into account executors that are pending to be added or removed.
if (!replace) {
doRequestTotalExecutors(
numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
+ numReplacingExecutors)
} else {
numReplacingExecutors += knownExecutors.size
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need another variable? Can't we just do

} else {
  numPendingExecutors += knownExecutors.size
}

This makes sense on a high level too; if we replace an executor we expect to get one back, so it should be pending in the meantime.

}

doKillExecutors(executorsToKill)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,32 @@ class StandaloneDynamicAllocationSuite
assert(master.apps.head.getExecutorLimit === 1)
}

test("the pending replacement executors should not be lost (SPARK-10515)") {
sc = new SparkContext(appConf)
val appId = sc.applicationId
assert(master.apps.size === 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update this to reflect the changes made in #8914

assert(master.apps.head.id === appId)
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.getExecutorLimit === Int.MaxValue)
// sync executors between the Master and the driver, needed because
// the driver refuses to kill executors it does not know about
syncExecutors(sc)
val executors = getExecutorIds(sc)
assert(executors.size === 2)

// kill an executor and replace it
assert(sc.killAndReplaceExecutor(executors.head))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... this doesn't look right. It should probably be using a new version of killNExecutors that calls killAndReplaceExecutor and syncs things.

@andrewor14 might be a better person to comment on this, since he wrote the original tests. I'm not sure about how much we can trust the counts to update atomically when killNExecutors and friends are called, but the other tests seem to be passing reliably...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this might be flaky.

assert(master.apps.head.executors.size === 2)

assert(sc.killExecutor(executors.head))
assert(master.apps.head.executors.size === 2)
assert(master.apps.head.getExecutorLimit === 2)

assert(sc.killExecutor(executors(1)))
assert(master.apps.head.executors.size === 1)
assert(master.apps.head.getExecutorLimit === 1)
}

// ===============================
// | Utility methods for testing |
// ===============================
Expand Down