
Commit 38a413e

Connect decommissioning to dynamic scaling
- Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads.
- Make Spark's dynamic allocation use decommissioning.
- Track the decommissioning executors in the core dynamic scheduler so we don't scale down too low; update the streaming ExecutorAllocationManager to also delegate to decommission.
- Fix up executor add for resource profile.
- Fix our exiting and cleanup thread for better debugging next time.
- Clean up the locks we use in decommissioning and clarify some more bits.
- Verify executors decommissioned, then killed by the external cluster manager, are re-launched.
- Verify some additional calls are not occurring in the executor allocation manager suite.
- Don't close the watcher until the end of the test.
- Use decommissionExecutors and set adjustTargetNumExecutors to false so that we can match the pattern for killExecutor/killExecutors.
- Bump numparts up to 6.
- Revert "bump numparts up to 6"; this reverts commit daf96dd.
- Small comment & visibility cleanup.
1 parent 375d348 commit 38a413e
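For context, the combination of settings this change targets looks roughly like the sketch below. The config key strings are assumptions inferred from the constants referenced in the diff (WORKER_DECOMMISSION_ENABLED, STORAGE_DECOMMISSION_ENABLED, STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) and may differ across Spark versions.

```scala
import org.apache.spark.SparkConf

// A minimal sketch of the configuration this change exercises: dynamic allocation
// without the external shuffle service, relying on the experimental decommission path.
// Key names are assumptions matching the config constants referenced in the diff.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  // No external shuffle service; decommissioning is used to migrate state instead.
  .set("spark.shuffle.service.enabled", "false")
  // Assumed key behind WORKER_DECOMMISSION_ENABLED.
  .set("spark.worker.decommission.enabled", "true")
  // Assumed keys behind STORAGE_DECOMMISSION_ENABLED / STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED.
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
```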

File tree: 16 files changed, +349 −57 lines

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 38 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark
 
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
 /**
  * A client that communicates with the cluster manager to request or kill executors.
  * This is currently supported only in YARN mode.
@@ -81,6 +82,43 @@ private[spark] trait ExecutorAllocationClient {
       countFailures: Boolean,
       force: Boolean = false): Seq[String]
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   * Default implementation delegates to kill, scheduler must override
+   * if it supports graceful decommissioning.
+   *
+   * @param executors identifiers of executors & decom info.
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  def decommissionExecutors(
+      executors: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+    killExecutors(executors.map(_._1),
+      adjustTargetNumExecutors,
+      countFailures = false)
+  }
+
+
+  /**
+   * Request that the cluster manager decommission the specified executor.
+   * Default implementation delegates to decommissionExecutors, scheduler can override
+   * if it supports graceful decommissioning.
+   *
+   * @param executorId identifiers of executor to decommission
+   * @param decommissionInfo information about the decommission (reason, host loss)
+   * @return whether the request is acknowledged by the cluster manager.
+   */
+  def decommissionExecutor(executorId: String,
+      decommissionInfo: ExecutorDecommissionInfo): Boolean = {
+    val decommissionedExecutors = decommissionExecutors(
+      Seq((executorId, decommissionInfo)),
+      adjustTargetNumExecutors = true)
+    decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId)
+  }
+
+
   /**
    * Request that the cluster manager kill every executor on the specified host.
    *
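The trait change above adds a decommission API whose default implementation simply falls back to killing. Below is a minimal, self-contained sketch of that delegation pattern; it uses simplified stand-in types rather than Spark's real ExecutorAllocationClient and ExecutorDecommissionInfo, so treat the names as illustrative only.

```scala
// Simplified stand-ins for illustration only; Spark's real trait has more members
// and ExecutorDecommissionInfo lives in org.apache.spark.scheduler.
final case class DecomInfo(message: String, isHostDecommissioned: Boolean)

trait AllocationClientSketch {
  // Stand-in for killExecutors(ids, adjustTargetNumExecutors, countFailures, force).
  def killExecutors(ids: Seq[String], adjustTargetNumExecutors: Boolean): Seq[String]

  // Default: treat a decommission request as a plain kill. Backends that support
  // graceful decommissioning (as CoarseGrainedSchedulerBackend does in this commit)
  // override this with a real implementation.
  def decommissionExecutors(
      executors: Seq[(String, DecomInfo)],
      adjustTargetNumExecutors: Boolean): Seq[String] =
    killExecutors(executors.map(_._1), adjustTargetNumExecutors)

  // Single-executor convenience wrapper, mirroring decommissionExecutor in the diff.
  def decommissionExecutor(executorId: String, info: DecomInfo): Boolean = {
    val acked = decommissionExecutors(Seq(executorId -> info), adjustTargetNumExecutors = true)
    acked.headOption.contains(executorId)
  }
}
```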

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 23 additions & 5 deletions
@@ -28,6 +28,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
+import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
 import org.apache.spark.resource.ResourceProfileManager
@@ -204,7 +205,12 @@ private[spark] class ExecutorAllocationManager(
         s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
     }
     if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
-      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
+      // If dynamic allocation shuffle tracking or worker decommissioning along with
+      // storage shuffle decommissioning is enabled we have *experimental* support for
+      // decommissioning without a shuffle service.
+      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
+          (conf.get(WORKER_DECOMMISSION_ENABLED) &&
+          conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
         logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
       } else if (!testing) {
         throw new SparkException("Dynamic allocation of executors requires the external " +
@@ -539,7 +545,9 @@
       // get the running total as we remove or initialize it to the count - pendingRemoval
       val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId,
         (executorMonitor.executorCountWithResourceProfile(rpId) -
-        executorMonitor.pendingRemovalCountPerResourceProfileId(rpId)))
+        executorMonitor.pendingRemovalCountPerResourceProfileId(rpId) -
+        executorMonitor.pendingDecommissioningPerResourceProfileId(rpId)
+        ))
       if (newExecutorTotal - 1 < minNumExecutors) {
         logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " +
           s"are only $newExecutorTotal executor(s) left (minimum number of executor limit " +
@@ -565,8 +573,14 @@
     } else {
       // We don't want to change our target number of executors, because we already did that
       // when the task backlog decreased.
-      client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
-        countFailures = false, force = false)
+      if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+        val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
+          id => (id, ExecutorDecommissionInfo("spark scale down", false)))
+        client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
+      } else {
+        client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
+          countFailures = false, force = false)
+      }
     }
 
     // [SPARK-21834] killExecutors api reduces the target number of executors.
@@ -578,7 +592,11 @@
 
     // reset the newExecutorTotal to the existing number of executors
     if (testing || executorsRemoved.nonEmpty) {
-      executorMonitor.executorsKilled(executorsRemoved.toSeq)
+      if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+        executorMonitor.executorsDecommissioned(executorsRemoved)
+      } else {
+        executorMonitor.executorsKilled(executorsRemoved.toSeq)
+      }
       logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.")
       executorsRemoved.toSeq
     } else {
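The allocation manager change above routes idle-executor removal through decommissioning when worker decommissioning is enabled. Here is a condensed sketch of that branch, reusing the stand-in types from the earlier sketch; the "spark scale down" reason string comes from the diff, everything else is illustrative.

```scala
// Illustrative only: decomEnabled stands in for conf.get(WORKER_DECOMMISSION_ENABLED),
// and `client` is any implementation of the sketched trait above.
def removeIdleExecutors(
    client: AllocationClientSketch,
    executorIdsToBeRemoved: Seq[String],
    decomEnabled: Boolean): Seq[String] = {
  if (decomEnabled) {
    // Scale-down is not a host loss, hence isHostDecommissioned = false.
    val withInfo = executorIdsToBeRemoved.map(id => id -> DecomInfo("spark scale down", false))
    // The target was already lowered when the task backlog shrank, so don't adjust it again.
    client.decommissionExecutors(withInfo, adjustTargetNumExecutors = false)
  } else {
    client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false)
  }
}
```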

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 98 additions & 13 deletions
@@ -193,7 +193,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
       case DecommissionExecutor(executorId, decommissionInfo) =>
         logError(s"Received decommission executor message ${executorId}: $decommissionInfo")
-        decommissionExecutor(executorId, decommissionInfo)
+        decommissionExecutors(Seq((executorId, decommissionInfo)),
+          adjustTargetNumExecutors = false)
 
       case RemoveWorker(workerId, host, message) =>
         removeWorker(workerId, host, message)
@@ -274,7 +275,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
       case DecommissionExecutor(executorId, decommissionInfo) =>
         logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.")
-        decommissionExecutor(executorId, decommissionInfo)
+        decommissionExecutors(Seq((executorId, decommissionInfo)),
+          adjustTargetNumExecutors = false)
         context.reply(true)
 
       case RetrieveSparkAppConfig(resourceProfileId) =>
@@ -503,6 +505,100 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executors Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  override def decommissionExecutors(
+      executors: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executors.filter{case (executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {
+      executorsToDecommission.foreach { case (exec, _) =>
+        val rpId = executorDataMap(exec).resourceProfileId
+        val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+        if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
+          // Assume that we are killing an executor that was started by default and
+          // not through the request api
+          requestedTotalExecutorsPerResourceProfile(rp) = 0
+        } else {
+          val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
+          requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
+        }
+      }
+      doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+    }
+
+    val decommissioned = executorsToDecommission.filter{case (executorId, decomInfo) =>
+      doDecommission(executorId, decomInfo)
+    }.map(_._1)
+    decommissioned
+  }
+
+
+  private def doDecommission(executorId: String,
+      decomInfo: ExecutorDecommissionInfo): Boolean = {
+
+    logInfo(s"Starting decommissioning executor $executorId.")
+    try {
+      scheduler.executorDecommission(executorId, decomInfo)
+      if (driverEndpoint != null) {
+        logInfo("Propagating executor decommission to driver.")
+        driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Unexpected error during decommissioning ${e.toString}", e)
+        return false
+    }
+    // Send decommission message to the executor (it could have originated on the executor
+    // but not necessarily.
+    CoarseGrainedSchedulerBackend.this.synchronized {
+      executorDataMap.get(executorId) match {
+        case Some(executorInfo) =>
+          executorInfo.executorEndpoint.send(DecommissionSelf)
+        case None =>
+          // Ignoring the executor since it is not registered.
+          logWarning(s"Attempted to decommission unknown executor $executorId.")
+          return false
+      }
+    }
+    logInfo(s"Finished decommissioning executor $executorId.")
+
+    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+      try {
+        logInfo("Starting decommissioning block manager corresponding to " +
+          s"executor $executorId.")
+        scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
+      } catch {
+        case e: Exception =>
+          logError("Unexpected error during block manager " +
+            s"decommissioning for executor $executorId: ${e.toString}", e)
+          return false
+      }
+      logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
+    }
+    true
+  }
+
+
   override def start(): Unit = {
     if (UserGroupInformation.isSecurityEnabled()) {
       delegationTokenManager = createTokenManager()
@@ -598,17 +694,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     driverEndpoint.send(RemoveWorker(workerId, host, message))
   }
 
-  /**
-   * Called by subclasses when notified of a decommissioning executor.
-   */
-  private[spark] def decommissionExecutor(
-      executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
-    if (driverEndpoint != null) {
-      logInfo("Propagating executor decommission to driver.")
-      driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo))
-    }
-  }
-
   def sufficientResourcesRegistered(): Boolean = true
 
   override def isReady(): Boolean = {
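The backend now implements decommissionExecutors in two phases: filter to live executors and mark them pending, optionally lower the requested totals so replacements are not launched, then notify the scheduler, driver, and executor, and, if storage decommissioning is enabled, the block manager. The sketch below is a condensed, illustrative version of that control flow with stubbed-out collaborators; none of these names are Spark's real internals.

```scala
// Condensed, illustrative control flow; the real code is synchronized on the backend
// and talks to RPC endpoints and the BlockManagerMaster instead of these stubs.
final case class DecomRequest(executorId: String, reason: String, hostLoss: Boolean)

class DecommissionFlowSketch(
    isActive: String => Boolean,               // is the executor still registered and alive?
    lowerExecutorTarget: String => Unit,       // shrink the requested total for its profile
    notifyExecutor: String => Boolean,         // stands in for sending DecommissionSelf
    decommissionBlockManager: String => Unit,  // stands in for the block manager master call
    storageDecomEnabled: Boolean) {

  def decommission(requests: Seq[DecomRequest], adjustTargetNumExecutors: Boolean): Seq[String] = {
    // 1. Only bother with executors that are still alive.
    val live = requests.filter(r => isActive(r.executorId))

    // 2. Optionally shrink the requested totals so replacements are not launched.
    if (adjustTargetNumExecutors) live.foreach(r => lowerExecutorTarget(r.executorId))

    // 3. Tell each executor to decommission itself; on success, optionally move blocks off it.
    live.filter { r =>
      val ok = notifyExecutor(r.executorId)
      if (ok && storageDecomEnabled) decommissionBlockManager(r.executorId)
      ok
    }.map(_.executorId)
  }
}
```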

core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala

Lines changed: 2 additions & 1 deletion
@@ -176,7 +176,8 @@ private[spark] class StandaloneSchedulerBackend(
 
   override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) {
     logInfo("Asked to decommission executor")
-    decommissionExecutor(fullId.split("/")(1), decommissionInfo)
+    val execId = fullId.split("/")(1)
+    decommissionExecutors(Seq((execId, decommissionInfo)), adjustTargetNumExecutors = false)
     logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo))
   }
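For reference, the standalone backend's fullId is assumed here to have the "<appId>/<execNumber>" shape, so the split above just extracts the executor id; the id in this tiny sketch is made up.

```scala
// Hypothetical full id in the assumed "<appId>/<execNumber>" shape used by the split above.
val fullId = "app-20200626120000-0001/3"
val execId = fullId.split("/")(1)
assert(execId == "3")
```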

core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala

Lines changed: 54 additions & 7 deletions
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
 import org.apache.spark.scheduler._
-import org.apache.spark.storage.RDDBlockId
+import org.apache.spark.storage.{RDDBlockId, ShuffleDataBlockId}
 import org.apache.spark.util.Clock
 
 /**
@@ -114,7 +114,8 @@
 
     var newNextTimeout = Long.MaxValue
     timedOutExecs = executors.asScala
-      .filter { case (_, exec) => !exec.pendingRemoval && !exec.hasActiveShuffle }
+      .filter { case (_, exec) =>
+        !exec.pendingRemoval && !exec.hasActiveShuffle && !exec.pendingDecommissioning}
       .filter { case (_, exec) =>
         val deadline = exec.timeoutAt
         if (deadline > now) {
@@ -135,6 +136,7 @@
 
   /**
    * Mark the given executors as pending to be removed. Should only be called in the EAM thread.
+   * This covers both kills and decommissions.
    */
   def executorsKilled(ids: Seq[String]): Unit = {
     ids.foreach { id =>
@@ -149,6 +151,19 @@
     nextTimeout.set(Long.MinValue)
   }
 
+  private[spark] def executorsDecommissioned(ids: Seq[String]): Unit = {
+    ids.foreach { id =>
+      val tracker = executors.get(id)
+      if (tracker != null) {
+        tracker.pendingDecommissioning = true
+      }
+    }
+
+    // Recompute timed out executors in the next EAM callback, since this call invalidates
+    // the current list.
+    nextTimeout.set(Long.MinValue)
+  }
+
   def executorCount: Int = executors.size()
 
   def executorCountWithResourceProfile(id: Int): Int = {
@@ -171,6 +186,16 @@
     executors.asScala.filter { case (k, v) => v.resourceProfileId == id && v.pendingRemoval }.size
   }
 
+  def pendingDecommissioningCount: Int = executors.asScala.count { case (_, exec) =>
+    exec.pendingDecommissioning
+  }
+
+  def pendingDecommissioningPerResourceProfileId(id: Int): Int = {
+    executors.asScala.filter { case (k, v) =>
+      v.resourceProfileId == id && v.pendingDecommissioning
+    }.size
+  }
+
   override def onJobStart(event: SparkListenerJobStart): Unit = {
     if (!shuffleTrackingEnabled) {
       return
@@ -298,6 +323,7 @@
     //
     // This means that an executor may be marked as having shuffle data, and thus prevented
     // from being removed, even though the data may not be used.
+    // TODO: Only track used files (SPARK-31974)
     if (shuffleTrackingEnabled && event.reason == Success) {
       stageToShuffleID.get(event.stageId).foreach { shuffleId =>
         exec.addShuffle(shuffleId)
@@ -326,18 +352,33 @@
     val removed = executors.remove(event.executorId)
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
-      if (!removed.pendingRemoval) {
+      if (!removed.pendingRemoval || !removed.pendingDecommissioning) {
         nextTimeout.set(Long.MinValue)
       }
     }
   }
 
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
-    if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
-      return
-    }
     val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
       UNKNOWN_RESOURCE_PROFILE_ID)
+
+    // Check if it is a shuffle file, or RDD to pick the correct codepath for update
+    if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && shuffleTrackingEnabled) {
+      /**
+       * The executor monitor keeps track of locations of cache and shuffle blocks and this can be
+       * used to decide which executor(s) Spark should shutdown first. Since we move shuffle blocks
+       * around now this wires it up so that it keeps track of it. We only do this for data blocks
+       * as index and other blocks blocks do not necessarily mean the entire block has been
+       * committed.
+       */
+      event.blockUpdatedInfo.blockId match {
+        case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId)
+        case _ => // For now we only update on data blocks
+      }
+      return
+    } else if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
+      return
+    }
     val storageLevel = event.blockUpdatedInfo.storageLevel
     val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId]
@@ -410,10 +451,15 @@
   }
 
   // Visible for testing
-  def executorsPendingToRemove(): Set[String] = {
+  private[spark] def executorsPendingToRemove(): Set[String] = {
     executors.asScala.filter { case (_, exec) => exec.pendingRemoval }.keys.toSet
   }
 
+  // Visible for testing
+  private[spark] def executorsDecommissioning(): Set[String] = {
+    executors.asScala.filter { case (_, exec) => exec.pendingDecommissioning }.keys.toSet
+  }
+
   /**
    * This method should be used when updating executor state. It guards against a race condition in
    * which the `SparkListenerTaskStart` event is posted before the `SparkListenerBlockManagerAdded`
@@ -466,6 +512,7 @@
     @volatile var timedOut: Boolean = false
 
     var pendingRemoval: Boolean = false
+    var pendingDecommissioning: Boolean = false
     var hasActiveShuffle: Boolean = false
 
     private var idleStart: Long = -1
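The monitor changes above route block updates by type: shuffle data blocks feed shuffle tracking, RDD blocks keep the existing cache-tracking path, and everything else is ignored. Below is a standalone sketch of that routing with a simplified stand-in for the storage BlockId hierarchy; the types and fields here are illustrative, not Spark's.

```scala
import scala.collection.mutable

// Simplified stand-ins for Spark's storage.BlockId hierarchy, for illustration only.
sealed trait BlockIdSketch
final case class ShuffleData(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockIdSketch
final case class RddBlock(rddId: Int, splitIndex: Int) extends BlockIdSketch
final case class OtherBlock(name: String) extends BlockIdSketch

class ExecutorTrackerSketch(shuffleTrackingEnabled: Boolean) {
  val shuffleIds = mutable.Set.empty[Int]   // shuffles whose data lives on this executor
  val cachedRdds = mutable.Set.empty[Int]   // RDDs with cached blocks on this executor

  def onBlockUpdated(blockId: BlockIdSketch): Unit = blockId match {
    // Only data blocks count: an index (or other) block does not imply the
    // whole shuffle output has been committed.
    case ShuffleData(shuffleId, _, _) if shuffleTrackingEnabled =>
      shuffleIds += shuffleId
    case RddBlock(rddId, _) =>
      cachedRdds += rddId // the real monitor also inspects the block's storage level here
    case _ => // ignore broadcast, stream, shuffle index blocks, etc.
  }
}
```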
