@@ -27,13 +27,20 @@ import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
  *
- * The add policy depends on whether there are backlogged tasks waiting to be scheduled. If
- * the scheduler queue is not drained in N seconds, then new executors are added. If the queue
- * persists for another M seconds, then more executors are added and so on. The number added
- * in each round increases exponentially from the previous round until an upper bound on the
- * number of executors has been reached. The upper bound is based both on a configured property
- * and on the number of tasks pending: the policy will never increase the number of executor
- * requests past the number needed to handle all pending tasks.
+ * The ExecutorAllocationManager maintains a moving target number of executors which is periodically
+ * synced to the cluster manager. The target starts at a configured initial value and changes with
+ * the number of pending and running tasks.
+ *
+ * Decreasing the target number of executors happens when the current target is more than needed to
+ * handle the current load. The target number of executors is always truncated to the number of
+ * executors that could run all current running and pending tasks at once.
+ *
+ * Increasing the target number of executors happens in response to backlogged tasks waiting to be
+ * scheduled. If the scheduler queue is not drained in N seconds, then new executors are added. If
+ * the queue persists for another M seconds, then more executors are added and so on. The number
+ * added in each round increases exponentially from the previous round until an upper bound has been
+ * reached. The upper bound is based both on a configured property and on the current number of
+ * running and pending tasks, as described above.
  *
  * The rationale for the exponential increase is twofold: (1) Executors should be added slowly
  * in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
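The ramp-up policy described in this comment is easy to model in isolation. Below is a minimal, runnable sketch of that behavior; the bounds and demand (`maxExecutors = 100`, `maxNeeded = 37`) are illustrative assumptions, not values from the patch:

```scala
object RampUpModel {
  def main(args: Array[String]): Unit = {
    val maxExecutors = 100 // configured upper bound (assumed value)
    val maxNeeded = 37     // executors needed for all running and pending tasks (assumed)
    var target = 1         // current target number of executors
    var toAdd = 1          // batch size for the next round

    // One iteration per round in which the scheduler queue stays backlogged.
    while (target < math.min(maxNeeded, maxExecutors)) {
      val old = target
      target = math.min(target + toAdd, math.min(maxNeeded, maxExecutors))
      val delta = target - old
      // Double the batch only when the full batch was granted, mirroring the patch.
      toAdd = if (delta == toAdd) toAdd * 2 else 1
      println(s"target=$target, next batch=$toAdd")
    }
    // Prints targets 2, 4, 8, 16, 32, 37: exponential growth, then a clamp at demand.
  }
}
```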
@@ -105,8 +112,10 @@ private[spark] class ExecutorAllocationManager(
   // Number of executors to add in the next round
   private var numExecutorsToAdd = 1
 
-  // Number of executors that have been requested but have not registered yet
-  private var numExecutorsPending = 0
+  // The desired number of executors at this moment in time. If all our executors were to die, this
+  // is the number of executors we would immediately want from the cluster manager.
+  private var numExecutorsTarget =
+    conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
 
   // Executors that have been requested to be removed but have not been killed yet
   private val executorsPendingToRemove = new mutable.HashSet[String]
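The new field seeds the target from `spark.dynamicAllocation.initialExecutors`, falling back to the configured minimum. A configuration sketch for context (the values here are examples only):

```scala
import org.apache.spark.SparkConf

// Example values only; tune per workload.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.initialExecutors", "8") // seeds numExecutorsTarget
  .set("spark.dynamicAllocation.maxExecutors", "100")
```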
@@ -199,13 +208,6 @@ private[spark] class ExecutorAllocationManager(
     executor.awaitTermination(10, TimeUnit.SECONDS)
   }
 
-  /**
-   * The number of executors we would have if the cluster manager were to fulfill all our existing
-   * requests.
-   */
-  private def targetNumExecutors(): Int =
-    numExecutorsPending + executorIds.size - executorsPendingToRemove.size
-
   /**
    * The maximum number of executors we would need under the current load to satisfy all running
    * and pending tasks, rounded up.
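The "rounded up" computation presumably reduces to a ceiling division over the executor's task capacity. A sketch under assumed names (`tasksPerExecutor` does not appear in this hunk):

```scala
// Executors needed to run every running and pending task at once, rounded up
// (assumed shape of maxNumExecutorsNeeded).
def maxExecutorsNeeded(numRunningOrPendingTasks: Int, tasksPerExecutor: Int): Int =
  (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor

// e.g. 17 tasks at 4 tasks per executor -> 5 executors
```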
@@ -227,7 +229,7 @@ private[spark] class ExecutorAllocationManager(
   private def schedule(): Unit = synchronized {
     val now = clock.getTimeMillis
 
-    addOrCancelExecutorRequests(now)
+    updateAndSyncNumExecutorsTarget(now)
 
     removeTimes.retain { case (executorId, expireTime) =>
       val expired = now >= expireTime
@@ -239,26 +241,28 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Updates our target number of executors and syncs the result with the cluster manager.
+   *
    * Check to see whether our existing allocation and the requests we've made previously exceed our
-   * current needs. If so, let the cluster manager know so that it can cancel pending requests that
-   * are unneeded.
+   * current needs. If so, truncate our target and let the cluster manager know so that it can
+   * cancel pending requests that are unneeded.
    *
    * If not, and the add time has expired, see if we can request new executors and refresh the add
    * time.
    *
    * @return the delta in the target number of executors.
    */
-  private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
-    val currentTarget = targetNumExecutors
+  private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
     val maxNeeded = maxNumExecutorsNeeded
 
-    if (maxNeeded < currentTarget) {
+    if (maxNeeded < numExecutorsTarget) {
       // The target number exceeds the number we actually need, so stop adding new
-      // executors and inform the cluster manager to cancel the extra pending requests.
-      val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
-      client.requestTotalExecutors(newTotalExecutors)
+      // executors and inform the cluster manager to cancel the extra pending requests
+      val oldNumExecutorsTarget = numExecutorsTarget
+      numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
+      client.requestTotalExecutors(numExecutorsTarget)
       numExecutorsToAdd = 1
-      updateNumExecutorsPending(newTotalExecutors)
+      numExecutorsTarget - oldNumExecutorsTarget
     } else if (addTime != NOT_SET && now >= addTime) {
       val delta = addExecutors(maxNeeded)
       logDebug(s"Starting timer to add more executors (to " +
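Note that the truncation branch now returns a non-positive delta rather than delegating to the old pending-count bookkeeping. A tiny model of that branch's arithmetic, with assumed numbers:

```scala
// Model of the truncation branch above (names assumed).
def truncatedDelta(maxNeeded: Int, target: Int, minExecutors: Int): Int = {
  val newTarget = math.max(maxNeeded, minExecutors)
  newTarget - target // zero or negative: extra pending requests get cancelled
}

// e.g. truncatedDelta(maxNeeded = 5, target = 12, minExecutors = 2) == -7
```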
@@ -281,21 +285,30 @@ private[spark] class ExecutorAllocationManager(
    */
   private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
     // Do not request more executors if it would put our target over the upper bound
-    val currentTarget = targetNumExecutors
-    if (currentTarget >= maxNumExecutors) {
-      logDebug(s"Not adding executors because there are already ${executorIds.size} " +
-        s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
+    if (numExecutorsTarget >= maxNumExecutors) {
+      val numExecutorsPending = numExecutorsTarget - executorIds.size
+      logDebug(s"Not adding executors because there are already ${executorIds.size} registered " +
+        s"and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
       numExecutorsToAdd = 1
       return 0
     }
 
-    val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
-    val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
-    val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
+    val oldNumExecutorsTarget = numExecutorsTarget
+    // There's no point in wasting time ramping up to the number of executors we already have, so
+    // make sure our target is at least as much as our current allocation:
+    numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
+    // Boost our target with the number to add for this round:
+    numExecutorsTarget += numExecutorsToAdd
+    // Ensure that our target doesn't exceed what we need at the present moment:
+    numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
+    // Ensure that our target fits within configured bounds:
+    numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
+
+    val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
     if (addRequestAcknowledged) {
-      val delta = updateNumExecutorsPending(newTotalExecutors)
+      val delta = numExecutorsTarget - oldNumExecutorsTarget
       logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
-        s" (new desired total will be $newTotalExecutors)")
+        s" (new desired total will be $numExecutorsTarget)")
       numExecutorsToAdd = if (delta == numExecutorsToAdd) {
         numExecutorsToAdd * 2
       } else {
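The four clamping steps above compose into one pure function; this standalone sketch models them with assumed inputs to make the order of operations explicit:

```scala
// Standalone model of the clamping sequence in addExecutors (names assumed).
def nextTarget(target: Int, registered: Int, toAdd: Int,
               maxNeeded: Int, minExecs: Int, maxExecs: Int): Int = {
  var t = math.max(target, registered)      // never ramp up to less than the current allocation
  t += toAdd                                // boost by this round's batch
  t = math.min(t, maxNeeded)                // never exceed present demand
  math.max(math.min(t, maxExecs), minExecs) // stay within configured bounds
}

// e.g. nextTarget(target = 4, registered = 6, toAdd = 8,
//                 maxNeeded = 11, minExecs = 1, maxExecs = 100) == 11
```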
@@ -304,23 +317,11 @@ private[spark] class ExecutorAllocationManager(
       delta
     } else {
       logWarning(
-        s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
+        s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
       0
     }
   }
 
-  /**
-   * Given the new target number of executors, update the number of pending executor requests,
-   * and return the delta from the old number of pending requests.
-   */
-  private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
-    val newNumExecutorsPending =
-      newTotalExecutors - executorIds.size + executorsPendingToRemove.size
-    val delta = newNumExecutorsPending - numExecutorsPending
-    numExecutorsPending = newNumExecutorsPending
-    delta
-  }
-
   /**
    * Request the cluster manager to remove the given executor.
    * Return whether the request is received.
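With `numExecutorsTarget` as the single source of truth, the deleted helper's incremental bookkeeping is no longer needed; the pending count can be derived on demand, as the debug log in addExecutors now does. A sketch of the equivalent derivation (names assumed):

```scala
// Pending requests are implied by the target rather than stored.
def pendingRequests(numExecutorsTarget: Int, registered: Int): Int =
  numExecutorsTarget - registered
```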
@@ -372,10 +373,6 @@ private[spark] class ExecutorAllocationManager(
       // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
       executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
       logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
-      if (numExecutorsPending > 0) {
-        numExecutorsPending -= 1
-        logDebug(s"Decremented number of pending executors ($numExecutorsPending left)")
-      }
     } else {
       logWarning(s"Duplicate executor $executorId has registered")
     }