Skip to content

Commit 88060be

Browse files
committed
Plumb whether a worker would also be decommissioned along with executor
This PR is a giant plumbing PR that plumbs an `ExecutorDecommissionInfo` along with the DecommissionExecutor message. The primary motivation is to know whether a decommissioned executor would also be losing shuffle files -- and thus it is important to know whether the host would also be decommissioned. This is similar to the existing `workerLost` field in the `ExecutorProcessLost` message. In the future, this `ExecutorDecommissionInfo` can be embellished to convey how long the executor has to live, for scenarios like Cloud spot kills (or YARN preemption) and the like.
1 parent 7b9d755 commit 88060be

16 files changed

+92
-35
lines changed

core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.DeployMessages._
3131
import org.apache.spark.deploy.master.Master
3232
import org.apache.spark.internal.Logging
3333
import org.apache.spark.rpc._
34+
import org.apache.spark.scheduler.ExecutorDecommissionInfo
3435
import org.apache.spark.util.{RpcUtils, ThreadUtils}
3536

3637
/**
@@ -181,7 +182,8 @@ private[spark] class StandaloneAppClient(
181182
if (ExecutorState.isFinished(state)) {
182183
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
183184
} else if (state == ExecutorState.DECOMMISSIONED) {
184-
listener.executorDecommissioned(fullId, message.getOrElse(""))
185+
listener.executorDecommissioned(fullId,
186+
ExecutorDecommissionInfo(message.getOrElse(""), isHostDecommissioned = workerLost))
185187
}
186188

187189
case WorkerRemoved(id, host, message) =>

core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.deploy.client
1919

20+
import org.apache.spark.scheduler.ExecutorDecommissionInfo
21+
2022
/**
2123
* Callbacks invoked by deploy client when various events happen. There are currently five events:
2224
* connecting to the cluster, disconnecting, being given an executor, having an executor removed
@@ -39,7 +41,7 @@ private[spark] trait StandaloneAppClientListener {
3941
def executorRemoved(
4042
fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
4143

42-
def executorDecommissioned(fullId: String, message: String): Unit
44+
def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit
4345

4446
def workerRemoved(workerId: String, host: String, message: String): Unit
4547
}

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -908,7 +908,10 @@ private[deploy] class Master(
908908
logInfo("Telling app of decommission executors")
909909
exec.application.driver.send(ExecutorUpdated(
910910
exec.id, ExecutorState.DECOMMISSIONED,
911-
Some("worker decommissioned"), None, workerLost = false))
911+
Some("worker decommissioned"), None,
912+
// workerLost is being set to true here to let the driver know that the host (aka. worker)
913+
// is also being decommissioned.
914+
workerLost = true))
912915
exec.state = ExecutorState.DECOMMISSIONED
913916
exec.application.removeExecutor(exec)
914917
}

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.apache.spark.resource.ResourceProfile
4040
import org.apache.spark.resource.ResourceProfile._
4141
import org.apache.spark.resource.ResourceUtils._
4242
import org.apache.spark.rpc._
43-
import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
43+
import org.apache.spark.scheduler.{ExecutorDecommissionInfo, ExecutorLossReason, TaskDescription}
4444
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
4545
import org.apache.spark.serializer.SerializerInstance
4646
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}
@@ -166,11 +166,15 @@ private[spark] class CoarseGrainedExecutorBackend(
166166
exitExecutor(1, "Received LaunchTask command but executor was null")
167167
} else {
168168
if (decommissioned) {
169-
logError("Asked to launch a task while decommissioned.")
169+
val msg = "Asked to launch a task while decommissioned."
170+
logError(msg)
170171
driver match {
171172
case Some(endpoint) =>
172173
logInfo("Sending DecommissionExecutor to driver.")
173-
endpoint.send(DecommissionExecutor(executorId))
174+
endpoint.send(
175+
DecommissionExecutor(
176+
executorId,
177+
ExecutorDecommissionInfo(msg, isHostDecommissioned = false)))
174178
case _ =>
175179
logError("No registered driver to send Decommission to.")
176180
}
@@ -259,12 +263,14 @@ private[spark] class CoarseGrainedExecutorBackend(
259263
}
260264

261265
private def decommissionSelf(): Boolean = {
262-
logInfo("Decommissioning self w/sync")
266+
val msg = "Decommissioning self w/sync"
267+
logInfo(msg)
263268
try {
264269
decommissioned = true
265270
// Tell master we are are decommissioned so it stops trying to schedule us
266271
if (driver.nonEmpty) {
267-
driver.get.askSync[Boolean](DecommissionExecutor(executorId))
272+
driver.get.askSync[Boolean](DecommissionExecutor(
273+
executorId, ExecutorDecommissionInfo(msg, false)))
268274
} else {
269275
logError("No driver to message decommissioning.")
270276
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.scheduler
19+
20+
/**
21+
* Provides more detail when an executor is being decommissioned.
22+
* @param message Human readable reason for why the decommissioning is happening.
23+
* @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
24+
* being decommissioned too. Used to infer if the shuffle data might
25+
* be lost even if the external shuffle service is enabled.
26+
*/
27+
private[spark]
28+
case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)

