From 82c3c9a22064818360cd5b1ce7fdcad5bb407a1e Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 4 Sep 2019 11:24:13 -0700 Subject: [PATCH 01/43] Add the concept of a Decommissioning state which nodes can enter if the cloud provider/scheduler lets them know they aren't going to be removed immeditely but instead will be removed soon. This concept fits nicely in K8s and also with spot-instances on AWS / pre-emptible instances all of which we can get a notice that our host is going away. For now we simply stop scheduling jobs & caching blocks, in the future we could perform some kind of migration of data during scale-down. --- .../apache/spark/deploy/DeployMessage.scala | 7 ++ .../apache/spark/deploy/ExecutorState.scala | 8 +- .../deploy/client/StandaloneAppClient.scala | 3 + .../client/StandaloneAppClientListener.scala | 2 + .../apache/spark/deploy/master/Master.scala | 31 +++++ .../apache/spark/deploy/worker/Worker.scala | 26 ++++ .../CoarseGrainedExecutorBackend.scala | 31 ++++- .../org/apache/spark/executor/Executor.scala | 23 +++- .../apache/spark/internal/config/Worker.scala | 5 + .../main/scala/org/apache/spark/rdd/RDD.scala | 2 + .../spark/scheduler/ExecutorLossReason.scala | 8 ++ .../org/apache/spark/scheduler/Pool.scala | 4 + .../apache/spark/scheduler/Schedulable.scala | 1 + .../spark/scheduler/SchedulerBackend.scala | 3 + .../spark/scheduler/TaskScheduler.scala | 5 + .../spark/scheduler/TaskSchedulerImpl.scala | 5 + .../spark/scheduler/TaskSetManager.scala | 6 + .../cluster/CoarseGrainedClusterMessage.scala | 2 + .../CoarseGrainedSchedulerBackend.scala | 40 +++++- .../cluster/StandaloneSchedulerBackend.scala | 5 + .../org/apache/spark/util/SignalUtils.scala | 3 +- .../spark/deploy/client/AppClientSuite.scala | 39 +++++- .../spark/scheduler/DAGSchedulerSuite.scala | 2 + .../ExternalClusterManagerSuite.scala | 1 + .../scheduler/WorkerDecommissionSuite.scala | 74 +++++++++++ .../spark/deploy/k8s/KubernetesConf.scala | 3 + .../features/BasicExecutorFeatureStep.scala | 20 ++- .../src/main/dockerfiles/spark/Dockerfile | 1 + .../dev/dev-run-integration-tests.sh | 8 +- .../integrationtest/DecommissionSuite.scala | 48 ++++++++ .../k8s/integrationtest/KubernetesSuite.scala | 115 ++++++++++++++---- sbin/spark-daemon.sh | 15 +++ 32 files changed, 504 insertions(+), 42 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index fba371dcfb76..7937ee06d2a6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -60,6 +60,11 @@ private[deploy] object DeployMessages { assert (port > 0) } + case class WorkerDecommission( + id: String, + worker: RpcEndpointRef) + extends DeployMessage + case class ExecutorStateChanged( appId: String, execId: Int, @@ -149,6 +154,8 @@ private[deploy] object DeployMessages { case object ReregisterWithMaster // used when a worker attempts to reconnect to a master + case object DecommissionSelf // Mark self for decommissioning. 
+ // AppClient to Master case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef) diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala index 69c98e28931d..0751bcf221f8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala @@ -19,9 +19,13 @@ package org.apache.spark.deploy private[deploy] object ExecutorState extends Enumeration { - val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value + val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED, DECOMMISSIONED = Value type ExecutorState = Value - def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state) + // DECOMMISSIONED isn't listed as finished since we don't want to remove the executor from + // the worker and the executor still exists - but we do want to avoid scheduling new tasks on it. + private val finishedStates = Seq(KILLED, FAILED, LOST, EXITED) + + def isFinished(state: ExecutorState): Boolean = finishedStates.contains(state) } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala index 8f17159228f8..bfda81da8393 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala @@ -39,6 +39,7 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils} * Takes a master URL, an app description, and a listener for cluster events, and calls * back the listener when various events occur. * + * * @param masterUrls Each url should look like spark://host:port. 
*/ private[spark] class StandaloneAppClient( @@ -180,6 +181,8 @@ private[spark] class StandaloneAppClient( logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText)) if (ExecutorState.isFinished(state)) { listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost) + } else if (state == ExecutorState.DECOMMISSIONED) { + listener.executorDecommissioned(fullId, message.getOrElse("")) } case WorkerRemoved(id, host, message) => diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala index d8bc1a883def..2e38a6847891 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala @@ -39,5 +39,7 @@ private[spark] trait StandaloneAppClientListener { def executorRemoved( fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit + def executorDecommissioned(fullId: String, message: String): Unit + def workerRemoved(workerId: String, host: String, message: String): Unit } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 8d3795cae707..25dcbe2a8b8f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -243,6 +243,15 @@ private[deploy] class Master( logError("Leadership has been revoked -- master shutting down.") System.exit(0) + case WorkerDecommission(id, workerRef) => + logInfo("Recording worker %s decommissioning".format(id)) + if (state == RecoveryState.STANDBY) { + workerRef.send(MasterInStandby) + } else { + // If a worker attempts to decommission that isn't registered ignore it. + idToWorker.get(id).foreach(decommissionWorker) + } + case RegisterWorker( id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress, resources) => @@ -313,7 +322,9 @@ private[deploy] class Master( // Only retry certain number of times so we don't go into an infinite loop. // Important note: this code path is not exercised by tests, so be very careful when // changing this `if` condition. + // We also don't count failures from decommissioned workers since they are "expected." if (!normalExit + && oldState != ExecutorState.DECOMMISSIONED && appInfo.incrementRetryCount() >= maxExecutorRetries && maxExecutorRetries >= 0) { // < 0 disables this application-killing path val execs = appInfo.executors.values @@ -850,6 +861,26 @@ private[deploy] class Master( true } + private def decommissionWorker(worker: WorkerInfo): Unit = { + if (worker.state != WorkerState.DECOMMISSIONED) { + logInfo("Decommissioning worker %s on %s:%d".format(worker.id, worker.host, worker.port)) + worker.setState(WorkerState.DECOMMISSIONED) + for (exec <- worker.executors.values) { + logInfo("Telling app of decommission executors") + exec.application.driver.send(ExecutorUpdated( + exec.id, ExecutorState.DECOMMISSIONED, + Some("worker decommissioned"), None, workerLost = false)) + exec.state = ExecutorState.DECOMMISSIONED + exec.application.removeExecutor(exec) + } + // On recovery do not add a decommissioned executor + persistenceEngine.removeWorker(worker) + } else { + logWarning("Skipping decommissioning worker %s on %s:%d as worker is already decommissioned". 
+ format(worker.id, worker.host, worker.port)) + } + } + private def removeWorker(worker: WorkerInfo, msg: String): Unit = { logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port) worker.setState(WorkerState.DEAD) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 4be495ac4f13..d988bcedb47f 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -67,6 +67,14 @@ private[deploy] class Worker( Utils.checkHost(host) assert (port > 0) + // If worker decommissioning is enabled register a handler on PWR to shutdown. + if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + logInfo("Registering SIGPWR handler to trigger decommissioning.") + SignalUtils.register("PWR")(decommissionSelf) + } else { + logInfo("Worker decommissioning not enabled, SIGPWR will result in exiting.") + } + // A scheduled executor used to send messages at the specified time. private val forwardMessageScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler") @@ -128,6 +136,7 @@ private[deploy] class Worker( private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString private var registered = false private var connected = false + private var decommissioned = false private val workerId = generateWorkerId() private val sparkHome = if (sys.props.contains(IS_TESTING.key)) { @@ -549,6 +558,8 @@ private[deploy] class Worker( case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, resources_) => if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") + } else if (decommissioned) { + logWarning("Asked to launch an executor while decommissioned. 
Not launching executor.") } else { try { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) @@ -672,6 +683,9 @@ private[deploy] class Worker( case ApplicationFinished(id) => finishedApps += id maybeCleanupApplication(id) + + case DecommissionSelf => + decommissionSelf() } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -771,6 +785,18 @@ private[deploy] class Worker( } } + private[deploy] def decommissionSelf(): Boolean = { + if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + logDebug("Decommissioning self") + decommissioned = true + sendToMaster(WorkerDecommission(workerId, self)) + } else { + logWarning("Asked to decommission self, but decommissioning not enabled") + } + // Return true since can be called as a signal handler + true + } + private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = { val driverId = driverStateChanged.driverId val exception = driverStateChanged.exception diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index b4bca1e9401e..c880ddc62be7 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -40,7 +40,7 @@ import org.apache.spark.rpc._ import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.serializer.SerializerInstance -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.{SignalUtils, ThreadUtils, Utils} private[spark] class CoarseGrainedExecutorBackend( override val rpcEnv: RpcEnv, @@ -57,6 +57,7 @@ private[spark] class CoarseGrainedExecutorBackend( private[this] val stopping = new AtomicBoolean(false) var executor: Executor = null + @volatile private var decommissioned = false @volatile var driver: Option[RpcEndpointRef] = None // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need @@ -71,6 +72,9 @@ private[spark] class CoarseGrainedExecutorBackend( private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]] override def onStart(): Unit = { + logInfo("Registering PWR handler.") + SignalUtils.register("PWR")(decommissionSelf) + logInfo("Connecting to driver: " + driverUrl) val resources = parseOrFindResources(resourcesFileOpt) rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => @@ -137,6 +141,8 @@ private[spark] class CoarseGrainedExecutorBackend( case LaunchTask(data) => if (executor == null) { exitExecutor(1, "Received LaunchTask command but executor was null") + } else if (decommissioned) { + logWarning("Asked to launch a task while decommissioned. 
Not launching.") } else { val taskDesc = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId) @@ -220,6 +226,29 @@ private[spark] class CoarseGrainedExecutorBackend( System.exit(code) } + + private def decommissionSelf(): Boolean = { + logError("Decommissioning self") + try { + decommissioned = true + // Tell master we are are decommissioned so it stops trying to schedule us + if (driver.nonEmpty) { + driver.get.send(DecommissionExecutor(executorId)) + } else { + logError("No driver to message decommissioning.") + } + if (executor != null) { + executor.decommission() + } + logInfo("Done decommissioning self.") + // Return true since we are handling a signal + true + } catch { + case e: Exception => + logError(s"Error ${e} during attempt to decommission self") + false + } + } } private[spark] object CoarseGrainedExecutorBackend extends Logging { diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 0f595d095a22..a487b7a0f3e0 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -228,16 +228,33 @@ private[spark] class Executor( */ private var heartbeatFailures = 0 + /** + * Flag to prevent launching new tasks while decommissioned. There could be a race condition + * accessing this, but decommissioning is only intended to help not be a hard stop. + */ + private var decommissioned = false + heartbeater.start() metricsPoller.start() private[executor] def numRunningTasks: Int = runningTasks.size() + /** + * Mark an executor for decommissioning and avoid launching new tasks. + */ + private[spark] def decommission(): Unit = { + decommissioned = true + } + def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = { - val tr = new TaskRunner(context, taskDescription) - runningTasks.put(taskDescription.taskId, tr) - threadPool.execute(tr) + if (!decommissioned) { + val tr = new TaskRunner(context, taskDescription) + runningTasks.put(taskDescription.taskId, tr) + threadPool.execute(tr) + } else { + log.info(s"Not launching task, executor is in decommissioned state") + } } def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = { diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala index f1eaae29f18d..2b175c1e14ee 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala @@ -71,4 +71,9 @@ private[spark] object Worker { ConfigBuilder("spark.worker.ui.compressedLogFileLengthCacheSize") .intConf .createWithDefault(100) + + private[spark] val WORKER_DECOMMISSION_ENABLED = + ConfigBuilder("spark.worker.decommission.enabled") + .booleanConf + .createWithDefault(false) } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 5788b70e75a7..5ea2aaf91a94 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -361,6 +361,7 @@ abstract class RDD[T: ClassTag]( readCachedBlock = false computeOrReadCheckpoint(partition, context) }) match { + // Block hit. 
case Left(blockResult) => if (readCachedBlock) { val existingMetrics = context.taskMetrics().inputMetrics @@ -374,6 +375,7 @@ abstract class RDD[T: ClassTag]( } else { new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]]) } + // Need to compute the block. case Right(iter) => new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]]) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index 46a35b6a2eaf..18579e25da01 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -58,3 +58,11 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los private[spark] case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false) extends ExecutorLossReason(_message) + +/** + * A loss reason that means the worker is marked for decommissioning. + * + * This is used by the task scheduler to remove state associated with the executor, but + * not yet fail any tasks that were running in the executor before the executor is "fully" lost. + */ +private [spark] object WorkerDecommission extends ExecutorLossReason("Worker Decommission.") diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 80805df256a1..2e2851eb9070 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -88,6 +88,10 @@ private[spark] class Pool( schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason)) } + override def executorDecommission(executorId: String): Unit = { + schedulableQueue.asScala.foreach(_.executorDecommission(executorId)) + } + override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = { var shouldRevive = false for (schedulable <- schedulableQueue.asScala) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala index b6f88ed0a93a..8cc239c81d11 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala @@ -43,6 +43,7 @@ private[spark] trait Schedulable { def removeSchedulable(schedulable: Schedulable): Unit def getSchedulableByName(name: String): Schedulable def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit + def executorDecommission(executorId: String): Unit def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 9159d2a0158d..4752353046c1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -27,6 +27,9 @@ private[spark] trait SchedulerBackend { def start(): Unit def stop(): Unit + /** + * Update the current offers and schedule tasks + */ def reviveOffers(): Unit def defaultParallelism(): Int diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 15f5d20e9be7..e9e638a3645a 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -98,6 +98,11 @@ private[spark] trait TaskScheduler { */ def applicationId(): String = appId + /** + * Process a decommissioning executor. + */ + def executorDecommission(executorId: String): Unit + /** * Process a lost executor */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index f25a36c7af22..fb0b5fa56bc1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -732,6 +732,11 @@ private[spark] class TaskSchedulerImpl( } } + override def executorDecommission(executorId: String): Unit = { + rootPool.executorDecommission(executorId) + backend.reviveOffers() + } + override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = { var failedExecutor: Option[String] = None diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 9defbefabb86..e756a246b1e7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1035,6 +1035,12 @@ private[spark] class TaskSetManager( levels.toArray } + def executorDecommission(execId: String): Unit = { + recomputeLocality() + // Future consideration: if an executor is decommissioned it may make sense to add the current + // tasks to the spec exec queue. + } + def recomputeLocality(): Unit = { val previousLocalityLevel = myLocalityLevels(currentLocalityIndex) myLocalityLevels = computeValidLocalityLevels() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index a90fff02ac73..805eec01f2c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -97,6 +97,8 @@ private[spark] object CoarseGrainedClusterMessages { case class RemoveExecutor(executorId: String, reason: ExecutorLossReason) extends CoarseGrainedClusterMessage + case class DecommissionExecutor(executorId: String) extends CoarseGrainedClusterMessage + case class RemoveWorker(workerId: String, host: String, message: String) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index ea045e6280e4..47927b2c08d4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -125,6 +125,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Executors that have been lost, but for which we don't yet know the real exit reason. 
protected val executorsPendingLossReason = new HashSet[String] + // Executors which are being decommissioned + protected val executorsPendingDecommission = new HashSet[String] protected val addressToExecutorId = new HashMap[RpcAddress, String] @@ -188,6 +190,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case UpdateDelegationTokens(newDelegationTokens) => updateDelegationTokens(newDelegationTokens) + case DecommissionExecutor(executorId) => + logError(s"Decommissioning executor ${executorId}") + decommissionExecutor(executorId) + case RemoveExecutor(executorId, reason) => // We will remove the executor's state and cannot restore it. However, the connection // between the driver and the executor may be still alive so that the executor won't exit @@ -329,7 +335,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private def executorIsAlive(executorId: String): Boolean = synchronized { !executorsPendingToRemove.contains(executorId) && - !executorsPendingLossReason.contains(executorId) + !executorsPendingLossReason.contains(executorId) && + !executorsPendingDecommission.contains(executorId) } // Launch tasks returned by a set of resource offers @@ -402,6 +409,27 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.workerRemoved(workerId, host, message) } + /** + * Mark a given executor as decommissioned and stop making resource offers for it. + */ + private def decommissionExecutor(executorId: String): Boolean = { + val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized { + // Only bother decommissioning executors which are alive. + if (executorIsAlive(executorId)) { + executorsPendingDecommission += executorId + true + } else { + false + } + } + + if (shouldDisable) { + logInfo(s"Decommissioning executor $executorId.") + scheduler.executorDecommission(executorId) + } + shouldDisable + } + /** * Stop making resource offers for the given executor. The executor is marked as lost with * the loss reason still pending. @@ -529,6 +557,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logError(t.getMessage, t))(ThreadUtils.sameThread) } + /** + * Called by subclasses when notified of a decommissioning worker. + */ + private[spark] def decommissionExecutor(executorId: String): Unit = { + // Only log the failure since we don't care about the result. 
+ driverEndpoint.ask[Boolean](DecommissionExecutor(executorId)).onFailure { case t => + logError(t.getMessage, t) + }(ThreadUtils.sameThread) + } + def sufficientResourcesRegistered(): Boolean = true override def isReady(): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index a9b607d8cc38..02f612d6c990 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -173,6 +173,11 @@ private[spark] class StandaloneSchedulerBackend( removeExecutor(fullId.split("/")(1), reason) } + override def executorDecommissioned(fullId: String, message: String) { + logInfo("Executor %s decommissioned: %s".format(fullId, message)) + decommissionExecutor(fullId.split("/")(1)) + } + override def workerRemoved(workerId: String, host: String, message: String): Unit = { logInfo("Worker %s removed: %s".format(workerId, message)) removeWorker(workerId, host, message) diff --git a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala index 5a24965170ce..1d507185802c 100644 --- a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala @@ -60,10 +60,11 @@ private[spark] object SignalUtils extends Logging { if (SystemUtils.IS_OS_UNIX) { try { val handler = handlers.getOrElseUpdate(signal, { - logInfo("Registered signal handler for " + signal) + logInfo("Registering signal handler for " + signal) new ActionHandler(new Signal(signal)) }) handler.register(action) + logInfo("Registered signal handler for " + signal) } catch { case ex: Exception => logWarning(s"Failed to register signal handler for " + signal, ex) } diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala index a1d3077b8fc8..a3e39d7f5372 100644 --- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.{ApplicationInfo, Master} import org.apache.spark.deploy.worker.Worker -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.rpc.RpcEnv import org.apache.spark.util.Utils @@ -44,13 +44,13 @@ class AppClientSuite with Eventually with ScalaFutures { private val numWorkers = 2 - private val conf = new SparkConf() - private val securityManager = new SecurityManager(conf) + private var conf: SparkConf = null private var masterRpcEnv: RpcEnv = null private var workerRpcEnvs: Seq[RpcEnv] = null private var master: Master = null private var workers: Seq[Worker] = null + private var securityManager: SecurityManager = null /** * Start the local cluster. 
@@ -58,6 +58,8 @@ class AppClientSuite */ override def beforeAll(): Unit = { super.beforeAll() + conf = new SparkConf().set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, "true") + securityManager = new SecurityManager(conf) masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager) workerRpcEnvs = (0 until numWorkers).map { i => RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager) @@ -111,8 +113,23 @@ class AppClientSuite assert(apps.head.getExecutorLimit === numExecutorsRequested, s"executor request failed") } + + // Save the executor id before decommissioning so we can kill it + val application = getApplications().head + val executors = application.executors + val executorId: String = executors.head._2.fullId + + // Send a decommission self to all the workers + // Note: normally the worker would send this on their own. + workers.foreach(worker => worker.decommissionSelf()) + + // Decommissioning is async. + eventually(timeout(1.seconds), interval(10.millis)) { + // We only record decommissioning for the executor we've requested + assert(ci.listener.execDecommissionedList.size === 1) + } + // Send request to kill executor, verify request was made - val executorId: String = getApplications().head.executors.head._2.fullId whenReady( ci.client.killExecutors(Seq(executorId)), timeout(10.seconds), @@ -120,6 +137,15 @@ class AppClientSuite assert(acknowledged) } + // Verify that asking for executors on the decommissioned workers fails + whenReady( + ci.client.requestTotalExecutors(numExecutorsRequested), + timeout(10.seconds), + interval(10.millis)) { acknowledged => + assert(acknowledged) + } + assert(getApplications().head.executors.size === 0) + // Issue stop command for Client to disconnect from Master ci.client.stop() @@ -189,6 +215,7 @@ class AppClientSuite val deadReasonList = new ConcurrentLinkedQueue[String]() val execAddedList = new ConcurrentLinkedQueue[String]() val execRemovedList = new ConcurrentLinkedQueue[String]() + val execDecommissionedList = new ConcurrentLinkedQueue[String]() def connected(id: String): Unit = { connectedIdList.add(id) @@ -218,6 +245,10 @@ class AppClientSuite execRemovedList.add(id) } + def executorDecommissioned(id: String, message: String): Unit = { + execDecommissionedList.add(id) + } + def workerRemoved(workerId: String, host: String, message: String): Unit = {} } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index c27d50ab66e6..446aa5904f72 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -167,6 +167,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 + override def executorDecommission(executorId: String) = {} override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None @@ -707,6 +708,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId, executorUpdates: Map[(Int, Int), ExecutorMetrics]): Boolean = true + override def executorDecommission(executorId: String): 
Unit = {} override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 4e71ec1ea7b3..9f593e0039ad 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -89,6 +89,7 @@ private class DummyTaskScheduler extends TaskScheduler { override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {} override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} override def defaultParallelism(): Int = 2 + override def executorDecommission(executorId: String): Unit = {} override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def workerRemoved(workerId: String, host: String, message: String): Unit = {} override def applicationAttemptId(): Option[String] = None diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala new file mode 100644 index 000000000000..483c449fa9e1 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend +import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils} + +class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { + + + override def beforeEach(): Unit = { + val conf = new SparkConf().setAppName("test").setMaster("local") + .set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, "true") + + sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf) + } + + test("verify task with no decommissioning works as expected") { + val input = sc.parallelize(1 to 10) + input.count() + val sleepyRdd = input.mapPartitions{ x => + Thread.sleep(100) + x + } + assert(sleepyRdd.count() === 10) + } + + test("verify a task with all workers decommissioned succeeds") { + val input = sc.parallelize(1 to 10) + // Do a count to wait for the executors to be registered. 
+ input.count() + val sleepyRdd = input.mapPartitions{ x => + Thread.sleep(100) + x + } + // Start the task + val asyncCount = sleepyRdd.countAsync() + Thread.sleep(10) + // Decommission all the executors, this should not halt the current task. + // The master passing message is tested with + val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] + val execs = sched.getExecutorIds() + execs.foreach(execId => sched.decommissionExecutor(execId)) + assert(asyncCount.get() === 10) + // Try and launch task after decommissioning, this should fail + val postDecommissioned = input.map(x => x) + val postDecomAsyncCount = postDecommissioned.countAsync() + val thrown = intercept[java.util.concurrent.TimeoutException]{ + val result = ThreadUtils.awaitResult(postDecomAsyncCount, 1.seconds) + } + assert(postDecomAsyncCount.isCompleted === false, + "After exec decommission new task could not launch") + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 09943b7974ed..f42f3415baa1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -55,6 +55,9 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) { } } + def workerDecommissioning: Boolean = + sparkConf.get(org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED) + def nodeSelector: Map[String, String] = KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index d88bd5858bc9..203c7d6cfb55 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -24,6 +24,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Python._ import org.apache.spark.rpc.RpcEndpointAddress @@ -33,7 +34,7 @@ import org.apache.spark.util.Utils private[spark] class BasicExecutorFeatureStep( kubernetesConf: KubernetesExecutorConf, secMgr: SecurityManager) - extends KubernetesFeatureConfigStep { + extends KubernetesFeatureConfigStep with Logging { // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf private val executorContainerImage = kubernetesConf @@ -192,6 +193,21 @@ private[spark] class BasicExecutorFeatureStep( .endResources() .build() }.getOrElse(executorContainer) + val containerWithLifecycle = kubernetesConf.workerDecommissioning match { + case false => + logInfo("Decommissioning not enabled, skipping shutdown script") + containerWithLimitCores + case true => + logInfo("Adding decommission script to lifecycle") + new ContainerBuilder(containerWithLimitCores).withNewLifecycle() + .withNewPreStop() + .withNewExec() + 
.addToCommand("/opt/decom.sh") + .endExec() + .endPreStop() + .endLifecycle() + .build() + } val ownerReference = kubernetesConf.driverPod.map { pod => new OwnerReferenceBuilder() .withController(true) @@ -219,6 +235,6 @@ private[spark] class BasicExecutorFeatureStep( kubernetesConf.get(KUBERNETES_EXECUTOR_SCHEDULER_NAME) .foreach(executorPod.getSpec.setSchedulerName) - SparkPod(executorPod, containerWithLimitCores) + SparkPod(executorPod, containerWithLifecycle) } } diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile index b6eeff1cd18a..e43de59309a8 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile @@ -44,6 +44,7 @@ COPY jars /opt/spark/jars COPY bin /opt/spark/bin COPY sbin /opt/spark/sbin COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/ +COPY kubernetes/dockerfiles/spark/decom.sh /opt/ COPY examples /opt/spark/examples COPY kubernetes/tests /opt/spark/tests COPY data /opt/spark/data diff --git a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh index 1f0a8035cea7..cdcb95d6f491 100755 --- a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh +++ b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh @@ -16,7 +16,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -set -xo errexit +set -exo errexit TEST_ROOT_DIR=$(git rev-parse --show-toplevel) DEPLOY_MODE="minikube" @@ -41,6 +41,9 @@ SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version 2>/dev/nu | grep -v "WARNING"\ | tail -n 1) +export SCALA_VERSION +echo $SCALA_VERSION + # Parse arguments while (( "$#" )); do case $1 in @@ -105,7 +108,8 @@ while (( "$#" )); do shift ;; *) - break + echo "Unexpected command line flag $2 $1." + exit 1 ;; esac shift diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala new file mode 100644 index 000000000000..4f75114e2647 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +import org.apache.spark.internal.config.Worker + +private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite => + + import DecommissionSuite._ + import KubernetesSuite.k8sTestTag + + test("Test basic decommissioning", k8sTestTag) { + sparkAppConf + .set(Worker.WORKER_DECOMMISSION_ENABLED.key, "true") + .set("spark.kubernetes.pyspark.pythonVersion", "3") + .set("spark.kubernetes.container.image", pyImage) + + runSparkApplicationAndVerifyCompletion( + appResource = PYSPARK_DECOMISSIONING, + mainClass = "", + expectedLogOnCompletion = Seq("Decommissioning executor"), + appArgs = Array.empty[String], + driverPodChecker = doBasicDriverPyPodCheck, + executorPodChecker = doBasicExecutorPyPodCheck, + appLocator = appLocator, + isJVM = false, + decommissioningTest = true) + } +} + +private[spark] object DecommissionSuite { + val TEST_LOCAL_PYSPARK: String = "local:///opt/spark/tests/" + val PYSPARK_DECOMISSIONING: String = TEST_LOCAL_PYSPARK + "decommissioning.py" +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 0d4fcccc35cf..1f7e8fc839e4 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -42,7 +42,8 @@ import org.apache.spark.internal.config._ class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite - with DepsTestsSuite with RTestsSuite with Logging with Eventually with Matchers { + with DepsTestsSuite with DecommissionSuite with RTestsSuite with Logging with Eventually with Matchers { + import KubernetesSuite._ @@ -254,6 +255,7 @@ class KubernetesSuite extends SparkFunSuite } } + // scalastyle:off argcount protected def runSparkApplicationAndVerifyCompletion( appResource: String, mainClass: String, @@ -264,28 +266,37 @@ class KubernetesSuite extends SparkFunSuite appLocator: String, isJVM: Boolean, pyFiles: Option[String] = None, - executorPatience: Option[(Option[Interval], Option[Timeout])] = None): Unit = { + executorPatience: Option[(Option[Interval], Option[Timeout])] = None, + decommissioningTest: Boolean = false): Unit = { + + // scalastyle:on argcount val appArguments = SparkAppArguments( mainAppResource = appResource, mainClass = mainClass, appArgs = appArgs) - SparkAppLauncher.launch( - appArguments, - sparkAppConf, - TIMEOUT.value.toSeconds.toInt, - sparkHomeDir, - isJVM, - pyFiles) - val driverPod = kubernetesTestComponents.kubernetesClient - .pods() - .withLabel("spark-app-locator", appLocator) - .withLabel("spark-role", "driver") - .list() - .getItems - .get(0) - driverPodChecker(driverPod) val execPods = scala.collection.mutable.Map[String, Pod]() + val (patienceInterval, patienceTimeout) = { + executorPatience match { + case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT)) + case _ => (INTERVAL, TIMEOUT) + } + } + def checkPodReady(namespace: String, name: String) = { + val execPod = kubernetesTestComponents.kubernetesClient + .pods() + .inNamespace(namespace) + 
.withName(name) + .get() + val resourceStatus = execPod.getStatus + val conditions = resourceStatus.getConditions().asScala + val conditionTypes = conditions.map(_.getType()) + val readyConditions = conditions.filter{cond => cond.getType() == "Ready"} + val result = readyConditions + .map(cond => cond.getStatus() == "True") + .headOption.getOrElse(false) + result + } val execWatcher = kubernetesTestComponents.kubernetesClient .pods() .withLabel("spark-app-locator", appLocator) @@ -296,28 +307,80 @@ class KubernetesSuite extends SparkFunSuite logInfo("Ending watch of executors") override def eventReceived(action: Watcher.Action, resource: Pod): Unit = { val name = resource.getMetadata.getName + val namespace = resource.getMetadata().getNamespace() action match { - case Action.ADDED | Action.MODIFIED => + case Action.MODIFIED => + execPods(name) = resource + case Action.ADDED => + logInfo(s"Add event received for $name.") execPods(name) = resource + // If testing decommissioning start a thread to simulate + // decommissioning. + if (decommissioningTest && execPods.size == 1) { + // Wait for all the containers in the pod to be running + logInfo("Waiting for first pod to become OK prior to deletion") + Eventually.eventually(patienceTimeout, patienceInterval) { + val result = checkPodReady(namespace, name) + result shouldBe (true) + } + // Sleep a small interval to allow execution of job + logInfo("Sleeping before killing pod.") + Thread.sleep(30000) + // Delete the pod to simulate cluster scale down/migration. + val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name) + pod.delete() + logInfo(s"Pod: $name deleted") + } else { + } case Action.DELETED | Action.ERROR => execPods.remove(name) } } }) - val (patienceInterval, patienceTimeout) = { - executorPatience match { - case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT)) - case _ => (INTERVAL, TIMEOUT) - } - } + logDebug("Starting Spark K8s job") + SparkAppLauncher.launch( + appArguments, + sparkAppConf, + TIMEOUT.value.toSeconds.toInt, + sparkHomeDir, + isJVM, + pyFiles) + val driverPod = kubernetesTestComponents.kubernetesClient + .pods() + .withLabel("spark-app-locator", appLocator) + .withLabel("spark-role", "driver") + .list() + .getItems + .get(0) + + driverPodChecker(driverPod) + // If we're testing decommissioning we delete all the executors, but we should have + // an executor at some point. Eventually.eventually(patienceTimeout, patienceInterval) { execPods.values.nonEmpty should be (true) } + // If decommissioning we need to wait and check the executors were removed + if (decommissioningTest) { + // Sleep a small interval to ensure everything is registered. + Thread.sleep(100) + // Wait for the executors to become ready + Eventually.eventually(patienceTimeout, patienceInterval) { + val anyReadyPods = ! 
execPods.map{ + case (name, resource) => + (name, resource.getMetadata().getNamespace()) + }.filter{ + case (name, namespace) => checkPodReady(namespace, name) + }.isEmpty + val podsEmpty = execPods.values.isEmpty + val podsReadyOrDead = anyReadyPods || podsEmpty + podsReadyOrDead shouldBe (true) + } + } execWatcher.close() execPods.values.foreach(executorPodChecker(_)) - Eventually.eventually(TIMEOUT, patienceInterval) { + Eventually.eventually(patienceTimeout, patienceInterval) { expectedLogOnCompletion.foreach { e => assert(kubernetesTestComponents.kubernetesClient .pods() @@ -425,5 +488,5 @@ private[spark] object KubernetesSuite { val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest" val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest" val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) - val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) + val INTERVAL = PatienceConfiguration.Interval(Span(1, Seconds)) } diff --git a/sbin/spark-daemon.sh b/sbin/spark-daemon.sh index 6de67e039b48..81f2fd40a706 100755 --- a/sbin/spark-daemon.sh +++ b/sbin/spark-daemon.sh @@ -215,6 +215,21 @@ case $option in fi ;; + (decommission) + + if [ -f $pid ]; then + TARGET_ID="$(cat "$pid")" + if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then + echo "decommissioning $command" + kill -s SIGPWR "$TARGET_ID" + else + echo "no $command to decommission" + fi + else + echo "no $command to decommission" + fi + ;; + (status) if [ -f $pid ]; then From c935a6eba113b7ffd67f6efde72fe91ed98a47eb Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 4 Sep 2019 17:56:06 -0700 Subject: [PATCH 02/43] Add the decom script and test python --- .../src/main/dockerfiles/spark/decom.sh | 38 +++++++++++++++++ .../tests/decommissioning.py | 42 +++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh create mode 100644 resource-managers/kubernetes/integration-tests/tests/decommissioning.py diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh new file mode 100644 index 000000000000..e480f61df27c --- /dev/null +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + + +set -ex +export LOG=/dev/termination-log +echo "Asked to decommission" > ${LOG} +# Find the pid to signal +date | tee -a ${LOG} +WORKER_PID=$(ps axf | grep java |grep org.apache.spark.executor.CoarseGrainedExecutorBackend | grep -v grep) +echo "Using worker pid $WORKER_PID" | tee -a ${LOG} +kill -s SIGPWR ${WORKER_PID} | tee -a ${LOG} +# For now we expect this to timeout, since we don't start exiting the backend. +echo "Waiting for worker pid to exit" |tee -a ${LOG} +date | tee -a ${LOG} +# If the worker does exit stop blocking the cleanup. +timeout 60 tail --pid=${WORKER_PID} -f /dev/null | tee -a ${LOG} +date | tee -a ${LOG} +echo "Done" | tee -a ${LOG} +date | tee -a ${LOG} +echo "Term log was:" +cat $LOG diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py new file mode 100644 index 000000000000..c7d1fc64409c --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from __future__ import print_function + +import sys +import time + +from pyspark.sql import SparkSession + + +if __name__ == "__main__": + """ + Usage: decomissioning_water + """ + print("Starting decom test") + spark = SparkSession \ + .builder \ + .appName("PyMemoryTest") \ + .getOrCreate() + sc = spark._sc + rdd = sc.parallelize(range(10)) + rdd.collect() + print("Waiting to give nodes time to finish.") + time.sleep(120) + print("Stopping spark") + spark.stop() + sys.exit(0) From 16b3880110d0c9bde4e522e464da93306e9ad777 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 4 Sep 2019 19:00:07 -0700 Subject: [PATCH 03/43] Cleanup and reduce time a bit --- .../docker/src/main/dockerfiles/spark/Dockerfile | 1 + .../docker/src/main/dockerfiles/spark/decom.sh | 0 .../deploy/k8s/integrationtest/KubernetesSuite.scala | 10 +++++----- .../integration-tests/tests/decommissioning.py | 3 ++- 4 files changed, 8 insertions(+), 6 deletions(-) mode change 100644 => 100755 resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile index e43de59309a8..f6d0c35b5235 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile @@ -53,6 +53,7 @@ ENV SPARK_HOME /opt/spark WORKDIR /opt/spark/work-dir RUN chmod g+w /opt/spark/work-dir +RUN chmod a+x /opt/decom.sh ENTRYPOINT [ "/opt/entrypoint.sh" ] diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh old mode 100644 new mode 100755 diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 1f7e8fc839e4..9e7e1e9c17e0 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -40,7 +40,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ class KubernetesSuite extends SparkFunSuite - with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite + with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite with DepsTestsSuite with DecommissionSuite with RTestsSuite with Logging with Eventually with Matchers { @@ -318,18 +318,18 @@ class KubernetesSuite extends SparkFunSuite // decommissioning. 
if (decommissioningTest && execPods.size == 1) { // Wait for all the containers in the pod to be running - logInfo("Waiting for first pod to become OK prior to deletion") + logDebug("Waiting for first pod to become OK prior to deletion") Eventually.eventually(patienceTimeout, patienceInterval) { val result = checkPodReady(namespace, name) result shouldBe (true) } // Sleep a small interval to allow execution of job - logInfo("Sleeping before killing pod.") - Thread.sleep(30000) + logDebug("Sleeping before killing pod.") + Thread.sleep(5000) // Delete the pod to simulate cluster scale down/migration. val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name) pod.delete() - logInfo(s"Pod: $name deleted") + logDebug(s"Pod: $name deleted") } else { } case Action.DELETED | Action.ERROR => diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index c7d1fc64409c..088bbf993b2c 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -36,7 +36,8 @@ rdd = sc.parallelize(range(10)) rdd.collect() print("Waiting to give nodes time to finish.") - time.sleep(120) + time.sleep(10) + rdd.collect() print("Stopping spark") spark.stop() sys.exit(0) From 8f7ed26727052eb541d80aa6afabb669c6fc7132 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 12 Sep 2019 17:15:52 -0700 Subject: [PATCH 04/43] Add decom script was previously untracked --- sbin/decommission-slave.sh | 57 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 sbin/decommission-slave.sh diff --git a/sbin/decommission-slave.sh b/sbin/decommission-slave.sh new file mode 100644 index 000000000000..4bbf257ff1d3 --- /dev/null +++ b/sbin/decommission-slave.sh @@ -0,0 +1,57 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# A shell script to decommission all workers on a single slave +# +# Environment variables +# +# SPARK_WORKER_INSTANCES The number of worker instances that should be +# running on this slave. Default is 1. + +# Usage: decommission-slave.sh [--block-until-exit] +# Decommissions all slaves on this worker machine + +set -ex + +if [ -z "${SPARK_HOME}" ]; then + export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" +fi + +. "${SPARK_HOME}/sbin/spark-config.sh" + +. 
"${SPARK_HOME}/bin/load-spark-env.sh" + +if [ "$SPARK_WORKER_INSTANCES" = "" ]; then + "${SPARK_HOME}/sbin"/spark-daemon.sh decommission org.apache.spark.deploy.worker.Worker 1 +else + for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do + "${SPARK_HOME}/sbin"/spark-daemon.sh decommission org.apache.spark.deploy.worker.Worker $(( $i + 1 )) + done +fi + +# Check if --block-until-exit is set. +# This is done for systems which block on the decomissioning script and on exit +# shut down the entire system (e.g. K8s). +if [ "$1" == "--block-until-exit" ]; then + shift + # For now we only block on the 0th instance if there multiple instances. + instance=$1 + pid="$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid" + wait $pid +fi From 55b6f9ec0d8ac985611170fbc4addac5bc7578cb Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 10 Oct 2019 11:23:48 -0700 Subject: [PATCH 05/43] Add some more docstring --- .../main/scala/org/apache/spark/deploy/DeployMessage.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 7937ee06d2a6..18305ad3746a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -60,6 +60,10 @@ private[deploy] object DeployMessages { assert (port > 0) } + /** + * @param id the worker id + * @param worker the worker endpoint ref + */ case class WorkerDecommission( id: String, worker: RpcEndpointRef) @@ -154,7 +158,7 @@ private[deploy] object DeployMessages { case object ReregisterWithMaster // used when a worker attempts to reconnect to a master - case object DecommissionSelf // Mark self for decommissioning. + case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future. 
// AppClient to Master From bfa06ce0e4cf583107a4098b4680891c32403ef2 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 8 Nov 2019 14:44:44 -0800 Subject: [PATCH 06/43] Style fix (long line) --- .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 9e7e1e9c17e0..aadfbe7ad95b 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -42,7 +42,8 @@ import org.apache.spark.internal.config._ class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite - with DepsTestsSuite with DecommissionSuite with RTestsSuite with Logging with Eventually with Matchers { + with DepsTestsSuite with DecommissionSuite with RTestsSuite with Logging with Eventually + with Matchers { import KubernetesSuite._ From a63b68feb5af9ac81b7ca9fefceee26c9b31ed2c Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 12 Nov 2019 08:41:22 -0800 Subject: [PATCH 07/43] Minor style and comment fixes --- .../org/apache/spark/deploy/client/StandaloneAppClient.scala | 1 - core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala index bfda81da8393..eedf5e969e29 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala @@ -39,7 +39,6 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils} * Takes a master URL, an app description, and a listener for cluster events, and calls * back the listener when various events occur. * - * * @param masterUrls Each url should look like spark://host:port. */ private[spark] class StandaloneAppClient( diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 25dcbe2a8b8f..57625ab8d8d3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -248,7 +248,7 @@ private[deploy] class Master( if (state == RecoveryState.STANDBY) { workerRef.send(MasterInStandby) } else { - // If a worker attempts to decommission that isn't registered ignore it. + // We use foreach since if it isn't registered we just skip it. 
idToWorker.get(id).foreach(decommissionWorker) } From 9476e22193c712b4a59e29cb347e87e5b651ddc9 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 12 Nov 2019 14:59:56 -0800 Subject: [PATCH 08/43] Now we ACK the decom msg --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 47927b2c08d4..476df84f77de 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -190,10 +190,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case UpdateDelegationTokens(newDelegationTokens) => updateDelegationTokens(newDelegationTokens) - case DecommissionExecutor(executorId) => - logError(s"Decommissioning executor ${executorId}") - decommissionExecutor(executorId) - case RemoveExecutor(executorId, reason) => // We will remove the executor's state and cannot restore it. However, the connection // between the driver and the executor may be still alive so that the executor won't exit @@ -269,6 +265,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } context.reply(true) + case DecommissionExecutor(executorId) => + logInfo(s"Received decommission executor message ${executorId}.") + context.reply(decommissionExecutor(executorId)) + case RemoveWorker(workerId, host, message) => removeWorker(workerId, host, message) context.reply(true) From 317c76bc1aeb310f9d446c2b9134db24824a6cc9 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 12 Nov 2019 15:08:14 -0800 Subject: [PATCH 09/43] Launch a task even in decom state to avoid a race condition. --- .../executor/CoarseGrainedExecutorBackend.scala | 12 ++++++++++-- .../cluster/CoarseGrainedSchedulerBackend.scala | 2 +- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index c880ddc62be7..4eb70bb76ef4 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -141,9 +141,17 @@ private[spark] class CoarseGrainedExecutorBackend( case LaunchTask(data) => if (executor == null) { exitExecutor(1, "Received LaunchTask command but executor was null") - } else if (decommissioned) { - logWarning("Asked to launch a task while decommissioned. 
Not launching.") } else { + if (decommissioned) { + logError("Asked to launch a task while decommissioned.") + driver match { + case Some(endpoint) => + logError("Sending DecommissionExecutor to driver") + endpoint.send(DecommissionExecutor(executorId)) + case _ => + logError("No registered driver to send Decommission to.") + } + } val taskDesc = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId) taskResources(taskDesc.taskId) = taskDesc.resources diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 476df84f77de..91337af14d9d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -424,7 +424,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } if (shouldDisable) { - logInfo(s"Decommissioning executor $executorId.") + logError(s"Decommissioning executor $executorId.") scheduler.executorDecommission(executorId) } shouldDisable From 86c0ff6fa6e17e69470d97ed78db645d16a825fa Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 12 Nov 2019 16:19:38 -0800 Subject: [PATCH 10/43] Allow tasks to launch in the executor base class as well --- .../scala/org/apache/spark/executor/Executor.scala | 11 +++++------ .../cluster/CoarseGrainedSchedulerBackend.scala | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index a487b7a0f3e0..bd6355d1210f 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -248,12 +248,11 @@ private[spark] class Executor( } def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = { - if (!decommissioned) { - val tr = new TaskRunner(context, taskDescription) - runningTasks.put(taskDescription.taskId, tr) - threadPool.execute(tr) - } else { - log.info(s"Not launching task, executor is in decommissioned state") + val tr = new TaskRunner(context, taskDescription) + runningTasks.put(taskDescription.taskId, tr) + threadPool.execute(tr) + if (decommissioned) { + log.error(s"Launching a task while in decommissioned state.") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 91337af14d9d..0b4193881c0a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -428,7 +428,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.executorDecommission(executorId) } shouldDisable - } + } /** * Stop making resource offers for the given executor. 
The executor is marked as lost with From 88c89b1858bba6bdd5299b75837a6f1b5d2e1b8e Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 31 Dec 2019 18:26:01 -0800 Subject: [PATCH 11/43] Finish logical merge --- .../cluster/CoarseGrainedSchedulerBackend.scala | 17 +++++++---------- .../scheduler/WorkerDecommissionSuite.scala | 10 ++++++---- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 7acc43b42255..09bbd4f9a58c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -102,6 +102,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Executors that have been lost, but for which we don't yet know the real exit reason. private val executorsPendingLossReason = new HashSet[String] + // Executors which are being decommissioned + protected val executorsPendingDecommission = new HashSet[String] + // A map to store hostname with its possible task number running on it @GuardedBy("CoarseGrainedSchedulerBackend.this") protected var hostToLocalTaskCount: Map[String, Int] = Map.empty @@ -126,9 +129,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv - // Executors which are being decommissioned - protected val executorsPendingDecommission = new HashSet[String] - protected val addressToExecutorId = new HashMap[RpcAddress, String] // Spark configuration sent to executors. This is a lazy val so that subclasses of the @@ -339,11 +339,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } - private def executorIsAlive(executorId: String): Boolean = synchronized { - !executorsPendingToRemove.contains(executorId) && - !executorsPendingDecommission.contains(executorId) - } - // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = { for (task <- tasks.flatten) { @@ -420,7 +415,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private def decommissionExecutor(executorId: String): Boolean = { val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized { // Only bother decommissioning executors which are alive. 
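      // "Alive" is the isExecutorActive check below: registered in executorDataMap and
      // not already pending removal, loss-reason resolution, or decommission.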
- if (executorIsAlive(executorId)) { + if (isExecutorActive(executorId)) { executorsPendingDecommission += executorId true } else { @@ -600,7 +595,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp override def isExecutorActive(id: String): Boolean = synchronized { executorDataMap.contains(id) && !executorsPendingToRemove.contains(id) && - !executorsPendingLossReason.contains(id) + !executorsPendingLossReason.contains(id) && + !executorsPendingDecommission.contains(id) + } override def maxNumConcurrentTasks(): Int = synchronized { diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index 483c449fa9e1..20301f5a57c3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -53,20 +53,22 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { Thread.sleep(100) x } - // Start the task + // Start the task. val asyncCount = sleepyRdd.countAsync() - Thread.sleep(10) + // Give the job long enough to start. + Thread.sleep(20) // Decommission all the executors, this should not halt the current task. // The master passing message is tested with val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] val execs = sched.getExecutorIds() execs.foreach(execId => sched.decommissionExecutor(execId)) - assert(asyncCount.get() === 10) + val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 10.seconds) + assert(asyncCountResult === 10) // Try and launch task after decommissioning, this should fail val postDecommissioned = input.map(x => x) val postDecomAsyncCount = postDecommissioned.countAsync() val thrown = intercept[java.util.concurrent.TimeoutException]{ - val result = ThreadUtils.awaitResult(postDecomAsyncCount, 1.seconds) + val result = ThreadUtils.awaitResult(postDecomAsyncCount, 10.seconds) } assert(postDecomAsyncCount.isCompleted === false, "After exec decommission new task could not launch") From 0eb089fdd888f9306fa5416f4b0fd725a121c49e Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 31 Dec 2019 18:26:50 -0800 Subject: [PATCH 12/43] Start working on vanzin's feedback on the name and sync to async. --- .../apache/spark/scheduler/ExecutorLossReason.scala | 2 +- .../cluster/CoarseGrainedSchedulerBackend.scala | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index 18579e25da01..71a4d0264350 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -65,4 +65,4 @@ case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = fals * This is used by the task scheduler to remove state associated with the executor, but * not yet fail any tasks that were running in the executor before the executor is "fully" lost. 
*/ -private [spark] object WorkerDecommission extends ExecutorLossReason("Worker Decommission.") +private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor Decommission.") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 09bbd4f9a58c..9d1c47093dba 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -198,6 +198,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor)) removeExecutor(executorId, reason) + case RemoveWorker(workerId, host, message) => + removeWorker(workerId, host, message) + case LaunchedExecutor(executorId) => executorDataMap.get(executorId).foreach { data => data.freeCores = data.totalCores @@ -275,10 +278,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logInfo(s"Received decommission executor message ${executorId}.") context.reply(decommissionExecutor(executorId)) - case RemoveWorker(workerId, host, message) => - removeWorker(workerId, host, message) - context.reply(true) - case RetrieveSparkAppConfig => val reply = SparkAppConfig( sparkProperties, @@ -553,8 +552,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } protected def removeWorker(workerId: String, host: String, message: String): Unit = { - driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).failed.foreach(t => - logError(t.getMessage, t))(ThreadUtils.sameThread) + driverEndpoint.send(RemoveWorker(workerId, host, message)) } /** From 8bda4c7cf876d756c5366715d06f6138e959687c Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 9 Jan 2020 14:54:51 -0800 Subject: [PATCH 13/43] Code review feedback, improve test to use listener instead of sleep, remove term log since no longer relevant, and switch from ask to send for message since we don't really need the ack. --- .../apache/spark/deploy/master/Master.scala | 2 +- .../CoarseGrainedSchedulerBackend.scala | 13 ++++++------ .../org/apache/spark/util/SignalUtils.scala | 1 - .../scheduler/WorkerDecommissionSuite.scala | 21 +++++++++++++------ .../features/BasicExecutorFeatureStep.scala | 8 +++---- .../src/main/dockerfiles/spark/decom.sh | 20 ++++++++---------- 6 files changed, 35 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 57625ab8d8d3..71df5dfa423a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -248,7 +248,7 @@ private[deploy] class Master( if (state == RecoveryState.STANDBY) { workerRef.send(MasterInStandby) } else { - // We use foreach since if it isn't registered we just skip it. + // We use foreach since get gives us an option and we can skip the failures. 
idToWorker.get(id).foreach(decommissionWorker) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 9d1c47093dba..1a8849484efb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -198,6 +198,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor)) removeExecutor(executorId, reason) + case DecommissionExecutor(executorId) => + logInfo(s"Received decommission executor message ${executorId}.") + decommissionExecutor(executorId) + case RemoveWorker(workerId, host, message) => removeWorker(workerId, host, message) @@ -274,10 +278,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } context.reply(true) - case DecommissionExecutor(executorId) => - logInfo(s"Received decommission executor message ${executorId}.") - context.reply(decommissionExecutor(executorId)) - case RetrieveSparkAppConfig => val reply = SparkAppConfig( sparkProperties, @@ -384,6 +384,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp addressToExecutorId -= executorInfo.executorAddress executorDataMap -= executorId executorsPendingLossReason -= executorId + executorsPendingDecommission -= executorId executorsPendingToRemove.remove(executorId).getOrElse(false) } totalCoreCount.addAndGet(-executorInfo.totalCores) @@ -560,9 +561,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ private[spark] def decommissionExecutor(executorId: String): Unit = { // Only log the failure since we don't care about the result. - driverEndpoint.ask[Boolean](DecommissionExecutor(executorId)).onFailure { case t => - logError(t.getMessage, t) - }(ThreadUtils.sameThread) + driverEndpoint.send(DecommissionExecutor(executorId)) } def sufficientResourcesRegistered(): Boolean = true diff --git a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala index 1d507185802c..230195da2a12 100644 --- a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala @@ -64,7 +64,6 @@ private[spark] object SignalUtils extends Logging { new ActionHandler(new Signal(signal)) }) handler.register(action) - logInfo("Registered signal handler for " + signal) } catch { case ex: Exception => logWarning(s"Failed to register signal handler for " + signal, ex) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index 20301f5a57c3..e48c75f5e882 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import java.util.concurrent.Semaphore + import scala.concurrent.TimeoutException import scala.concurrent.duration._ @@ -50,25 +52,32 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { // Do a count to wait for the executors to be registered. 
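    // local-cluster executors register asynchronously, so without this barrier the
    // decommission below could race with executor registration.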
input.count() val sleepyRdd = input.mapPartitions{ x => - Thread.sleep(100) + Thread.sleep(50) x } + // Listen for the job + val sem = new Semaphore(0) + sc.addSparkListener(new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + sem.release() + } + }) // Start the task. val asyncCount = sleepyRdd.countAsync() - // Give the job long enough to start. - Thread.sleep(20) + // Wait for the job to have started + sem.acquire(1) // Decommission all the executors, this should not halt the current task. - // The master passing message is tested with + // decom.sh message passing is tested manually. val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] val execs = sched.getExecutorIds() execs.foreach(execId => sched.decommissionExecutor(execId)) - val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 10.seconds) + val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 2.seconds) assert(asyncCountResult === 10) // Try and launch task after decommissioning, this should fail val postDecommissioned = input.map(x => x) val postDecomAsyncCount = postDecommissioned.countAsync() val thrown = intercept[java.util.concurrent.TimeoutException]{ - val result = ThreadUtils.awaitResult(postDecomAsyncCount, 10.seconds) + val result = ThreadUtils.awaitResult(postDecomAsyncCount, 2.seconds) } assert(postDecomAsyncCount.isCompleted === false, "After exec decommission new task could not launch") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 203c7d6cfb55..b992191f5a0d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -193,11 +193,11 @@ private[spark] class BasicExecutorFeatureStep( .endResources() .build() }.getOrElse(executorContainer) - val containerWithLifecycle = kubernetesConf.workerDecommissioning match { - case false => + val containerWithLifecycle = + if (!kubernetesConf.workerDecommissioning) { logInfo("Decommissioning not enabled, skipping shutdown script") containerWithLimitCores - case true => + } else { logInfo("Adding decommission script to lifecycle") new ContainerBuilder(containerWithLimitCores).withNewLifecycle() .withNewPreStop() @@ -207,7 +207,7 @@ private[spark] class BasicExecutorFeatureStep( .endPreStop() .endLifecycle() .build() - } + } val ownerReference = kubernetesConf.driverPod.map { pod => new OwnerReferenceBuilder() .withController(true) diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh index e480f61df27c..26bddac171be 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh @@ -19,20 +19,18 @@ set -ex -export LOG=/dev/termination-log -echo "Asked to decommission" > ${LOG} +echo "Asked to decommission" # Find the pid to signal date | tee -a ${LOG} -WORKER_PID=$(ps axf | grep java |grep org.apache.spark.executor.CoarseGrainedExecutorBackend | grep -v grep) -echo "Using worker pid $WORKER_PID" | tee -a ${LOG} -kill -s SIGPWR ${WORKER_PID} | tee -a ${LOG} +WORKER_PID=$(ps axf | grep java | grep 
org.apache.spark.executor.CoarseGrainedExecutorBackend | grep -v grep) +echo "Using worker pid $WORKER_PID" +kill -s SIGPWR ${WORKER_PID} # For now we expect this to timeout, since we don't start exiting the backend. -echo "Waiting for worker pid to exit" |tee -a ${LOG} -date | tee -a ${LOG} +echo "Waiting for worker pid to exit" # If the worker does exit stop blocking the cleanup. -timeout 60 tail --pid=${WORKER_PID} -f /dev/null | tee -a ${LOG} -date | tee -a ${LOG} -echo "Done" | tee -a ${LOG} -date | tee -a ${LOG} +timeout 60 tail --pid=${WORKER_PID} -f /dev/null +date +echo "Done" +date echo "Term log was:" cat $LOG From dcebf8c35d20fed19d9f9ca51ce293217af5481c Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 9 Jan 2020 14:59:45 -0800 Subject: [PATCH 14/43] Cleanup stray newline and fix wording in comment. --- .../scala/org/apache/spark/scheduler/ExecutorLossReason.scala | 2 +- .../org/apache/spark/scheduler/WorkerDecommissionSuite.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index 71a4d0264350..d79a10da15be 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -60,7 +60,7 @@ case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = fals extends ExecutorLossReason(_message) /** - * A loss reason that means the worker is marked for decommissioning. + * A loss reason that means the executor is marked for decommissioning. * * This is used by the task scheduler to remove state associated with the executor, but * not yet fail any tasks that were running in the executor before the executor is "fully" lost. 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index e48c75f5e882..9190b8606363 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -29,7 +29,6 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils} class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { - override def beforeEach(): Unit = { val conf = new SparkConf().setAppName("test").setMaster("local") .set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, "true") From 7c02a63c20abee407f2723e7249777cacd88914e Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 12 Jan 2020 16:56:06 -0800 Subject: [PATCH 15/43] Look into failures in K8s jenkins job --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4 +++- .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index ddea95945521..64e908a8a4b9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -199,7 +199,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp removeExecutor(executorId, reason) case DecommissionExecutor(executorId) => - logInfo(s"Received decommission executor message ${executorId}.") + logError(s"Received decommission executor message ${executorId}.") decommissionExecutor(executorId) case RemoveWorker(workerId, host, message) => @@ -426,6 +426,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp if (shouldDisable) { logError(s"Decommissioning executor $executorId.") scheduler.executorDecommission(executorId) + } else { + logError(s"Skipping decommissioning of executor $executorId.") } shouldDisable } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index aadfbe7ad95b..398476bd8017 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -326,7 +326,7 @@ class KubernetesSuite extends SparkFunSuite } // Sleep a small interval to allow execution of job logDebug("Sleeping before killing pod.") - Thread.sleep(5000) + Thread.sleep(2000) // Delete the pod to simulate cluster scale down/migration. 
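            // Deleting the pod makes Kubernetes run the container's preStop hook,
            // which invokes /opt/decom.sh and sends SIGPWR to the executor JVM.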
val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name) pod.delete() From f0aafa8ce8876d3972b9107f82e31f054929869d Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 14 Jan 2020 18:14:08 -0800 Subject: [PATCH 16/43] We need ps to find the pid of the java process --- .../kubernetes/docker/src/main/dockerfiles/spark/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile index f6d0c35b5235..a076bb042c49 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile @@ -29,7 +29,7 @@ ARG spark_uid=185 RUN set -ex && \ apt-get update && \ ln -s /lib /lib64 && \ - apt install -y bash tini libc6 libpam-modules krb5-user libnss3 && \ + apt install -y bash tini libc6 libpam-modules krb5-user libnss3 procps && \ mkdir -p /opt/spark && \ mkdir -p /opt/spark/examples && \ mkdir -p /opt/spark/work-dir && \ From 5f65e2781de6261ab2b91283cb2361fe30a53710 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 14 Jan 2020 18:40:03 -0800 Subject: [PATCH 17/43] Simplify decom script to not depend on strings --- .../kubernetes/docker/src/main/dockerfiles/spark/decom.sh | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh index 26bddac171be..397bd2f58c44 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh @@ -22,7 +22,7 @@ set -ex echo "Asked to decommission" # Find the pid to signal date | tee -a ${LOG} -WORKER_PID=$(ps axf | grep java | grep org.apache.spark.executor.CoarseGrainedExecutorBackend | grep -v grep) +WORKER_PID=$(ps -o pid -C java | tail -n 1) echo "Using worker pid $WORKER_PID" kill -s SIGPWR ${WORKER_PID} # For now we expect this to timeout, since we don't start exiting the backend. @@ -32,5 +32,4 @@ timeout 60 tail --pid=${WORKER_PID} -f /dev/null date echo "Done" date -echo "Term log was:" -cat $LOG +sleep 30 From 271984d16e335630c57a17f96adc947257a176ee Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 14 Jan 2020 19:04:41 -0800 Subject: [PATCH 18/43] Temporarily boosy decom time for manual debugging. --- .../kubernetes/integration-tests/tests/decommissioning.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index 088bbf993b2c..c873bb18f62b 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -36,7 +36,7 @@ rdd = sc.parallelize(range(10)) rdd.collect() print("Waiting to give nodes time to finish.") - time.sleep(10) + time.sleep(2400) rdd.collect() print("Stopping spark") spark.stop() From 2141df0610fb40ae7e1e9174334a46ae6e5cf189 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 14 Jan 2020 19:13:55 -0800 Subject: [PATCH 19/43] Revert "Temporarily boosy decom time for manual debugging." This reverts commit 271984d16e335630c57a17f96adc947257a176ee. 
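
For reference, the JVM half of the handshake that the decom.sh script above drives can be
sketched in a few lines of plain Scala. This is only an illustration using sun.misc.Signal
directly, not this series' actual wiring (which goes through SignalUtils and
CoarseGrainedExecutorBackend), and the PWR signal is Linux-only:

import sun.misc.{Signal, SignalHandler}

object DecomSignalSketch {
  @volatile private var decommissioned = false

  def main(args: Array[String]): Unit = {
    // decom.sh finds the executor JVM's pid and sends SIGPWR; the handler only
    // flips a flag so running work can drain instead of the process dying.
    Signal.handle(new Signal("PWR"), new SignalHandler {
      override def handle(sig: Signal): Unit = {
        decommissioned = true
        println("SIGPWR received: no new work will be accepted")
      }
    })
    while (!decommissioned) Thread.sleep(100)
    println("Decommissioned: draining and shutting down")
  }
}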
--- .../kubernetes/integration-tests/tests/decommissioning.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index c873bb18f62b..088bbf993b2c 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -36,7 +36,7 @@ rdd = sc.parallelize(range(10)) rdd.collect() print("Waiting to give nodes time to finish.") - time.sleep(2400) + time.sleep(10) rdd.collect() print("Stopping spark") spark.stop() From d4dcb8285df3630175db49cf19b695e13e1e0e88 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 14 Jan 2020 19:21:46 -0800 Subject: [PATCH 20/43] We skip errors since we don't care. --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 64e908a8a4b9..83e850bac5d0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -562,8 +562,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * Called by subclasses when notified of a decommissioning worker. */ private[spark] def decommissionExecutor(executorId: String): Unit = { - // Only log the failure since we don't care about the result. - driverEndpoint.send(DecommissionExecutor(executorId)) + if (driverEndpoint != null) { + driverEndpoint.send(DecommissionExecutor(executorId)) + } } def sufficientResourcesRegistered(): Boolean = true From eea0da07dcbe85f681d926b6f16b089472fd0310 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 14 Jan 2020 22:20:54 -0800 Subject: [PATCH 21/43] Try and debug the weird new behaviour --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 6 ++++-- .../scheduler/cluster/StandaloneSchedulerBackend.scala | 3 ++- .../kubernetes/integration-tests/tests/decommissioning.py | 4 ++++ 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 83e850bac5d0..6c681b915b61 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -199,7 +199,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp removeExecutor(executorId, reason) case DecommissionExecutor(executorId) => - logError(s"Received decommission executor message ${executorId}.") + logInfo(s"Received decommission executor message ${executorId}.") decommissionExecutor(executorId) case RemoveWorker(workerId, host, message) => @@ -426,6 +426,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp if (shouldDisable) { logError(s"Decommissioning executor $executorId.") scheduler.executorDecommission(executorId) + logError(s"Finished decommissioning executor $executorId.") } else { logError(s"Skipping decommissioning of executor $executorId.") } @@ -559,10 +560,11 @@ 
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } /** - * Called by subclasses when notified of a decommissioning worker. + * Called by subclasses when notified of a decommissioning executor. */ private[spark] def decommissionExecutor(executorId: String): Unit = { if (driverEndpoint != null) { + logInfo("Propegating executor decommission to driver.") driverEndpoint.send(DecommissionExecutor(executorId)) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 02f612d6c990..f3db30d31453 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -174,8 +174,9 @@ private[spark] class StandaloneSchedulerBackend( } override def executorDecommissioned(fullId: String, message: String) { - logInfo("Executor %s decommissioned: %s".format(fullId, message)) + logInfo("Asked to decommission executor") decommissionExecutor(fullId.split("/")(1)) + logInfo("Executor %s decommissioned: %s".format(fullId, message)) } override def workerRemoved(workerId: String, host: String, message: String): Unit = { diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index 088bbf993b2c..75a71fb40b13 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -38,6 +38,10 @@ print("Waiting to give nodes time to finish.") time.sleep(10) rdd.collect() + print("Waiting some more....") + time.sleep(2400) + rdd.collect() print("Stopping spark") spark.stop() + print("Done, exiting Python") sys.exit(0) From 940f4c3b32f0f700358de20c26ab5cabed6990f5 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 14 Jan 2020 22:33:40 -0800 Subject: [PATCH 22/43] Debug decomm failures --- .../cluster/CoarseGrainedSchedulerBackend.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 6c681b915b61..ab69e25f9fbe 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -424,11 +424,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } if (shouldDisable) { - logError(s"Decommissioning executor $executorId.") - scheduler.executorDecommission(executorId) - logError(s"Finished decommissioning executor $executorId.") + logInfo(s"Starting decommissioning executor $executorId.") + try { + scheduler.executorDecommission(executorId) + } catch { + case e: Exception => + logError(s"Unexpected error during decommissioning ${e.toString}", e) + } + logInfo(s"Finished decommissioning executor $executorId.") } else { - logError(s"Skipping decommissioning of executor $executorId.") + logInfo(s"Skipping decommissioning of executor $executorId.") } shouldDisable } From 2c7c1ba36b5f0050033f12d7f7ab9d85fb2adc96 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 14 Jan 2020 22:51:53 -0800 Subject: [PATCH 23/43] Figure out why the early exit --- 
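
The try/catch added in the previous patch keeps decommission bookkeeping failures from
escaping the RPC receive loop. A minimal stand-alone sketch of that best-effort pattern;
the names below are illustrative only, not Spark APIs:

object BestEffortSketch {
  // Run an advisory action such as decommission bookkeeping; exceptions are logged
  // and swallowed so the caller's receive loop keeps processing messages.
  def bestEffort(label: String)(action: => Unit): Unit = {
    try {
      action
    } catch {
      case e: Exception => System.err.println(s"Unexpected error during $label: $e")
    }
  }

  def main(args: Array[String]): Unit = {
    bestEffort("decommissioning executor 1") { println("executor 1 marked") }
    bestEffort("decommissioning executor 2") { throw new IllegalStateException("boom") }
    println("still running after the failure")
  }
}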
.../executor/CoarseGrainedExecutorBackend.scala | 5 +++-- .../org/apache/spark/util/SignalUtils.scala | 1 + .../src/main/dockerfiles/spark/entrypoint.sh | 6 +++--- .../dev/dev-run-integration-tests.sh | 1 + .../k8s/integrationtest/DecommissionSuite.scala | 3 ++- .../k8s/integrationtest/KubernetesSuite.scala | 16 ++++++++-------- .../integration-tests/tests/decommissioning.py | 2 +- 7 files changed, 19 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 31b9833533ed..e276b7cb6779 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -17,6 +17,7 @@ package org.apache.spark.executor +import java.lang.Thread import java.net.URL import java.nio.ByteBuffer import java.util.Locale @@ -148,7 +149,7 @@ private[spark] class CoarseGrainedExecutorBackend( logError("Asked to launch a task while decommissioned.") driver match { case Some(endpoint) => - logError("Sending DecommissionExecutor to driver") + logInfo("Sending DecommissionExecutor to driver") endpoint.send(DecommissionExecutor(executorId)) case _ => logError("No registered driver to send Decommission to.") @@ -238,7 +239,7 @@ private[spark] class CoarseGrainedExecutorBackend( } private def decommissionSelf(): Boolean = { - logError("Decommissioning self") + logInfo("Decommissioning self") try { decommissioned = true // Tell master we are are decommissioned so it stops trying to schedule us diff --git a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala index 230195da2a12..67eb54f64a65 100644 --- a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala @@ -98,6 +98,7 @@ private[spark] object SignalUtils extends Logging { // after reaching a first false predicate. val escalate = actions.asScala.map(action => action()).forall(_ == false) if (escalate) { + println("Escalating signal...") prevHandler.handle(sig) } diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh index 6ee3523c8eda..05ab782caeca 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh @@ -30,9 +30,9 @@ set -e # If there is no passwd entry for the container UID, attempt to create one if [ -z "$uidentry" ] ; then if [ -w /etc/passwd ] ; then - echo "$myuid:x:$myuid:$mygid:${SPARK_USER_NAME:-anonymous uid}:$SPARK_HOME:/bin/false" >> /etc/passwd + echo "$myuid:x:$myuid:$mygid:${SPARK_USER_NAME:-anonymous uid}:$SPARK_HOME:/bin/false" >> /etc/passwd else - echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID" + echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID" fi fi @@ -59,7 +59,7 @@ fi # If HADOOP_HOME is set and SPARK_DIST_CLASSPATH is not set, set it here so Hadoop jars are available to the executor. # It does not set SPARK_DIST_CLASSPATH if already set, to avoid overriding customizations of this value from elsewhere e.g. Docker/K8s. 
if [ -n ${HADOOP_HOME} ] && [ -z ${SPARK_DIST_CLASSPATH} ]; then - export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath) + export SPARK_DIST_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath) fi if ! [ -z ${HADOOP_CONF_DIR+x} ]; then diff --git a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh index cdcb95d6f491..b49f8fa8139b 100755 --- a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh +++ b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh @@ -158,6 +158,7 @@ properties+=( -Dspark.kubernetes.test.jvmImage=$JVM_IMAGE_NAME -Dspark.kubernetes.test.pythonImage=$PYTHON_IMAGE_NAME -Dspark.kubernetes.test.rImage=$R_IMAGE_NAME + -Dlog4j.logger.org.apache.spark=DEBUG ) $TEST_ROOT_DIR/build/mvn integration-test -f $TEST_ROOT_DIR/pom.xml -pl resource-managers/kubernetes/integration-tests -am -Pscala-$SCALA_VERSION -Pkubernetes -Pkubernetes-integration-tests ${properties[@]} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala index 4f75114e2647..a27082d0d912 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala @@ -32,7 +32,8 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite => runSparkApplicationAndVerifyCompletion( appResource = PYSPARK_DECOMISSIONING, mainClass = "", - expectedLogOnCompletion = Seq("Decommissioning executor"), + expectedLogOnCompletion = Seq("Decommissioning executor", + "Finished waiting, stopping Spark"), appArgs = Array.empty[String], driverPodChecker = doBasicDriverPyPodCheck, executorPodChecker = doBasicExecutorPyPodCheck, diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 398476bd8017..60a78d8b8a8e 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -40,9 +40,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ class KubernetesSuite extends SparkFunSuite - with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite + with BeforeAndAfterAll with BeforeAndAfter /* with BasicTestsSuite with SecretsTestsSuite with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite - with DepsTestsSuite with DecommissionSuite with RTestsSuite with Logging with Eventually + with DepsTestsSuite */ with DecommissionSuite /* with RTestsSuite */ with Logging with Eventually with Matchers { @@ -303,7 +303,7 @@ class KubernetesSuite extends SparkFunSuite .withLabel("spark-app-locator", appLocator) .withLabel("spark-role", "executor") .watch(new Watcher[Pod] { - logInfo("Beginning watch of executors") + println("Beginning watch of 
executors") override def onClose(cause: KubernetesClientException): Unit = logInfo("Ending watch of executors") override def eventReceived(action: Watcher.Action, resource: Pod): Unit = { @@ -313,24 +313,24 @@ class KubernetesSuite extends SparkFunSuite case Action.MODIFIED => execPods(name) = resource case Action.ADDED => - logInfo(s"Add event received for $name.") + println(s"Add event received for $name.") execPods(name) = resource // If testing decommissioning start a thread to simulate // decommissioning. if (decommissioningTest && execPods.size == 1) { // Wait for all the containers in the pod to be running - logDebug("Waiting for first pod to become OK prior to deletion") + println("Waiting for first pod to become OK prior to deletion") Eventually.eventually(patienceTimeout, patienceInterval) { val result = checkPodReady(namespace, name) result shouldBe (true) } // Sleep a small interval to allow execution of job - logDebug("Sleeping before killing pod.") - Thread.sleep(2000) + println("Sleeping before killing pod.") + Thread.sleep(700000) // Delete the pod to simulate cluster scale down/migration. val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name) pod.delete() - logDebug(s"Pod: $name deleted") + println(s"Triggered pod decom/delete: $name deleted") } else { } case Action.DELETED | Action.ERROR => diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index 75a71fb40b13..061f972cad3f 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -41,7 +41,7 @@ print("Waiting some more....") time.sleep(2400) rdd.collect() - print("Stopping spark") + print("Finished waiting, stopping Spark.") spark.stop() print("Done, exiting Python") sys.exit(0) From c72f6765fa3c2a8ba145d2f7f89eae3c806b74ed Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 18 Jan 2020 11:40:05 -0800 Subject: [PATCH 24/43] Handle whitespace from ps --- .../kubernetes/docker/src/main/dockerfiles/spark/decom.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh index 397bd2f58c44..8a5208d49a70 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh @@ -22,7 +22,7 @@ set -ex echo "Asked to decommission" # Find the pid to signal date | tee -a ${LOG} -WORKER_PID=$(ps -o pid -C java | tail -n 1) +WORKER_PID=$(ps -o pid -C java | tail -n 1| awk '{ sub(/^[ \t]+/, ""); print }') echo "Using worker pid $WORKER_PID" kill -s SIGPWR ${WORKER_PID} # For now we expect this to timeout, since we don't start exiting the backend. 
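
The awk call above strips the leading padding that `ps -o pid` prints in its column. Purely
as an aside, and not something this series does: on Java 9+ the JVM can enumerate processes
itself with ProcessHandle, which avoids the ps parsing entirely. A sketch, assuming the
target process' command line mentions CoarseGrainedExecutorBackend:

import scala.collection.JavaConverters._

object FindExecutorPid {
  def main(args: Array[String]): Unit = {
    // Requires Java 9+. Matches any process whose command line mentions the executor
    // backend class, mirroring the grep used by earlier versions of decom.sh.
    val pids = ProcessHandle.allProcesses().iterator().asScala
      .filter { h =>
        val cmd = h.info().commandLine()
        cmd.isPresent && cmd.get.contains("CoarseGrainedExecutorBackend")
      }
      .map(_.pid())
      .toList
    pids.foreach(pid => println(s"executor backend pid: $pid"))
  }
}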
From 47e9107f4445743caa51f5eca6d155b16d81e2f6 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 18 Jan 2020 19:51:27 -0800 Subject: [PATCH 25/43] Make sure decom job can finish too after decom --- .../kubernetes/integration-tests/tests/decommissioning.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index 061f972cad3f..7e86abc5f3f6 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -36,10 +36,10 @@ rdd = sc.parallelize(range(10)) rdd.collect() print("Waiting to give nodes time to finish.") - time.sleep(10) + time.sleep(5) rdd.collect() print("Waiting some more....") - time.sleep(2400) + time.sleep(5) rdd.collect() print("Finished waiting, stopping Spark.") spark.stop() From 95f24ab33b149713cd8b0c90b3c4fb65c64829d3 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 18 Jan 2020 19:52:29 -0800 Subject: [PATCH 26/43] Cleanup some debugging and re-enable the tests --- .../executor/CoarseGrainedExecutorBackend.scala | 1 - .../org/apache/spark/util/SignalUtils.scala | 1 - .../k8s/integrationtest/KubernetesSuite.scala | 17 ++++++++--------- 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index e276b7cb6779..14871a2ffa81 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -17,7 +17,6 @@ package org.apache.spark.executor -import java.lang.Thread import java.net.URL import java.nio.ByteBuffer import java.util.Locale diff --git a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala index 67eb54f64a65..230195da2a12 100644 --- a/core/src/main/scala/org/apache/spark/util/SignalUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/SignalUtils.scala @@ -98,7 +98,6 @@ private[spark] object SignalUtils extends Logging { // after reaching a first false predicate. 
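    // Escalating hands the signal back to whichever handler was installed before Spark
    // registered its own; often that is the JVM default, which shuts the process down.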
val escalate = actions.asScala.map(action => action()).forall(_ == false) if (escalate) { - println("Escalating signal...") prevHandler.handle(sig) } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 60a78d8b8a8e..61e1f27b5546 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -40,9 +40,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ class KubernetesSuite extends SparkFunSuite - with BeforeAndAfterAll with BeforeAndAfter /* with BasicTestsSuite with SecretsTestsSuite + with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite - with DepsTestsSuite */ with DecommissionSuite /* with RTestsSuite */ with Logging with Eventually + with DepsTestsSuite with DecommissionSuite with RTestsSuite with Logging with Eventually with Matchers { @@ -303,7 +303,7 @@ class KubernetesSuite extends SparkFunSuite .withLabel("spark-app-locator", appLocator) .withLabel("spark-role", "executor") .watch(new Watcher[Pod] { - println("Beginning watch of executors") + logDebug("Beginning watch of executors") override def onClose(cause: KubernetesClientException): Unit = logInfo("Ending watch of executors") override def eventReceived(action: Watcher.Action, resource: Pod): Unit = { @@ -313,25 +313,24 @@ class KubernetesSuite extends SparkFunSuite case Action.MODIFIED => execPods(name) = resource case Action.ADDED => - println(s"Add event received for $name.") + logDebug(s"Add event received for $name.") execPods(name) = resource // If testing decommissioning start a thread to simulate // decommissioning. if (decommissioningTest && execPods.size == 1) { // Wait for all the containers in the pod to be running - println("Waiting for first pod to become OK prior to deletion") + logDebug("Waiting for first pod to become OK prior to deletion") Eventually.eventually(patienceTimeout, patienceInterval) { val result = checkPodReady(namespace, name) result shouldBe (true) } // Sleep a small interval to allow execution of job - println("Sleeping before killing pod.") - Thread.sleep(700000) + logDebug("Sleeping before killing pod.") + Thread.sleep(2000) // Delete the pod to simulate cluster scale down/migration. 
val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name) pod.delete() - println(s"Triggered pod decom/delete: $name deleted") - } else { + logDebug(s"Triggered pod decom/delete: $name deleted") } case Action.DELETED | Action.ERROR => execPods.remove(name)
From cf3a6d682e0d88b54a24a29ace66ed4f182726ce Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Jan 2020 16:20:35 -0800 Subject: [PATCH 27/43] PR feedback: don't use a stringly-typed true for the config in the test --- .../org/apache/spark/scheduler/WorkerDecommissionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index 9190b8606363..381574738507 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -31,7 +31,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { override def beforeEach(): Unit = { val conf = new SparkConf().setAppName("test").setMaster("local") - .set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, "true") + .set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, true) sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf) }
From 1b9f83d614c96b5191c09eef75a4b56d78fa9499 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Jan 2020 16:25:42 -0800 Subject: [PATCH 28/43] Use string because of overloaded methods --- .../org/apache/spark/scheduler/WorkerDecommissionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index 381574738507..9190b8606363 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -31,7 +31,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { override def beforeEach(): Unit = { val conf = new SparkConf().setAppName("test").setMaster("local") - .set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, true) + .set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, "true") sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf) }
From 556191b729eca733c21f729d46912c1784b10390 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Jan 2020 16:58:57 -0800 Subject: [PATCH 29/43] Log when we receive unexpected messages --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 65417b8be9ab..95facace19e6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -210,6 +210,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp data.freeCores = data.totalCores } makeOffers(executorId) + case e => + logError(s"Received unexpected message. ${e}") } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -291,6 +293,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp Option(delegationTokens.get()), rp) context.reply(reply) + case e => + logError(s"Received unexpected ask ${e}") } // Make fake resource offers on all executors
From d07770d3eaf67e2b1a522fcdcebf5f473fe868f4 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Jan 2020 16:59:11 -0800 Subject: [PATCH 30/43] Give us a bit more time. --- .../kubernetes/integration-tests/tests/decommissioning.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index 7e86abc5f3f6..bae76c84c51d 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -39,7 +39,7 @@ time.sleep(5) rdd.collect() print("Waiting some more....") - time.sleep(5) + time.sleep(10) rdd.collect() print("Finished waiting, stopping Spark.") spark.stop()
From 5dee0ddfcc7c123158ee146afad3536feb371d65 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Jan 2020 17:07:52 -0800 Subject: [PATCH 31/43] Ask for decom rather than send, in case that is what is making the logging not work as expected during the test. --- .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 04ba9176bdee..3968260e3650 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -233,7 +233,7 @@ private[spark] class CoarseGrainedExecutorBackend( decommissioned = true // Tell master we are are decommissioned so it stops trying to schedule us if (driver.nonEmpty) { - driver.get.send(DecommissionExecutor(executorId)) + driver.get.ask[boolean](DecommissionExecutor(executorId)) } else { logError("No driver to message decommissioning.") }
From 28566faa7f2fb33a171a3c53348115c7130e92ec Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Jan 2020 17:08:16 -0800 Subject: [PATCH 32/43] Like remove worker, support ask OR send for Decomm --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 95facace19e6..7c9b53941c8b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -282,6 +282,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp removeWorker(workerId, host, message) context.reply(true) + case DecommissionExecutor(executorId) => + logInfo(s"Received decommission executor message ${executorId}.") + decommissionExecutor(executorId) + context.reply(true) + case RetrieveSparkAppConfig(resourceProfileId) => // note this will be updated in later prs to get the ResourceProfile from a // ResourceProfileManager that matches the
resource profile id From 28566faa7f2fb33a171a3c53348115c7130e92ec Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Jan 2020 17:08:57 -0800 Subject: [PATCH 33/43] spelling --- .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 3968260e3650..8b1ef04f33b0 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -233,7 +233,7 @@ private[spark] class CoarseGrainedExecutorBackend( decommissioned = true // Tell master we are are decommissioned so it stops trying to schedule us if (driver.nonEmpty) { - driver.get.ask[boolean](DecommissionExecutor(executorId)) + driver.get.ask[Boolean](DecommissionExecutor(executorId)) } else { logError("No driver to message decommissioning.") } From 9377eb0387e984f6d5518c92e8eec5da665bde8a Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Jan 2020 17:09:18 -0800 Subject: [PATCH 34/43] spelling --- .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 8b1ef04f33b0..578793634586 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -233,7 +233,7 @@ private[spark] class CoarseGrainedExecutorBackend( decommissioned = true // Tell master we are are decommissioned so it stops trying to schedule us if (driver.nonEmpty) { - driver.get.ask[Boolean](DecommissionExecutor(executorId)) + driver.get.askSync[Boolean](DecommissionExecutor(executorId)) } else { logError("No driver to message decommissioning.") } From 4a55f3d05fada93119c0264df4f312ecc2e27504 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Jan 2020 17:13:07 -0800 Subject: [PATCH 35/43] Forgot to take the .key off when I made the last change. --- .../org/apache/spark/scheduler/WorkerDecommissionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index 9190b8606363..15733b0d932e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -31,7 +31,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { override def beforeEach(): Unit = { val conf = new SparkConf().setAppName("test").setMaster("local") - .set(config.Worker.WORKER_DECOMMISSION_ENABLED.key, "true") + .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true) sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf) } From 0c7182a8d10178c73bdb698266c6b2447c57df9b Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Jan 2020 17:41:17 -0800 Subject: [PATCH 36/43] Add some debugging messages. 
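Context on why the decommission message shows up in two handlers: a message delivered with RpcEndpointRef.send is dispatched to the endpoint's receive, while askSync goes to receiveAndReply and blocks the caller until context.reply (or the RPC timeout) fires. The sketch below only illustrates that pattern; the Decommission message, endpoint name, and handler are hypothetical, not the actual scheduler backend code.

    package org.apache.spark.sketch  // hypothetical subpackage of org.apache.spark, so the private[spark] RPC API is visible

    import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}

    // Hypothetical message; it mirrors DecommissionExecutor but is not the real one.
    case class Decommission(executorId: String)

    class DecomAwareEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {

      // Fire-and-forget path: messages delivered with endpointRef.send(...) land here.
      override def receive: PartialFunction[Any, Unit] = {
        case Decommission(id) => handleDecommission(id)
      }

      // Request/response path: endpointRef.askSync[Boolean](...) lands here and the
      // caller stays blocked until context.reply(...) fires (or the RPC timeout hits).
      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case Decommission(id) =>
          handleDecommission(id)
          context.reply(true)
      }

      private def handleDecommission(id: String): Unit = {
        // e.g. stop offering resources on `id` and mark it as decommissioned
      }
    }

Handling the same message on both paths lets the executor notify the driver with a cheap send or insist on an acknowledgement with askSync, without needing a second message type.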
--- .../spark/executor/CoarseGrainedExecutorBackend.scala | 4 ++-- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 578793634586..91a01d8f8657 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -138,7 +138,7 @@ private[spark] class CoarseGrainedExecutorBackend( logError("Asked to launch a task while decommissioned.") driver match { case Some(endpoint) => - logInfo("Sending DecommissionExecutor to driver") + logInfo("Sending DecommissionExecutor to driver.") endpoint.send(DecommissionExecutor(executorId)) case _ => logError("No registered driver to send Decommission to.") @@ -228,7 +228,7 @@ private[spark] class CoarseGrainedExecutorBackend( } private def decommissionSelf(): Boolean = { - logInfo("Decommissioning self") + logInfo("Decommissioning self w/sync") try { decommissioned = true // Tell master we are are decommissioned so it stops trying to schedule us diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 7c9b53941c8b..f96e2c584824 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -199,7 +199,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp removeExecutor(executorId, reason) case DecommissionExecutor(executorId) => - logInfo(s"Received decommission executor message ${executorId}.") + println("****DECOM NO RPLY****") + logError(s"Received decommission executor message ${executorId}.") decommissionExecutor(executorId) case RemoveWorker(workerId, host, message) => @@ -283,7 +284,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp context.reply(true) case DecommissionExecutor(executorId) => - logInfo(s"Received decommission executor message ${executorId}.") + println("****DECOM RPLY***") + logError(s"Received decommission executor message ${executorId}.") decommissionExecutor(executorId) context.reply(true) From 018e3a872603548fa2137f41cd5e8cf8163c611f Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Jan 2020 18:04:30 -0800 Subject: [PATCH 37/43] Looking for the wrong magic string --- .../spark/deploy/k8s/integrationtest/DecommissionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala index a27082d0d912..f5eab6e4bbad 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala @@ -32,7 +32,7 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite => runSparkApplicationAndVerifyCompletion( appResource = 
PYSPARK_DECOMISSIONING, mainClass = "", - expectedLogOnCompletion = Seq("Decommissioning executor", + expectedLogOnCompletion = Seq("decommissioning executor", "Finished waiting, stopping Spark"), appArgs = Array.empty[String], driverPodChecker = doBasicDriverPyPodCheck,
From da7faf2aa43e596241b5432526467118341d72e1 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Jan 2020 18:05:57 -0800 Subject: [PATCH 38/43] Revert "Ask for decom rather than send, in case that is what is making the logging not work as expected during the test." We were looking for the wrong magic string. This reverts commit 5dee0ddfcc7c123158ee146afad3536feb371d65. --- .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 91a01d8f8657..dfca860ea6ed 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -233,7 +233,7 @@ private[spark] class CoarseGrainedExecutorBackend( decommissioned = true // Tell master we are are decommissioned so it stops trying to schedule us if (driver.nonEmpty) { - driver.get.askSync[Boolean](DecommissionExecutor(executorId)) + driver.get.send(DecommissionExecutor(executorId)) } else { logError("No driver to message decommissioning.") }
From 5bdfbfaf4e7ff721a53c6ecfca98b1e63d156f3b Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 24 Jan 2020 18:08:27 -0800 Subject: [PATCH 39/43] Revert "Revert "Ask for decom rather than send, in case that is what is making the logging not work as expected during the test."" This reverts commit da7faf2aa43e596241b5432526467118341d72e1.
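An aside on the magic-string checks these last few patches wrestle with: the integration test greps the driver log for exact substrings such as "decommissioning executor", so a change in the casing of a log line is enough to fail the run. If that keeps biting, a case-insensitive comparison is a cheap hardening. The sketch below is only an illustration, with driverLog and expected standing in for the suite's real log retrieval and expectation list.

    // Sketch: check expected completion messages case-insensitively so that
    // "Decommissioning executor" and "decommissioning executor" both pass.
    object LogCheckSketch {
      def containsAllIgnoreCase(driverLog: String, expected: Seq[String]): Boolean = {
        val log = driverLog.toLowerCase
        expected.forall(msg => log.contains(msg.toLowerCase))
      }
    }

    // e.g. LogCheckSketch.containsAllIgnoreCase(driverLog,
    //   Seq("decommissioning executor", "Finished waiting, stopping Spark"))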
--- .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index dfca860ea6ed..91a01d8f8657 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -233,7 +233,7 @@ private[spark] class CoarseGrainedExecutorBackend( decommissioned = true // Tell master we are are decommissioned so it stops trying to schedule us if (driver.nonEmpty) { - driver.get.send(DecommissionExecutor(executorId)) + driver.get.askSync[Boolean](DecommissionExecutor(executorId)) } else { logError("No driver to message decommissioning.") } From 15179e5046503815e1b4796126bd24f14351239b Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 30 Jan 2020 10:42:50 -0800 Subject: [PATCH 40/43] Remove debugging printlns --- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f96e2c584824..19ba881972aa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -199,7 +199,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp removeExecutor(executorId, reason) case DecommissionExecutor(executorId) => - println("****DECOM NO RPLY****") logError(s"Received decommission executor message ${executorId}.") decommissionExecutor(executorId) @@ -284,7 +283,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp context.reply(true) case DecommissionExecutor(executorId) => - println("****DECOM RPLY***") logError(s"Received decommission executor message ${executorId}.") decommissionExecutor(executorId) context.reply(true) From 02d1668af9410d6c015b9282502416faca55e810 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 30 Jan 2020 14:29:13 -0800 Subject: [PATCH 41/43] [TEMPORARY] - Debug the failure in Jenkins --- .../deploy/k8s/integrationtest/KubernetesSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 61e1f27b5546..c9ef522b7f80 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -313,24 +313,24 @@ class KubernetesSuite extends SparkFunSuite case Action.MODIFIED => execPods(name) = resource case Action.ADDED => - logDebug(s"Add event received for $name.") + println(s"Add event received for $name.") execPods(name) = resource // If testing decommissioning start a thread to simulate // decommissioning. 
if (decommissioningTest && execPods.size == 1) { // Wait for all the containers in the pod to be running - logDebug("Waiting for first pod to become OK prior to deletion") + println("Waiting for first pod to become OK prior to deletion") Eventually.eventually(patienceTimeout, patienceInterval) { val result = checkPodReady(namespace, name) result shouldBe (true) } // Sleep a small interval to allow execution of job - logDebug("Sleeping before killing pod.") - Thread.sleep(2000) + println("Sleeping before killing pod.") + Thread.sleep(5000) // Delete the pod to simulate cluster scale down/migration. val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name) pod.delete() - logDebug(s"Triggered pod decom/delete: $name deleted") + println(s"Triggered pod decom/delete: $name deleted") } case Action.DELETED | Action.ERROR => execPods.remove(name) From 0a7d084744dc0e647ce54923375e0fc80e7620f0 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 30 Jan 2020 22:09:45 -0800 Subject: [PATCH 42/43] Revert "[TEMPORARY] - Debug the failure in Jenkins" This reverts commit 02d1668af9410d6c015b9282502416faca55e810. --- .../deploy/k8s/integrationtest/KubernetesSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index c9ef522b7f80..61e1f27b5546 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -313,24 +313,24 @@ class KubernetesSuite extends SparkFunSuite case Action.MODIFIED => execPods(name) = resource case Action.ADDED => - println(s"Add event received for $name.") + logDebug(s"Add event received for $name.") execPods(name) = resource // If testing decommissioning start a thread to simulate // decommissioning. if (decommissioningTest && execPods.size == 1) { // Wait for all the containers in the pod to be running - println("Waiting for first pod to become OK prior to deletion") + logDebug("Waiting for first pod to become OK prior to deletion") Eventually.eventually(patienceTimeout, patienceInterval) { val result = checkPodReady(namespace, name) result shouldBe (true) } // Sleep a small interval to allow execution of job - println("Sleeping before killing pod.") - Thread.sleep(5000) + logDebug("Sleeping before killing pod.") + Thread.sleep(2000) // Delete the pod to simulate cluster scale down/migration. 
val pod = kubernetesTestComponents.kubernetesClient.pods().withName(name) pod.delete() - println(s"Triggered pod decom/delete: $name deleted") + logDebug(s"Triggered pod decom/delete: $name deleted") } case Action.DELETED | Action.ERROR => execPods.remove(name)
From ca27da04b339f4b7e3e054ac66dea793af5c05fe Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 5 Feb 2020 11:46:31 -0800 Subject: [PATCH 43/43] Code feedback from rberenguel: drop the no-longer-needed __future__ import, remove the old name from the decommissioning test's usage string, and make the executor loss reason string consistent. --- .../scala/org/apache/spark/scheduler/ExecutorLossReason.scala | 2 +- .../kubernetes/integration-tests/tests/decommissioning.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index d79a10da15be..ee31093ec065 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -65,4 +65,4 @@ case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = fals * This is used by the task scheduler to remove state associated with the executor, but * not yet fail any tasks that were running in the executor before the executor is "fully" lost. */ -private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor Decommission.") +private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.") diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index bae76c84c51d..f68f24d49763 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -15,8 +15,6 @@ # limitations under the License. # -from __future__ import print_function - import sys import time @@ -25,7 +23,7 @@ if __name__ == "__main__": """ - Usage: decomissioning_water + Usage: decommissioning """ print("Starting decom test") spark = SparkSession \