Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -386,15 +386,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
* be called in the yarn-client mode when AM re-registers after a failure.
* */
protected def reset(): Unit = synchronized {
numPendingExecutors = 0
executorsPendingToRemove.clear()
protected def reset(): Unit = {
val executors = synchronized {
numPendingExecutors = 0
executorsPendingToRemove.clear()
Set() ++ executorDataMap.keys
}

// Remove all the lingering executors that should be removed but not yet. The reason might be
// because (1) disconnected event is not yet received; (2) executors die silently.
executorDataMap.toMap.foreach { case (eid, _) =>
driverEndpoint.askWithRetry[Boolean](
RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
executors.foreach { eid =>
removeExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))
}
}

Expand Down