diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index eedf5e969e291..a6da8393bf405 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.util.{RpcUtils, ThreadUtils}
 
 /**
@@ -181,7 +182,8 @@ private[spark] class StandaloneAppClient(
           if (ExecutorState.isFinished(state)) {
             listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
           } else if (state == ExecutorState.DECOMMISSIONED) {
-            listener.executorDecommissioned(fullId, message.getOrElse(""))
+            listener.executorDecommissioned(fullId,
+              ExecutorDecommissionInfo(message.getOrElse(""), isHostDecommissioned = workerLost))
           }
 
         case WorkerRemoved(id, host, message) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
index 2e38a6847891d..e72f7e976bb0a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.deploy.client
 
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
+
 /**
  * Callbacks invoked by deploy client when various events happen. There are currently five events:
  * connecting to the cluster, disconnecting, being given an executor, having an executor removed
@@ -39,7 +41,7 @@ private[spark] trait StandaloneAppClientListener {
   def executorRemoved(
       fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
 
-  def executorDecommissioned(fullId: String, message: String): Unit
+  def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit
 
   def workerRemoved(workerId: String, host: String, message: String): Unit
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0070df1d66dee..220e1c963d5ea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -908,7 +908,10 @@ private[deploy] class Master(
       logInfo("Telling app of decommission executors")
       exec.application.driver.send(ExecutorUpdated(
         exec.id, ExecutorState.DECOMMISSIONED,
-        Some("worker decommissioned"), None, workerLost = false))
+        Some("worker decommissioned"), None,
+        // workerLost is being set to true here to let the driver know that the host (aka. worker)
+        // is also being decommissioned.
+        workerLost = true))
       exec.state = ExecutorState.DECOMMISSIONED
       exec.application.removeExecutor(exec)
     }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index e072d7919450e..def125bb6bfb6 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -40,7 +40,7 @@ import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.resource.ResourceProfile._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc._
-import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
+import org.apache.spark.scheduler.{ExecutorDecommissionInfo, ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}
@@ -166,11 +166,15 @@ private[spark] class CoarseGrainedExecutorBackend(
         exitExecutor(1, "Received LaunchTask command but executor was null")
       } else {
         if (decommissioned) {
-          logError("Asked to launch a task while decommissioned.")
+          val msg = "Asked to launch a task while decommissioned."
+          logError(msg)
           driver match {
             case Some(endpoint) =>
               logInfo("Sending DecommissionExecutor to driver.")
-              endpoint.send(DecommissionExecutor(executorId))
+              endpoint.send(
+                DecommissionExecutor(
+                  executorId,
+                  ExecutorDecommissionInfo(msg, isHostDecommissioned = false)))
             case _ =>
               logError("No registered driver to send Decommission to.")
           }
@@ -259,12 +263,14 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
   private def decommissionSelf(): Boolean = {
-    logInfo("Decommissioning self w/sync")
+    val msg = "Decommissioning self w/sync"
+    logInfo(msg)
     try {
       decommissioned = true
       // Tell master we are are decommissioned so it stops trying to schedule us
       if (driver.nonEmpty) {
-        driver.get.askSync[Boolean](DecommissionExecutor(executorId))
+        driver.get.askSync[Boolean](DecommissionExecutor(
+          executorId, ExecutorDecommissionInfo(msg, false)))
       } else {
         logError("No driver to message decommissioning.")
       }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
new file mode 100644
index 0000000000000..a82b5d38afe9f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * Provides more detail when an executor is being decommissioned.
+ * @param message Human readable reason for why the decommissioning is happening.
+ * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
+ *                             being decommissioned too. Used to infer if the shuffle data might
+ *                             be lost even if the external shuffle service is enabled.
+ */
+private[spark]
+case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 08f9f3c256e69..b29458c481413 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -101,7 +101,7 @@ private[spark] trait TaskScheduler {
   /**
    * Process a decommissioning executor.
    */
-  def executorDecommission(executorId: String): Unit
+  def executorDecommission(executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit
 
   /**
    * Process a lost executor
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 12bd93286d736..28e138ea9b79c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -912,7 +912,8 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
-  override def executorDecommission(executorId: String): Unit = {
+  override def executorDecommission(
+      executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
     rootPool.executorDecommission(executorId)
     backend.reviveOffers()
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index bb929c27b6a65..91485f01bf007 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
 import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.scheduler.ExecutorLossReason
 import org.apache.spark.util.SerializableBuffer
 
@@ -94,7 +95,8 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
     extends CoarseGrainedClusterMessage
 
-  case class DecommissionExecutor(executorId: String) extends CoarseGrainedClusterMessage
+  case class DecommissionExecutor(executorId: String, decommissionInfo: ExecutorDecommissionInfo)
+    extends CoarseGrainedClusterMessage
 
   case class RemoveWorker(workerId: String, host: String, message: String)
     extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 6b9b4d6fe57e0..9a5529ef48c5f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -191,9 +191,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
         removeExecutor(executorId, reason)
 
-      case DecommissionExecutor(executorId) =>
-        logError(s"Received decommission executor message ${executorId}.")
-        decommissionExecutor(executorId)
+      case DecommissionExecutor(executorId, decommissionInfo) =>
+        logError(s"Received decommission executor message ${executorId}: $decommissionInfo")
+        decommissionExecutor(executorId, decommissionInfo)
 
       case RemoveWorker(workerId, host, message) =>
         removeWorker(workerId, host, message)
@@ -272,9 +272,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         removeWorker(workerId, host, message)
         context.reply(true)
 
-      case DecommissionExecutor(executorId) =>
-        logError(s"Received decommission executor message ${executorId}.")
-        decommissionExecutor(executorId)
+      case DecommissionExecutor(executorId, decommissionInfo) =>
+        logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.")
+        decommissionExecutor(executorId, decommissionInfo)
         context.reply(true)
 
       case RetrieveSparkAppConfig(resourceProfileId) =>
@@ -422,7 +422,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Mark a given executor as decommissioned and stop making resource offers for it.
   */
-  private def decommissionExecutor(executorId: String): Boolean = {
+  private def decommissionExecutor(
+      executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = {
     val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
       // Only bother decommissioning executors which are alive.
       if (isExecutorActive(executorId)) {
@@ -436,7 +437,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     if (shouldDisable) {
       logInfo(s"Starting decommissioning executor $executorId.")
       try {
-        scheduler.executorDecommission(executorId)
+        scheduler.executorDecommission(executorId, decommissionInfo)
       } catch {
         case e: Exception =>
           logError(s"Unexpected error during decommissioning ${e.toString}", e)
@@ -590,10 +591,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Called by subclasses when notified of a decommissioning executor.
   */
-  private[spark] def decommissionExecutor(executorId: String): Unit = {
+  private[spark] def decommissionExecutor(
+      executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
     if (driverEndpoint != null) {
       logInfo("Propagating executor decommission to driver.")
-      driverEndpoint.send(DecommissionExecutor(executorId))
+      driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo))
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 4024b44bdfd2f..d921af602b254 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -174,10 +174,10 @@ private[spark] class StandaloneSchedulerBackend(
     removeExecutor(fullId.split("/")(1), reason)
   }
 
-  override def executorDecommissioned(fullId: String, message: String) {
+  override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) {
     logInfo("Asked to decommission executor")
-    decommissionExecutor(fullId.split("/")(1))
-    logInfo("Executor %s decommissioned: %s".format(fullId, message))
+    decommissionExecutor(fullId.split("/")(1), decommissionInfo)
+    logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo))
   }
 
   override def workerRemoved(workerId: String, host: String, message: String): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index a3e39d7f53728..e091bd05c2dc8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.client
 
 import java.io.Closeable
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
 
 import scala.concurrent.duration._
 
@@ -32,6 +32,7 @@ import org.apache.spark.deploy.master.{ApplicationInfo, Master}
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.util.Utils
 
 /**
@@ -126,7 +127,10 @@ class AppClientSuite
       // Decommissioning is async.
      eventually(timeout(1.seconds), interval(10.millis)) {
        // We only record decommissioning for the executor we've requested
-        assert(ci.listener.execDecommissionedList.size === 1)
+        assert(ci.listener.execDecommissionedMap.size === 1)
+        val decommissionInfo = ci.listener.execDecommissionedMap.get(executorId)
+        assert(decommissionInfo != null && decommissionInfo.isHostDecommissioned,
+          s"$executorId should have been decommissioned along with its worker")
       }
 
       // Send request to kill executor, verify request was made
@@ -215,7 +219,7 @@ class AppClientSuite
     val deadReasonList = new ConcurrentLinkedQueue[String]()
     val execAddedList = new ConcurrentLinkedQueue[String]()
     val execRemovedList = new ConcurrentLinkedQueue[String]()
-    val execDecommissionedList = new ConcurrentLinkedQueue[String]()
+    val execDecommissionedMap = new ConcurrentHashMap[String, ExecutorDecommissionInfo]()
 
     def connected(id: String): Unit = {
       connectedIdList.add(id)
@@ -245,8 +249,9 @@ class AppClientSuite
       execRemovedList.add(id)
     }
 
-    def executorDecommissioned(id: String, message: String): Unit = {
-      execDecommissionedList.add(id)
+    def executorDecommissioned(id: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
+      val previousDecommissionInfo = execDecommissionedMap.putIfAbsent(id, decommissionInfo)
+      assert(previousDecommissionInfo === null, s"Expected no previous decommission info for $id")
     }
 
     def workerRemoved(workerId: String, host: String, message: String): Unit = {}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 7013832757e38..503975d8e91dc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -169,10 +169,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
     override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
     override def defaultParallelism() = 2
-    override def executorDecommission(executorId: String) = {}
     override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
     override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
     override def applicationAttemptId(): Option[String] = None
+    override def executorDecommission(
+      executorId: String,
+      decommissionInfo: ExecutorDecommissionInfo): Unit = {}
   }
 
   /**
@@ -715,10 +717,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
       blockManagerId: BlockManagerId,
       executorUpdates: Map[(Int, Int), ExecutorMetrics]): Boolean = true
-    override def executorDecommission(executorId: String): Unit = {}
     override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
     override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
     override def applicationAttemptId(): Option[String] = None
+    override def executorDecommission(
+      executorId: String,
+      decommissionInfo: ExecutorDecommissionInfo): Unit = {}
   }
   val noKillScheduler = new DAGScheduler(
     sc,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 7ead51bc691fb..b2a5f77b4b04c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -90,7 +90,6 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {}
   override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
   override def defaultParallelism(): Int = 2
-  override def executorDecommission(executorId: String): Unit = {}
   override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
   override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
   override def applicationAttemptId(): Option[String] = None
@@ -99,4 +98,7 @@ private class DummyTaskScheduler extends TaskScheduler {
     accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
     blockManagerId: BlockManagerId,
     executorMetrics: Map[(Int, Int), ExecutorMetrics]): Boolean = true
+  override def executorDecommission(
+    executorId: String,
+    decommissionInfo: ExecutorDecommissionInfo): Unit = {}
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
index 4de5aaeab5c51..d95deb1f5f327 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
@@ -65,7 +65,7 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte
 
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     sc.getExecutorIds().tail.foreach { id =>
-      sched.decommissionExecutor(id)
+      sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false))
       assert(rdd3.sortByKey().collect().length === 100)
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
index cd3ab4db77f85..3c34070e8bb97 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -73,7 +73,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
     // decom.sh message passing is tested manually.
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     val execs = sched.getExecutorIds()
-    execs.foreach(execId => sched.decommissionExecutor(execId))
+    execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false)))
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
     assert(asyncCountResult === 10)
     // Try and launch task after decommissioning, this should fail
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index afcb38bc38836..57410103dd080 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -146,7 +146,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
 
     val execToDecommission = execs.head
     logDebug(s"Decommissioning executor ${execToDecommission}")
-    sched.decommissionExecutor(execToDecommission)
+    sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))
 
     // Wait for job to finish.
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
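
Illustrative sketch only, not part of the patch above: it shows how a consumer of the new ExecutorDecommissionInfo might read isHostDecommissioned, following the ScalaDoc on the case class. The ExecutorDecommissionInfoSketch object and the externalShuffleServiceEnabled parameter are assumptions made for the example, not Spark APIs.

package org.apache.spark.scheduler

// Hypothetical helper (not in Spark). It is placed under org.apache.spark.scheduler so the
// private[spark] case class introduced by the patch is visible to it.
object ExecutorDecommissionInfoSketch {

  // Shuffle data written by a decommissioned executor can still be served by the external
  // shuffle service, but only while the host itself stays up; a host-level decommission
  // takes those files away as well.
  def shuffleDataMayBeLost(
      info: ExecutorDecommissionInfo,
      externalShuffleServiceEnabled: Boolean): Boolean = {
    info.isHostDecommissioned || !externalShuffleServiceEnabled
  }

  def main(args: Array[String]): Unit = {
    val workerGone = ExecutorDecommissionInfo("worker decommissioned", isHostDecommissioned = true)
    val execOnly = ExecutorDecommissionInfo("executor decommissioned", isHostDecommissioned = false)
    assert(shuffleDataMayBeLost(workerGone, externalShuffleServiceEnabled = true))
    assert(!shuffleDataMayBeLost(execOnly, externalShuffleServiceEnabled = true))
  }
}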