@@ -89,6 +89,8 @@ private[spark] class ExecutorAllocationManager(
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
Integer.MAX_VALUE)
private val initialNumExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors",
minNumExecutors)

// How long there must be backlogged tasks for before an addition is triggered (seconds)
private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
@@ -121,8 +123,7 @@ private[spark] class ExecutorAllocationManager(

// The desired number of executors at this moment in time. If all our executors were to die, this
// is the number of executors we would immediately want from the cluster manager.
private var numExecutorsTarget =
conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
private var numExecutorsTarget = initialNumExecutors

// Executors that have been requested to be removed but have not been killed yet
private val executorsPendingToRemove = new mutable.HashSet[String]
@@ -240,6 +241,19 @@ private[spark] class ExecutorAllocationManager(
executor.awaitTermination(10, TimeUnit.SECONDS)
}

/**
* Reset the allocation manager to the initial state. Currently this will only be called in
* yarn-client mode when the AM re-registers after a failure.
*/
def reset(): Unit = synchronized {
initializing = true
numExecutorsTarget = initialNumExecutors
numExecutorsToAdd = 1

executorsPendingToRemove.clear()
removeTimes.clear()
}

/**
* The maximum number of executors we would need under the current load to satisfy all running
* and pending tasks, rounded up.
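
For context, a minimal sketch (not part of this diff) of how these dynamic-allocation settings might be configured. The property names are the ones read by ExecutorAllocationManager above; the values, the app name, and the shuffle-service line are illustrative assumptions.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative configuration only; values are assumptions.
val conf = new SparkConf()
  .setAppName("dynamic-allocation-example") // hypothetical app name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.initialExecutors", "3") // falls back to minExecutors when unset
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.shuffle.service.enabled", "true") // typically required with dynamic allocation

val sc = new SparkContext(conf)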
@@ -341,6 +341,25 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
}

/**
* Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it is only
* called in yarn-client mode when the AM re-registers after a failure and dynamic allocation
* is enabled.
*/
protected def reset(): Unit = synchronized {
if (Utils.isDynamicAllocationEnabled(conf)) {
numPendingExecutors = 0
executorsPendingToRemove.clear()

// Remove all lingering executors that should have been removed but have not been yet. This can
// happen because (1) the disconnected event has not yet been received, or (2) the executors
// died silently.
executorDataMap.toMap.foreach { case (eid, _) =>
driverEndpoint.askWithRetry[Boolean](
RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
}
}
}

override def reviveOffers() {
driverEndpoint.send(ReviveOffers)
}
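
One detail worth noting in reset() above: the executor map is iterated over an immutable snapshot (executorDataMap.toMap), since the handler for RemoveExecutor ultimately removes entries from that same mutable map. A minimal standalone sketch of the pattern, with the RPC call replaced by a local stub and the value type simplified to String:

import scala.collection.mutable

// Stand-in for executorDataMap; in Spark the values are ExecutorData instances.
val executorDataMap = mutable.HashMap("exec-1" -> "data-1", "exec-2" -> "data-2")

// Stand-in for driverEndpoint.askWithRetry[Boolean](RemoveExecutor(...)), which eventually
// removes the entry for this executor from executorDataMap.
def removeExecutor(executorId: String): Boolean =
  executorDataMap.remove(executorId).isDefined

// Iterate over an immutable snapshot so the removals cannot interfere with the iteration.
executorDataMap.toMap.foreach { case (eid, _) => removeExecutor(eid) }
assert(executorDataMap.isEmpty)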
@@ -805,6 +805,90 @@ class ExecutorAllocationManagerSuite
assert(maxNumExecutorsNeeded(manager) === 1)
}

test("reset the state of allocation manager") {
sc = createSparkContext()
val manager = sc.executorAllocationManager.get
assert(numExecutorsTarget(manager) === 1)
assert(numExecutorsToAdd(manager) === 1)

// Reset the allocation manager after executor-add requests have been sent but before any
// executors have been reported as added.
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10)))

assert(addExecutors(manager) === 1)
assert(numExecutorsTarget(manager) === 2)
assert(addExecutors(manager) === 2)
assert(numExecutorsTarget(manager) === 4)
assert(addExecutors(manager) === 1)
assert(numExecutorsTarget(manager) === 5)

manager.reset()
assert(numExecutorsTarget(manager) === 1)
assert(numExecutorsToAdd(manager) === 1)
assert(executorIds(manager) === Set.empty)

// Reset the allocation manager after executors have been added.
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10)))

addExecutors(manager)
addExecutors(manager)
addExecutors(manager)
assert(numExecutorsTarget(manager) === 5)

onExecutorAdded(manager, "first")
onExecutorAdded(manager, "second")
onExecutorAdded(manager, "third")
onExecutorAdded(manager, "fourth")
onExecutorAdded(manager, "fifth")
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))

// Losing the cluster manager causes all live executors to be lost, so simulate that here.
onExecutorRemoved(manager, "first")
onExecutorRemoved(manager, "second")
onExecutorRemoved(manager, "third")
onExecutorRemoved(manager, "fourth")
onExecutorRemoved(manager, "fifth")

manager.reset()
assert(numExecutorsTarget(manager) === 1)
assert(numExecutorsToAdd(manager) === 1)
assert(executorIds(manager) === Set.empty)
assert(removeTimes(manager) === Map.empty)

// Reset the allocation manager while executor removals are still pending.
addExecutors(manager)
addExecutors(manager)
addExecutors(manager)
assert(numExecutorsTarget(manager) === 5)

onExecutorAdded(manager, "first")
onExecutorAdded(manager, "second")
onExecutorAdded(manager, "third")
onExecutorAdded(manager, "fourth")
onExecutorAdded(manager, "fifth")
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))

removeExecutor(manager, "first")
removeExecutor(manager, "second")
assert(executorsPendingToRemove(manager) === Set("first", "second"))
assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))

// Losing the cluster manager causes all live executors to be lost, so simulate that here.
onExecutorRemoved(manager, "first")
onExecutorRemoved(manager, "second")
onExecutorRemoved(manager, "third")
onExecutorRemoved(manager, "fourth")
onExecutorRemoved(manager, "fifth")

manager.reset()

assert(numExecutorsTarget(manager) === 1)
assert(numExecutorsToAdd(manager) === 1)
assert(executorsPendingToRemove(manager) === Set.empty)
assert(removeTimes(manager) === Map.empty)
}
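
The helpers used in this test (numExecutorsTarget, numExecutorsToAdd, addExecutors, executorIds, and so on) read private state of ExecutorAllocationManager. As a hedged illustration, accessors of this kind are typically built with ScalaTest's PrivateMethodTester; the object and accessor below are hypothetical stand-ins for the suite's real helpers:

import org.scalatest.PrivateMethodTester

private object AllocationManagerAccessors extends PrivateMethodTester {
  // Hypothetical accessor for the private numExecutorsTarget field of ExecutorAllocationManager.
  private val _numExecutorsTarget = PrivateMethod[Int]('numExecutorsTarget)

  def numExecutorsTarget(manager: ExecutorAllocationManager): Int = {
    manager invokePrivate _numExecutorsTarget()
  }
}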

private def createSparkContext(
minExecutors: Int = 1,
maxExecutors: Int = 5,
@@ -60,6 +60,9 @@ private[spark] abstract class YarnSchedulerBackend(
/** Scheduler extension services. */
private val services: SchedulerExtensionServices = new SchedulerExtensionServices()

// Flag to specify whether this schedulerBackend should be reset.
private var shouldResetOnAmRegister = false

/**
* Bind to YARN. This *must* be done before calling [[start()]].
*
@@ -155,6 +158,16 @@
new YarnDriverEndpoint(rpcEnv, properties)
}

/**
* Reset the state of the SchedulerBackend to its initial state. This happens when the AM fails
* and re-registers itself with the driver after a failure. The stale state in the driver should
* then be cleaned up.
*/
override protected def reset(): Unit = {
super.reset()
sc.executorAllocationManager.foreach(_.reset())
}

/**
* Override the DriverEndpoint to add extra logic for the case when an executor is disconnected.
* This endpoint communicates with the executors and queries the AM for an executor's exit
@@ -218,13 +231,22 @@ private[spark] abstract class YarnSchedulerBackend(
case None =>
logWarning("Attempted to check for an executor loss reason" +
" before the AM has registered!")
driverEndpoint.askWithRetry[Boolean](
RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
}
}

override def receive: PartialFunction[Any, Unit] = {
case RegisterClusterManager(am) =>
logInfo(s"ApplicationMaster registered as $am")
amEndpoint = Option(am)
if (!shouldResetOnAmRegister) {
shouldResetOnAmRegister = true
} else {
// The AM has registered before, which potentially means that it failed and
// re-registered after a failure. This will only happen in yarn-client mode.
[Inline review comment (Contributor): nit: "this can only happen". I'll fix during merge.]

reset()
}

case AddWebUIFilter(filterName, filterParams, proxyBase) =>
addWebUIFilter(filterName, filterParams, proxyBase)
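
The re-registration handling above hinges on a simple first-time flag. A minimal standalone sketch of that pattern with the RPC endpoint machinery stripped away (the flag name follows the diff; reset() is a stub):

// Tracks whether an AM has already registered once during this application's lifetime.
var shouldResetOnAmRegister = false

// Stand-in for YarnSchedulerBackend.reset(), which clears stale driver-side state.
def reset(): Unit = println("clearing stale scheduler and allocation-manager state")

def onRegisterClusterManager(): Unit = {
  if (!shouldResetOnAmRegister) {
    // First AM registration: remember it, but there is nothing stale to clean up yet.
    shouldResetOnAmRegister = true
  } else {
    // A second registration means the AM failed and re-registered (yarn-client mode),
    // so the stale state left over from the previous AM must be reset.
    reset()
  }
}

In the real backend, the reset() override shown earlier first calls super.reset() to drop pending and stale executors from CoarseGrainedSchedulerBackend and then resets the ExecutorAllocationManager to its initial target.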
@@ -270,6 +292,7 @@ private[spark] abstract class YarnSchedulerBackend(
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (amEndpoint.exists(_.address == remoteAddress)) {
logWarning(s"ApplicationMaster has disassociated: $remoteAddress")
amEndpoint = None
}
}