core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private[spark] trait TaskScheduler {
101101
/**
102102
* Process a decommissioning executor.
103103
*/
104-
def executorDecommission(executorId: String): Unit
104+
def executorDecommission(executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit
105105

106106
/**
107107
* Process a lost executor

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,8 @@ private[spark] class TaskSchedulerImpl(
912912
}
913913
}
914914

915-
override def executorDecommission(executorId: String): Unit = {
915+
override def executorDecommission(
916+
executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
916917
rootPool.executorDecommission(executorId)
917918
backend.reviveOffers()
918919
}

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
2222
import org.apache.spark.TaskState.TaskState
2323
import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
2424
import org.apache.spark.rpc.RpcEndpointRef
25+
import org.apache.spark.scheduler.ExecutorDecommissionInfo
2526
import org.apache.spark.scheduler.ExecutorLossReason
2627
import org.apache.spark.util.SerializableBuffer
2728

@@ -94,7 +95,8 @@ private[spark] object CoarseGrainedClusterMessages {
9495
case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
9596
extends CoarseGrainedClusterMessage
9697

97-
case class DecommissionExecutor(executorId: String) extends CoarseGrainedClusterMessage
98+
case class DecommissionExecutor(executorId: String, decommissionInfo: ExecutorDecommissionInfo)
99+
extends CoarseGrainedClusterMessage
98100

99101
case class RemoveWorker(workerId: String, host: String, message: String)
100102
extends CoarseGrainedClusterMessage

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
191191
executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
192192
removeExecutor(executorId, reason)
193193

194-
case DecommissionExecutor(executorId) =>
195-
logError(s"Received decommission executor message ${executorId}.")
196-
decommissionExecutor(executorId)
194+
case DecommissionExecutor(executorId, decommissionInfo) =>
195+
logError(s"Received decommission executor message ${executorId}: $decommissionInfo")
196+
decommissionExecutor(executorId, decommissionInfo)
197197

198198
case RemoveWorker(workerId, host, message) =>
199199
removeWorker(workerId, host, message)
@@ -272,9 +272,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
272272
removeWorker(workerId, host, message)
273273
context.reply(true)
274274

275-
case DecommissionExecutor(executorId) =>
276-
logError(s"Received decommission executor message ${executorId}.")
277-
decommissionExecutor(executorId)
275+
case DecommissionExecutor(executorId, decommissionInfo) =>
276+
logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.")
277+
decommissionExecutor(executorId, decommissionInfo)
278278
context.reply(true)
279279

280280
case RetrieveSparkAppConfig(resourceProfileId) =>
@@ -422,7 +422,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
422422
/**
423423
* Mark a given executor as decommissioned and stop making resource offers for it.
424424
*/
425-
private def decommissionExecutor(executorId: String): Boolean = {
425+
private def decommissionExecutor(
426+
executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = {
426427
val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
427428
// Only bother decommissioning executors which are alive.
428429
if (isExecutorActive(executorId)) {
@@ -436,7 +437,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
436437
if (shouldDisable) {
437438
logInfo(s"Starting decommissioning executor $executorId.")
438439
try {
439-
scheduler.executorDecommission(executorId)
440+
scheduler.executorDecommission(executorId, decommissionInfo)
440441
} catch {
441442
case e: Exception =>
442443
logError(s"Unexpected error during decommissioning ${e.toString}", e)
@@ -590,10 +591,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
590591
/**
591592
* Called by subclasses when notified of a decommissioning executor.
592593
*/
593-
private[spark] def decommissionExecutor(executorId: String): Unit = {
594+
private[spark] def decommissionExecutor(
595+
executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
594596
if (driverEndpoint != null) {
595597
logInfo("Propagating executor decommission to driver.")
596-
driverEndpoint.send(DecommissionExecutor(executorId))
598+
driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo))
597599
}
598600
}
599601

core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,10 @@ private[spark] class StandaloneSchedulerBackend(
174174
removeExecutor(fullId.split("/")(1), reason)
175175
}
176176

177-
override def executorDecommissioned(fullId: String, message: String) {
177+
override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) {
178178
logInfo("Asked to decommission executor")
179-
decommissionExecutor(fullId.split("/")(1))
180-
logInfo("Executor %s decommissioned: %s".format(fullId, message))
179+
decommissionExecutor(fullId.split("/")(1), decommissionInfo)
180+
logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo))
181181
}
182182

183183
override def workerRemoved(workerId: String, host: String, message: String): Unit = {

0 commit comments

Comments
 (0)