diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 00bd0063c9e3..079340a358ac 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -17,6 +17,8 @@ package org.apache.spark +import org.apache.spark.scheduler.ExecutorDecommissionInfo + /** * A client that communicates with the cluster manager to request or kill executors. * This is currently supported only in YARN mode. @@ -81,6 +83,44 @@ private[spark] trait ExecutorAllocationClient { countFailures: Boolean, force: Boolean = false): Seq[String] + /** + * Request that the cluster manager decommission the specified executors. + * The default implementation delegates to kill; a scheduler must override + * this if it supports graceful decommissioning. + * + * @param executorsAndDecomInfo identifiers of executors & decommission info. + * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down + * after these executors have been decommissioned. + * @return the ids of the executors acknowledged by the cluster manager to be removed. + */ + def decommissionExecutors( + executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)], + adjustTargetNumExecutors: Boolean): Seq[String] = { + killExecutors(executorsAndDecomInfo.map(_._1), + adjustTargetNumExecutors, + countFailures = false) + } + + + /** + * Request that the cluster manager decommission the specified executor. + * Delegates to decommissionExecutors. + * + * @param executorId identifier of the executor to decommission + * @param decommissionInfo information about the decommission (reason, host loss) + * @param adjustTargetNumExecutors whether we should adjust the target number of executors. + * @return whether the request is acknowledged by the cluster manager. + */ + final def decommissionExecutor(executorId: String, + decommissionInfo: ExecutorDecommissionInfo, + adjustTargetNumExecutors: Boolean): Boolean = { + val decommissionedExecutors = decommissionExecutors( + Array((executorId, decommissionInfo)), + adjustTargetNumExecutors = adjustTargetNumExecutors) + decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId) + } + + /** * Request that the cluster manager kill every executor on the specified host.
* diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 1570f869c5bd..1cb840f2fe80 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -28,6 +28,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.metrics.source.Source import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID import org.apache.spark.resource.ResourceProfileManager @@ -127,6 +128,8 @@ private[spark] class ExecutorAllocationManager( private val executorAllocationRatio = conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO) + private val decommissionEnabled = conf.get(WORKER_DECOMMISSION_ENABLED) + private val defaultProfileId = resourceProfileManager.defaultResourceProfile.id validateSettings() @@ -204,7 +207,12 @@ private[spark] class ExecutorAllocationManager( s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!") } if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) { - if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) { + // If dynamic allocation shuffle tracking or worker decommissioning along with + // storage shuffle decommissioning is enabled we have *experimental* support for + // decommissioning without a shuffle service. + if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) || + (decommissionEnabled && + conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) { logWarning("Dynamic allocation without a shuffle service is an experimental feature.") } else if (!testing) { throw new SparkException("Dynamic allocation of executors requires the external " + @@ -539,7 +547,9 @@ private[spark] class ExecutorAllocationManager( // get the running total as we remove or initialize it to the count - pendingRemoval val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId, (executorMonitor.executorCountWithResourceProfile(rpId) - - executorMonitor.pendingRemovalCountPerResourceProfileId(rpId))) + executorMonitor.pendingRemovalCountPerResourceProfileId(rpId) - + executorMonitor.decommissioningPerResourceProfileId(rpId) + )) if (newExecutorTotal - 1 < minNumExecutors) { logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " + s"are only $newExecutorTotal executor(s) left (minimum number of executor limit " + @@ -565,8 +575,14 @@ private[spark] class ExecutorAllocationManager( } else { // We don't want to change our target number of executors, because we already did that // when the task backlog decreased. - client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false, - countFailures = false, force = false) + if (decommissionEnabled) { + val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map( + id => (id, ExecutorDecommissionInfo("spark scale down", false))).toArray + client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false) + } else { + client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false, + countFailures = false, force = false) + } } // [SPARK-21834] killExecutors api reduces the target number of executors. 
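A minimal sketch (not part of the patch) of how a caller of the new `ExecutorAllocationClient` entry point scales down: when worker decommissioning is enabled the executors are handed to `decommissionExecutors` with a reason, otherwise the existing `killExecutors` path is used, mirroring the branch added to `ExecutorAllocationManager.removeExecutors` above. The helper name `scaleDown` is hypothetical.

```scala
import org.apache.spark.ExecutorAllocationClient
import org.apache.spark.scheduler.ExecutorDecommissionInfo

// Hypothetical helper mirroring the new decommission-vs-kill branch.
def scaleDown(
    client: ExecutorAllocationClient,
    ids: Seq[String],
    decommissionEnabled: Boolean): Seq[String] = {
  if (decommissionEnabled) {
    // Graceful path: ask the cluster manager to decommission; the target stays unchanged
    // because it was already lowered when the task backlog shrank.
    val idsWithInfo = ids.map { id =>
      (id, ExecutorDecommissionInfo("spark scale down", isHostDecommissioned = false))
    }.toArray
    client.decommissionExecutors(idsWithInfo, adjustTargetNumExecutors = false)
  } else {
    // Previous behaviour: kill the executors outright.
    client.killExecutors(ids, adjustTargetNumExecutors = false,
      countFailures = false, force = false)
  }
}
```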
@@ -578,7 +594,11 @@ private[spark] class ExecutorAllocationManager( // reset the newExecutorTotal to the existing number of executors if (testing || executorsRemoved.nonEmpty) { - executorMonitor.executorsKilled(executorsRemoved.toSeq) + if (decommissionEnabled) { + executorMonitor.executorsDecommissioned(executorsRemoved) + } else { + executorMonitor.executorsKilled(executorsRemoved.toSeq) + } logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.") executorsRemoved.toSeq } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 200f2d87a8a7..ca657313c14f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -193,7 +193,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case DecommissionExecutor(executorId, decommissionInfo) => logError(s"Received decommission executor message ${executorId}: $decommissionInfo") - decommissionExecutor(executorId, decommissionInfo) + decommissionExecutor(executorId, decommissionInfo, adjustTargetNumExecutors = false) case RemoveWorker(workerId, host, message) => removeWorker(workerId, host, message) @@ -274,8 +274,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case DecommissionExecutor(executorId, decommissionInfo) => logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.") - decommissionExecutor(executorId, decommissionInfo) - context.reply(true) + context.reply(decommissionExecutor(executorId, decommissionInfo, + adjustTargetNumExecutors = false)) case RetrieveSparkAppConfig(resourceProfileId) => val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId) @@ -419,59 +419,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.workerRemoved(workerId, host, message) } - /** - * Mark a given executor as decommissioned and stop making resource offers for it. - */ - private def decommissionExecutor( - executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = { - val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized { - // Only bother decommissioning executors which are alive. - if (isExecutorActive(executorId)) { - executorsPendingDecommission += executorId - true - } else { - false - } - } - - if (shouldDisable) { - logInfo(s"Starting decommissioning executor $executorId.") - try { - scheduler.executorDecommission(executorId, decommissionInfo) - } catch { - case e: Exception => - logError(s"Unexpected error during decommissioning ${e.toString}", e) - } - // Send decommission message to the executor, this may be a duplicate since the executor - // could have been the one to notify us. But it's also possible the notification came from - // elsewhere and the executor does not yet know. - executorDataMap.get(executorId) match { - case Some(executorInfo) => - executorInfo.executorEndpoint.send(DecommissionSelf) - case None => - // Ignoring the executor since it is not registered. 
- logWarning(s"Attempted to decommission unknown executor $executorId.") - } - logInfo(s"Finished decommissioning executor $executorId.") - - if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { - try { - logInfo("Starting decommissioning block manager corresponding to " + - s"executor $executorId.") - scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId)) - } catch { - case e: Exception => - logError("Unexpected error during block manager " + - s"decommissioning for executor $executorId: ${e.toString}", e) - } - logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.") - } - } else { - logInfo(s"Skipping decommissioning of executor $executorId.") - } - shouldDisable - } - /** * Stop making resource offers for the given executor. The executor is marked as lost with * the loss reason still pending. @@ -503,6 +450,87 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp protected def minRegisteredRatio: Double = _minRegisteredRatio + /** + * Request that the cluster manager decommission the specified executors. + * + * @param executorsAndDecomInfo Identifiers of executors & decommission info. + * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down + * after these executors have been decommissioned. + * @return the ids of the executors acknowledged by the cluster manager to be removed. + */ + override def decommissionExecutors( + executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)], + adjustTargetNumExecutors: Boolean): Seq[String] = { + + val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) => + CoarseGrainedSchedulerBackend.this.synchronized { + // Only bother decommissioning executors which are alive. + if (isExecutorActive(executorId)) { + executorsPendingDecommission += executorId + true + } else { + false + } + } + } + + // If we don't want to replace the executors we are decommissioning + if (adjustTargetNumExecutors) { + adjustExecutors(executorsToDecommission.map(_._1)) + } + + executorsToDecommission.filter { case (executorId, decomInfo) => + doDecommission(executorId, decomInfo) + }.map(_._1) + } + + + private def doDecommission(executorId: String, + decomInfo: ExecutorDecommissionInfo): Boolean = { + + logInfo(s"Asking executor $executorId to decommissioning.") + try { + scheduler.executorDecommission(executorId, decomInfo) + if (driverEndpoint != null) { + logInfo("Propagating executor decommission to driver.") + driverEndpoint.send(DecommissionExecutor(executorId, decomInfo)) + } + } catch { + case e: Exception => + logError(s"Unexpected error during decommissioning ${e.toString}", e) + return false + } + // Send decommission message to the executor (it could have originated on the executor + // but not necessarily. + CoarseGrainedSchedulerBackend.this.synchronized { + executorDataMap.get(executorId) match { + case Some(executorInfo) => + executorInfo.executorEndpoint.send(DecommissionSelf) + case None => + // Ignoring the executor since it is not registered. 
+ logWarning(s"Attempted to decommission unknown executor $executorId.") + return false + } + } + logInfo(s"Asked executor $executorId to decommission.") + + if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { + try { + logInfo(s"Asking block manager corresponding to executor $executorId to decommission.") + scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId)) + } catch { + case e: Exception => + logError("Unexpected error during block manager " + + s"decommissioning for executor $executorId: ${e.toString}", e) + return false + } + logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.") + } + + true + } + + override def start(): Unit = { if (UserGroupInformation.isSecurityEnabled()) { delegationTokenManager = createTokenManager() @@ -598,17 +626,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp driverEndpoint.send(RemoveWorker(workerId, host, message)) } - /** - * Called by subclasses when notified of a decommissioning executor. - */ - private[spark] def decommissionExecutor( - executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = { - if (driverEndpoint != null) { - logInfo("Propagating executor decommission to driver.") - driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo)) - } - } - def sufficientResourcesRegistered(): Boolean = true override def isReady(): Boolean = { @@ -760,6 +777,31 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = Future.successful(false) + /** + * Adjust the number of executors being requested to no longer include the provided executors. + */ + private def adjustExecutors(executorIds: Seq[String]) = { + if (executorIds.nonEmpty) { + executorIds.foreach { exec => + withLock { + val rpId = executorDataMap(exec).resourceProfileId + val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId) + if (requestedTotalExecutorsPerResourceProfile.isEmpty) { + // Assume that we are killing an executor that was started by default and + // not through the request api + requestedTotalExecutorsPerResourceProfile(rp) = 0 + } else { + val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp) + requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0) + } + } + } + doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap) + } else { + Future.successful(true) + } + } + /** * Request that the cluster manager kill the specified executors. * @@ -798,19 +840,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // take into account executors that are pending to be added or removed. 
val adjustTotalExecutors = if (adjustTargetNumExecutors) { - executorsToKill.foreach { exec => - val rpId = executorDataMap(exec).resourceProfileId - val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId) - if (requestedTotalExecutorsPerResourceProfile.isEmpty) { - // Assume that we are killing an executor that was started by default and - // not through the request api - requestedTotalExecutorsPerResourceProfile(rp) = 0 - } else { - val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp) - requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0) - } - } - doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap) + adjustExecutors(executorsToKill) } else { Future.successful(true) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index d921af602b25..3acb6f1088e1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -176,7 +176,8 @@ private[spark] class StandaloneSchedulerBackend( override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) { logInfo("Asked to decommission executor") - decommissionExecutor(fullId.split("/")(1), decommissionInfo) + val execId = fullId.split("/")(1) + decommissionExecutors(Array((execId, decommissionInfo)), adjustTargetNumExecutors = false) logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo)) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 4d7190726f06..8dbdc84905e3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID import org.apache.spark.scheduler._ -import org.apache.spark.storage.RDDBlockId +import org.apache.spark.storage.{RDDBlockId, ShuffleDataBlockId} import org.apache.spark.util.Clock /** @@ -114,7 +114,8 @@ private[spark] class ExecutorMonitor( var newNextTimeout = Long.MaxValue timedOutExecs = executors.asScala - .filter { case (_, exec) => !exec.pendingRemoval && !exec.hasActiveShuffle } + .filter { case (_, exec) => + !exec.pendingRemoval && !exec.hasActiveShuffle && !exec.decommissioning} .filter { case (_, exec) => val deadline = exec.timeoutAt if (deadline > now) { @@ -135,6 +136,7 @@ private[spark] class ExecutorMonitor( /** * Mark the given executors as pending to be removed. Should only be called in the EAM thread. + * This covers both kills and decommissions. */ def executorsKilled(ids: Seq[String]): Unit = { ids.foreach { id => @@ -149,6 +151,19 @@ private[spark] class ExecutorMonitor( nextTimeout.set(Long.MinValue) } + private[spark] def executorsDecommissioned(ids: Seq[String]): Unit = { + ids.foreach { id => + val tracker = executors.get(id) + if (tracker != null) { + tracker.decommissioning = true + } + } + + // Recompute timed out executors in the next EAM callback, since this call invalidates + // the current list. 
+ nextTimeout.set(Long.MinValue) + } + def executorCount: Int = executors.size() def executorCountWithResourceProfile(id: Int): Int = { @@ -171,6 +186,16 @@ private[spark] class ExecutorMonitor( executors.asScala.filter { case (k, v) => v.resourceProfileId == id && v.pendingRemoval }.size } + def decommissioningCount: Int = executors.asScala.count { case (_, exec) => + exec.decommissioning + } + + def decommissioningPerResourceProfileId(id: Int): Int = { + executors.asScala.filter { case (k, v) => + v.resourceProfileId == id && v.decommissioning + }.size + } + override def onJobStart(event: SparkListenerJobStart): Unit = { if (!shuffleTrackingEnabled) { return @@ -298,6 +323,7 @@ private[spark] class ExecutorMonitor( // // This means that an executor may be marked as having shuffle data, and thus prevented // from being removed, even though the data may not be used. + // TODO: Only track used files (SPARK-31974) if (shuffleTrackingEnabled && event.reason == Success) { stageToShuffleID.get(event.stageId).foreach { shuffleId => exec.addShuffle(shuffleId) @@ -326,18 +352,35 @@ private[spark] class ExecutorMonitor( val removed = executors.remove(event.executorId) if (removed != null) { decrementExecResourceProfileCount(removed.resourceProfileId) - if (!removed.pendingRemoval) { + if (!removed.pendingRemoval || !removed.decommissioning) { nextTimeout.set(Long.MinValue) } } } override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { + val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId, + UNKNOWN_RESOURCE_PROFILE_ID) + + // Check if it is a shuffle file, or RDD to pick the correct codepath for update if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) { + if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && + shuffleTrackingEnabled) { + /** + * The executor monitor keeps track of locations of cache and shuffle blocks and this can + * be used to decide which executor(s) Spark should shutdown first. Since we move shuffle + * blocks around now this wires it up so that it keeps track of it. We only do this for + * data blocks as index and other blocks blocks do not necessarily mean the entire block + * has been committed. + */ + event.blockUpdatedInfo.blockId match { + case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId) + case _ => // For now we only update on data blocks + } + } return } - val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId, - UNKNOWN_RESOURCE_PROFILE_ID) + val storageLevel = event.blockUpdatedInfo.storageLevel val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId] @@ -410,10 +453,15 @@ private[spark] class ExecutorMonitor( } // Visible for testing - def executorsPendingToRemove(): Set[String] = { + private[spark] def executorsPendingToRemove(): Set[String] = { executors.asScala.filter { case (_, exec) => exec.pendingRemoval }.keys.toSet } + // Visible for testing + private[spark] def executorsDecommissioning(): Set[String] = { + executors.asScala.filter { case (_, exec) => exec.decommissioning }.keys.toSet + } + /** * This method should be used when updating executor state. 
It guards against a race condition in * which the `SparkListenerTaskStart` event is posted before the `SparkListenerBlockManagerAdded` @@ -466,6 +514,7 @@ private[spark] class ExecutorMonitor( @volatile var timedOut: Boolean = false var pendingRemoval: Boolean = false + var decommissioning: Boolean = false var hasActiveShuffle: Boolean = false private var idleStart: Long = -1 diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6ec93df67f7d..ee534f549fd0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1822,7 +1822,7 @@ private[spark] class BlockManager( } } - /* + /** * Returns the last migration time and a boolean denoting if all the blocks have been migrated. * If there are any tasks running since that time the boolean may be incorrect. */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 93492cc6d7db..f544d47b8e13 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -43,9 +43,11 @@ class BlockManagerMaster( logInfo("Removed " + execId + " successfully in removeExecutor") } - /** Decommission block managers corresponding to given set of executors */ + /** Decommission block managers corresponding to given set of executors + * Non-blocking. + */ def decommissionBlockManagers(executorIds: Seq[String]): Unit = { - driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds)) + driverEndpoint.ask[Boolean](DecommissionBlockManagers(executorIds)) } /** Get Replication Info for all the RDD blocks stored in given blockManagerId */ diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 5b367d2fb01d..3abe051e4708 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -28,6 +28,7 @@ import org.scalatest.PrivateMethodTester import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.metrics.MetricsSystem import org.apache.spark.resource._ import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID @@ -1270,6 +1271,68 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 executor remaining) } + test("mock polling loop remove with decommissioning") { + val clock = new ManualClock(2020L) + val manager = createManager(createConf(1, 20, 1, true), clock = clock) + + // Remove idle executors on timeout + onExecutorAddedDefaultProfile(manager, "executor-1") + onExecutorAddedDefaultProfile(manager, "executor-2") + onExecutorAddedDefaultProfile(manager, "executor-3") + assert(executorsDecommissioning(manager).isEmpty) + assert(executorsPendingToRemove(manager).isEmpty) + + // idle threshold not reached yet + clock.advance(executorIdleTimeout * 1000 / 2) + schedule(manager) + assert(manager.executorMonitor.timedOutExecutors().isEmpty) + 
assert(executorsPendingToRemove(manager).isEmpty) + assert(executorsDecommissioning(manager).isEmpty) + + // idle threshold exceeded + clock.advance(executorIdleTimeout * 1000) + assert(manager.executorMonitor.timedOutExecutors().size === 3) + schedule(manager) + assert(executorsPendingToRemove(manager).isEmpty) // limit reached (1 executor remaining) + assert(executorsDecommissioning(manager).size === 2) // limit reached (1 executor remaining) + + // Mark a subset as busy - only idle executors should be removed + onExecutorAddedDefaultProfile(manager, "executor-4") + onExecutorAddedDefaultProfile(manager, "executor-5") + onExecutorAddedDefaultProfile(manager, "executor-6") + onExecutorAddedDefaultProfile(manager, "executor-7") + assert(manager.executorMonitor.executorCount === 7) + assert(executorsPendingToRemove(manager).isEmpty) // no pending to be removed + assert(executorsDecommissioning(manager).size === 2) // 2 decommissioning + onExecutorBusy(manager, "executor-4") + onExecutorBusy(manager, "executor-5") + onExecutorBusy(manager, "executor-6") // 3 busy and 2 idle (of the 5 active ones) + + // after scheduling, the previously timed out executor should be removed, since + // there are new active ones. + schedule(manager) + assert(executorsDecommissioning(manager).size === 3) + + // advance the clock so that idle executors should time out and move to the pending list + clock.advance(executorIdleTimeout * 1000) + schedule(manager) + assert(executorsPendingToRemove(manager).size === 0) + assert(executorsDecommissioning(manager).size === 4) + assert(!executorsDecommissioning(manager).contains("executor-4")) + assert(!executorsDecommissioning(manager).contains("executor-5")) + assert(!executorsDecommissioning(manager).contains("executor-6")) + + // Busy executors are now idle and should be removed + onExecutorIdle(manager, "executor-4") + onExecutorIdle(manager, "executor-5") + onExecutorIdle(manager, "executor-6") + schedule(manager) + assert(executorsDecommissioning(manager).size === 4) + clock.advance(executorIdleTimeout * 1000) + schedule(manager) + assert(executorsDecommissioning(manager).size === 6) // limit reached (1 executor remaining) + } + test("listeners trigger add executors correctly") { val manager = createManager(createConf(1, 20, 1)) assert(addTime(manager) === NOT_SET) @@ -1588,7 +1651,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { private def createConf( minExecutors: Int = 1, maxExecutors: Int = 5, - initialExecutors: Int = 1): SparkConf = { + initialExecutors: Int = 1, + decommissioningEnabled: Boolean = false): SparkConf = { val sparkConf = new SparkConf() .set(config.DYN_ALLOCATION_ENABLED, true) .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors) @@ -1604,6 +1668,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { // SPARK-22864: effectively disable the allocation schedule by setting the period to a // really long value. 
.set(TEST_SCHEDULE_INTERVAL, 30000L) + .set(WORKER_DECOMMISSION_ENABLED, decommissioningEnabled) sparkConf } @@ -1670,6 +1735,10 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { private def executorsPendingToRemove(manager: ExecutorAllocationManager): Set[String] = { manager.executorMonitor.executorsPendingToRemove() } + + private def executorsDecommissioning(manager: ExecutorAllocationManager): Set[String] = { + manager.executorMonitor.executorsDecommissioning() + } } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala index d95deb1f5f32..6bfd3f72e632 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala @@ -65,7 +65,8 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] sc.getExecutorIds().tail.foreach { id => - sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false)) + sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false), + adjustTargetNumExecutors = false) assert(rdd3.sortByKey().collect().length === 100) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index bb0c33acc0af..ea5be21d16d8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -76,7 +76,9 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { // decom.sh message passing is tested manually. val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] val execs = sched.getExecutorIds() - execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false))) + // Make the executors decommission, finish, exit, and not be replaced. + val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false))).toArray + sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = true) val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds) assert(asyncCountResult === 10) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 7cf008381af6..82f87a5b58b4 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -188,9 +188,12 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS val execToDecommission = getCandidateExecutorToDecom.get logInfo(s"Decommissioning executor ${execToDecommission}") + + // Decommission executor and ensure it is not relaunched by setting adjustTargetNumExecutors sched.decommissionExecutor( execToDecommission, - ExecutorDecommissionInfo("", isHostDecommissioned = false)) + ExecutorDecommissionInfo("", isHostDecommissioned = false), + adjustTargetNumExecutors = true) val decomTime = new SystemClock().getTimeMillis() // Wait for job to finish. 
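The updated test suites drive the two entry points directly on the scheduler backend; roughly, usage looks like the following sketch (the empty reason string follows the tests, the helper names are illustrative):

```scala
import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend

// Single-executor form: keep the current target, so the cluster manager may replace it.
def decommissionOne(sched: StandaloneSchedulerBackend, execId: String): Boolean = {
  sched.decommissionExecutor(execId,
    ExecutorDecommissionInfo("", isHostDecommissioned = false),
    adjustTargetNumExecutors = false)
}

// Batch form: also lower the target so the decommissioned executors are not replaced.
def decommissionAll(sched: StandaloneSchedulerBackend): Seq[String] = {
  val execsAndInfo = sched.getExecutorIds()
    .map((_, ExecutorDecommissionInfo("", isHostDecommissioned = false)))
    .toArray
  sched.decommissionExecutors(execsAndInfo, adjustTargetNumExecutors = true)
}
```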
@@ -276,6 +279,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS } // Wait for the executor to be removed automatically after migration. + // This is set to a high value since github actions is sometimes high latency + // but I've never seen this go for more than a minute. assert(executorRemovedSem.tryAcquire(1, 5L, TimeUnit.MINUTES)) // Since the RDD is cached or shuffled so further usage of same RDD should use the diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5a3ac213c205..110c311876f4 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -838,6 +838,8 @@ object Unidoc { f.getCanonicalPath.contains("org/apache/spark/shuffle") && !f.getCanonicalPath.contains("org/apache/spark/shuffle/api"))) .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/executor"))) + .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/ExecutorAllocationClient"))) + .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend"))) .map(_.filterNot(f => f.getCanonicalPath.contains("org/apache/spark/unsafe") && !f.getCanonicalPath.contains("org/apache/spark/unsafe/types/CalendarInterval"))) diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh index 8a5208d49a70..cd973df257f0 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh @@ -32,4 +32,4 @@ timeout 60 tail --pid=${WORKER_PID} -f /dev/null date echo "Done" date -sleep 30 +sleep 1 diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 279386d94b35..28ab37152cf4 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -279,6 +279,7 @@ class KubernetesSuite extends SparkFunSuite appArgs = appArgs) val execPods = scala.collection.mutable.Map[String, Pod]() + val podsDeleted = scala.collection.mutable.HashSet[String]() val (patienceInterval, patienceTimeout) = { executorPatience match { case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT)) @@ -339,27 +340,21 @@ class KubernetesSuite extends SparkFunSuite } // Delete the pod to simulate cluster scale down/migration. // This will allow the pod to remain up for the grace period - val pod = kubernetesTestComponents.kubernetesClient.pods() - .withName(name) - pod.delete() + kubernetesTestComponents.kubernetesClient.pods() + .withName(name).delete() logDebug(s"Triggered pod decom/delete: $name deleted") - // Look for the string that indicates we should force kill the first - // Executor. This simulates the pod being fully lost. 
- logDebug("Waiting for second collect...") + // Make sure this pod is deleted Eventually.eventually(TIMEOUT, INTERVAL) { - assert(kubernetesTestComponents.kubernetesClient - .pods() - .withName(driverPodName) - .getLog - .contains("Waiting some more, please kill exec 1."), - "Decommission test did not complete second collect.") + assert(podsDeleted.contains(name)) + } + // Then make sure this pod is replaced + Eventually.eventually(TIMEOUT, INTERVAL) { + assert(execPods.size == 3) } - logDebug("Force deleting") - val podNoGrace = pod.withGracePeriod(0) - podNoGrace.delete() } case Action.DELETED | Action.ERROR => execPods.remove(name) + podsDeleted += name } } }) @@ -388,7 +383,6 @@ class KubernetesSuite extends SparkFunSuite Eventually.eventually(TIMEOUT, patienceInterval) { execPods.values.nonEmpty should be (true) } - execWatcher.close() execPods.values.foreach(executorPodChecker(_)) Eventually.eventually(patienceTimeout, patienceInterval) { expectedLogOnCompletion.foreach { e => @@ -400,6 +394,7 @@ class KubernetesSuite extends SparkFunSuite s"The application did not complete, did not find str ${e}") } } + execWatcher.close() } protected def doBasicDriverPodCheck(driverPod: Pod): Unit = { diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index d34e61611461..5fcad083b007 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -47,11 +47,6 @@ def addToAcc(x): print("...") time.sleep(30) rdd.count() - print("Waiting some more, please kill exec 1.") - print("...") - time.sleep(30) - print("Executor node should be deleted now") - rdd.count() rdd.collect() print("Final accumulator value is: " + str(acc.value)) print("Finished waiting, stopping Spark.") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala index 58bd56c591d0..a4b7b7a28fd4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala @@ -23,7 +23,9 @@ import scala.util.Random import org.apache.spark.{ExecutorAllocationClient, SparkConf} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Streaming._ +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.resource.ResourceProfile +import org.apache.spark.scheduler.ExecutorDecommissionInfo import org.apache.spark.streaming.util.RecurringTimer import org.apache.spark.util.{Clock, Utils} @@ -133,7 +135,13 @@ private[streaming] class ExecutorAllocationManager( logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}") if (removableExecIds.nonEmpty) { val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size)) - client.killExecutor(execIdToRemove) + if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + client.decommissionExecutor(execIdToRemove, + ExecutorDecommissionInfo("spark scale down", false), + adjustTargetNumExecutors = true) + } else { + client.killExecutor(execIdToRemove) + } logInfo(s"Requested to kill executor $execIdToRemove") } else { logInfo(s"No non-receiver executors to kill") diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index 65efa10bfcf9..9e06625371ae 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -27,7 +27,9 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark.{ExecutorAllocationClient, SparkConf} import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING} import org.apache.spark.internal.config.Streaming._ +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.resource.ResourceProfile +import org.apache.spark.scheduler.ExecutorDecommissionInfo import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext, TestSuiteBase} import org.apache.spark.util.{ManualClock, Utils} @@ -44,11 +46,22 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase } test("basic functionality") { + basicTest(decommissioning = false) + } + + test("basic decommissioning") { + basicTest(decommissioning = true) + } + + def basicTest(decommissioning: Boolean): Unit = { // Test that adding batch processing time info to allocation manager // causes executors to be requested and killed accordingly + conf.set(WORKER_DECOMMISSION_ENABLED, decommissioning) // There is 1 receiver, and exec 1 has been allocated to it - withAllocationManager(numReceivers = 1) { case (receiverTracker, allocationManager) => + withAllocationManager(numReceivers = 1, conf = conf) { + case (receiverTracker, allocationManager) => + when(receiverTracker.allocatedExecutors).thenReturn(Map(1 -> Some("1"))) /** Add data point for batch processing time and verify executor allocation */ @@ -83,53 +96,67 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase Map.empty)} } - /** Verify that a particular executor was killed */ - def verifyKilledExec(expectedKilledExec: Option[String]): Unit = { - if (expectedKilledExec.nonEmpty) { - verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get)) + /** Verify that a particular executor was scaled down. 
*/ + def verifyScaledDownExec(expectedExec: Option[String]): Unit = { + if (expectedExec.nonEmpty) { + val decomInfo = ExecutorDecommissionInfo("spark scale down", false) + if (decommissioning) { + verify(allocationClient, times(1)).decommissionExecutor( + meq(expectedExec.get), meq(decomInfo), meq(true)) + verify(allocationClient, never).killExecutor(meq(expectedExec.get)) + } else { + verify(allocationClient, times(1)).killExecutor(meq(expectedExec.get)) + verify(allocationClient, never).decommissionExecutor( + meq(expectedExec.get), meq(decomInfo), meq(true)) + } } else { - verify(allocationClient, never).killExecutor(null) + if (decommissioning) { + verify(allocationClient, never).decommissionExecutor(null, null, false) + verify(allocationClient, never).decommissionExecutor(null, null, true) + } else { + verify(allocationClient, never).killExecutor(null) + } } } // Batch proc time = batch interval, should increase allocation by 1 addBatchProcTimeAndVerifyAllocation(batchDurationMillis) { verifyTotalRequestedExecs(Some(3)) // one already allocated, increase allocation by 1 - verifyKilledExec(None) + verifyScaledDownExec(None) } // Batch proc time = batch interval * 2, should increase allocation by 2 addBatchProcTimeAndVerifyAllocation(batchDurationMillis * 2) { verifyTotalRequestedExecs(Some(4)) - verifyKilledExec(None) + verifyScaledDownExec(None) } // Batch proc time slightly more than the scale up ratio, should increase allocation by 1 addBatchProcTimeAndVerifyAllocation( batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get + 1) { verifyTotalRequestedExecs(Some(3)) - verifyKilledExec(None) + verifyScaledDownExec(None) } // Batch proc time slightly less than the scale up ratio, should not change allocation addBatchProcTimeAndVerifyAllocation( batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get - 1) { verifyTotalRequestedExecs(None) - verifyKilledExec(None) + verifyScaledDownExec(None) } // Batch proc time slightly more than the scale down ratio, should not change allocation addBatchProcTimeAndVerifyAllocation( batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get + 1) { verifyTotalRequestedExecs(None) - verifyKilledExec(None) + verifyScaledDownExec(None) } // Batch proc time slightly more than the scale down ratio, should not change allocation addBatchProcTimeAndVerifyAllocation( batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get - 1) { verifyTotalRequestedExecs(None) - verifyKilledExec(Some("2")) + verifyScaledDownExec(Some("2")) } } }
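The streaming `ExecutorAllocationManager` change follows the same pattern but requests that the target be adjusted, so the executor it retires is not replaced. A condensed, self-contained view of that branch (assuming the same config constant and reason string as the patch; the helper name is illustrative):

```scala
import org.apache.spark.{ExecutorAllocationClient, SparkConf}
import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
import org.apache.spark.scheduler.ExecutorDecommissionInfo

// Condensed form of the streaming scale-down decision.
def removeStreamingExecutor(
    conf: SparkConf,
    client: ExecutorAllocationClient,
    execIdToRemove: String): Unit = {
  if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
    // Gracefully retire the executor and shrink the target so it is not relaunched.
    client.decommissionExecutor(execIdToRemove,
      ExecutorDecommissionInfo("spark scale down", isHostDecommissioned = false),
      adjustTargetNumExecutors = true)
  } else {
    client.killExecutor(execIdToRemove)
  }
}
```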
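On the `ExecutorMonitor` side, shuffle tracking now also learns about shuffle blocks that arrive via block updates (for example after migration), but only from data blocks, since an index block alone does not guarantee the whole block was committed. The classification boils down to a match like this sketch:

```scala
import org.apache.spark.storage.{BlockId, ShuffleDataBlockId}

// Returns the shuffle id to attribute to the reporting executor if the updated block
// is a committed shuffle data block; index and RDD blocks are ignored by this path.
def migratedShuffleId(blockId: BlockId): Option[Int] = blockId match {
  case ShuffleDataBlockId(shuffleId, _, _) => Some(shuffleId)
  case _ => None
}
```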
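Finally, the combination of settings that exercises this path end to end (dynamic allocation without an external shuffle service, relying on decommissioning plus shuffle-block migration) looks roughly like the test setup; the constants are the ones the patch reads, while the surrounding values are illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED

// Enables the experimental branch in ExecutorAllocationManager.validateSettings:
// no shuffle service, but decommissioning and shuffle block decommissioning enabled.
val conf = new SparkConf()
  .set(config.DYN_ALLOCATION_ENABLED, true)
  .set(config.SHUFFLE_SERVICE_ENABLED, false)
  .set(WORKER_DECOMMISSION_ENABLED, true)
  .set(config.STORAGE_DECOMMISSION_ENABLED, true)
  .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
```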