Commit 3c9968a

Merge remote-tracking branch 'upstream/master' into SPARK-30768
2 parents: df679e6 + ab07c63

121 files changed: +1460, -551 lines


core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 11 additions & 0 deletions
@@ -60,6 +60,15 @@ private[deploy] object DeployMessages {
     assert (port > 0)
   }

+  /**
+   * @param id the worker id
+   * @param worker the worker endpoint ref
+   */
+  case class WorkerDecommission(
+      id: String,
+      worker: RpcEndpointRef)
+    extends DeployMessage
+
   case class ExecutorStateChanged(
       appId: String,
       execId: Int,
@@ -149,6 +158,8 @@ private[deploy] object DeployMessages {

   case object ReregisterWithMaster // used when a worker attempts to reconnect to a master

+  case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future.
+
   // AppClient to Master

   case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)

core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala

Lines changed: 6 additions & 2 deletions
@@ -19,9 +19,13 @@ package org.apache.spark.deploy

 private[deploy] object ExecutorState extends Enumeration {

-  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
+  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED, DECOMMISSIONED = Value

   type ExecutorState = Value

-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
+  // DECOMMISSIONED isn't listed as finished since we don't want to remove the executor from
+  // the worker and the executor still exists - but we do want to avoid scheduling new tasks on it.
+  private val finishedStates = Seq(KILLED, FAILED, LOST, EXITED)
+
+  def isFinished(state: ExecutorState): Boolean = finishedStates.contains(state)
 }
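
A minimal sketch (not part of this commit) of the intended semantics: DECOMMISSIONED is a live, non-terminal state, so isFinished still returns false for it while the pre-existing terminal states behave as before. Since ExecutorState is private[deploy], the sketch sits in the same package; the object name is illustrative.

package org.apache.spark.deploy

// Sketch only: DECOMMISSIONED keeps the executor "alive" (not removed from the
// worker), but the scheduler is expected to stop handing it new tasks.
object ExecutorStateSketch {
  def main(args: Array[String]): Unit = {
    assert(!ExecutorState.isFinished(ExecutorState.DECOMMISSIONED))
    assert(ExecutorState.isFinished(ExecutorState.KILLED))
  }
}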

core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala

Lines changed: 2 additions & 0 deletions
@@ -180,6 +180,8 @@ private[spark] class StandaloneAppClient(
         logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
         if (ExecutorState.isFinished(state)) {
           listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
+        } else if (state == ExecutorState.DECOMMISSIONED) {
+          listener.executorDecommissioned(fullId, message.getOrElse(""))
         }

       case WorkerRemoved(id, host, message) =>

core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala

Lines changed: 2 additions & 0 deletions
@@ -39,5 +39,7 @@ private[spark] trait StandaloneAppClientListener {
   def executorRemoved(
       fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit

+  def executorDecommissioned(fullId: String, message: String): Unit
+
   def workerRemoved(workerId: String, host: String, message: String): Unit
 }
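
The new executorDecommissioned callback is an abstract member, so every StandaloneAppClientListener implementation (in Spark, notably StandaloneSchedulerBackend) now has to provide it. A minimal hedged sketch of a mix-in that supplies only the new callback and leaves the rest of the trait abstract; the trait name is hypothetical:

package org.apache.spark.deploy.client

import org.apache.spark.internal.Logging

// Hypothetical mix-in, for illustration only: implements just the new callback.
trait LogsDecommissions extends StandaloneAppClientListener with Logging {
  override def executorDecommissioned(fullId: String, message: String): Unit = {
    logInfo(s"Executor $fullId decommissioned: $message")
  }
}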

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 31 additions & 0 deletions
@@ -243,6 +243,15 @@ private[deploy] class Master(
       logError("Leadership has been revoked -- master shutting down.")
       System.exit(0)

+    case WorkerDecommission(id, workerRef) =>
+      logInfo("Recording worker %s decommissioning".format(id))
+      if (state == RecoveryState.STANDBY) {
+        workerRef.send(MasterInStandby)
+      } else {
+        // We use foreach since get gives us an option and we can skip the failures.
+        idToWorker.get(id).foreach(decommissionWorker)
+      }
+
     case RegisterWorker(
         id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl,
         masterAddress, resources) =>
@@ -313,7 +322,9 @@ private[deploy] class Master(
           // Only retry certain number of times so we don't go into an infinite loop.
           // Important note: this code path is not exercised by tests, so be very careful when
           // changing this `if` condition.
+          // We also don't count failures from decommissioned workers since they are "expected."
           if (!normalExit
+              && oldState != ExecutorState.DECOMMISSIONED
               && appInfo.incrementRetryCount() >= maxExecutorRetries
               && maxExecutorRetries >= 0) { // < 0 disables this application-killing path
             val execs = appInfo.executors.values
@@ -850,6 +861,26 @@ private[deploy] class Master(
     true
   }

+  private def decommissionWorker(worker: WorkerInfo): Unit = {
+    if (worker.state != WorkerState.DECOMMISSIONED) {
+      logInfo("Decommissioning worker %s on %s:%d".format(worker.id, worker.host, worker.port))
+      worker.setState(WorkerState.DECOMMISSIONED)
+      for (exec <- worker.executors.values) {
+        logInfo("Telling app of decommission executors")
+        exec.application.driver.send(ExecutorUpdated(
+          exec.id, ExecutorState.DECOMMISSIONED,
+          Some("worker decommissioned"), None, workerLost = false))
+        exec.state = ExecutorState.DECOMMISSIONED
+        exec.application.removeExecutor(exec)
+      }
+      // On recovery do not add a decommissioned executor
+      persistenceEngine.removeWorker(worker)
+    } else {
+      logWarning("Skipping decommissioning worker %s on %s:%d as worker is already decommissioned".
+        format(worker.id, worker.host, worker.port))
+    }
+  }
+
   private def removeWorker(worker: WorkerInfo, msg: String): Unit = {
     logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
     worker.setState(WorkerState.DEAD)

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 26 additions & 0 deletions
@@ -67,6 +67,14 @@ private[deploy] class Worker(
   Utils.checkHost(host)
   assert (port > 0)

+  // If worker decommissioning is enabled register a handler on PWR to shutdown.
+  if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+    logInfo("Registering SIGPWR handler to trigger decommissioning.")
+    SignalUtils.register("PWR")(decommissionSelf)
+  } else {
+    logInfo("Worker decommissioning not enabled, SIGPWR will result in exiting.")
+  }
+
   // A scheduled executor used to send messages at the specified time.
   private val forwardMessageScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
@@ -128,6 +136,7 @@ private[deploy] class Worker(
   private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
   private var registered = false
   private var connected = false
+  private var decommissioned = false
   private val workerId = generateWorkerId()
   private val sparkHome =
     if (sys.props.contains(IS_TESTING.key)) {
@@ -549,6 +558,8 @@ private[deploy] class Worker(
     case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, resources_) =>
       if (masterUrl != activeMasterUrl) {
         logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
+      } else if (decommissioned) {
+        logWarning("Asked to launch an executor while decommissioned. Not launching executor.")
       } else {
         try {
           logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
@@ -672,6 +683,9 @@ private[deploy] class Worker(
     case ApplicationFinished(id) =>
       finishedApps += id
       maybeCleanupApplication(id)
+
+    case DecommissionSelf =>
+      decommissionSelf()
   }

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -771,6 +785,18 @@ private[deploy] class Worker(
     }
   }

+  private[deploy] def decommissionSelf(): Boolean = {
+    if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+      logDebug("Decommissioning self")
+      decommissioned = true
+      sendToMaster(WorkerDecommission(workerId, self))
+    } else {
+      logWarning("Asked to decommission self, but decommissioning not enabled")
+    }
+    // Return true since can be called as a signal handler
+    true
+  }
+
   private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
     val driverId = driverStateChanged.driverId
     val exception = driverStateChanged.exception
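
Operationally, once spark.worker.decommission.enabled is set, sending SIGPWR to the worker JVM (e.g. kill -PWR <pid> on Linux) invokes decommissionSelf through the handler registered above. A brief sketch of the SignalUtils.register pattern as used here, assuming the usual Spark semantics that a handler returning true reports the signal as handled so the previously installed behaviour (by default, exiting) is not triggered; the object and callback names are illustrative:

package org.apache.spark.util

// Sketch only: the registration pattern used by Worker above. The handler is a
// by-name Boolean; returning true reports the signal as handled.
object DecommissionSignalSketch {
  def install(onDecommission: () => Unit): Unit = {
    SignalUtils.register("PWR") {
      onDecommission()
      true
    }
  }
}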

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 38 additions & 1 deletion
@@ -43,7 +43,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, ThreadUtils, Utils}
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}

 private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
@@ -64,6 +64,7 @@ private[spark] class CoarseGrainedExecutorBackend(

   private[this] val stopping = new AtomicBoolean(false)
   var executor: Executor = null
+  @volatile private var decommissioned = false
   @volatile var driver: Option[RpcEndpointRef] = None

   // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
@@ -80,6 +81,9 @@ private[spark] class CoarseGrainedExecutorBackend(
   private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]

   override def onStart(): Unit = {
+    logInfo("Registering PWR handler.")
+    SignalUtils.register("PWR")(decommissionSelf)
+
     logInfo("Connecting to driver: " + driverUrl)
     try {
       _resources = parseOrFindResources(resourcesFileOpt)
@@ -160,6 +164,16 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor == null) {
         exitExecutor(1, "Received LaunchTask command but executor was null")
       } else {
+        if (decommissioned) {
+          logError("Asked to launch a task while decommissioned.")
+          driver match {
+            case Some(endpoint) =>
+              logInfo("Sending DecommissionExecutor to driver.")
+              endpoint.send(DecommissionExecutor(executorId))
+            case _ =>
+              logError("No registered driver to send Decommission to.")
+          }
+        }
         val taskDesc = TaskDescription.decode(data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
         taskResources(taskDesc.taskId) = taskDesc.resources
@@ -242,6 +256,29 @@ private[spark] class CoarseGrainedExecutorBackend(

     System.exit(code)
   }
+
+  private def decommissionSelf(): Boolean = {
+    logInfo("Decommissioning self w/sync")
+    try {
+      decommissioned = true
+      // Tell master we are are decommissioned so it stops trying to schedule us
+      if (driver.nonEmpty) {
+        driver.get.askSync[Boolean](DecommissionExecutor(executorId))
+      } else {
+        logError("No driver to message decommissioning.")
+      }
+      if (executor != null) {
+        executor.decommission()
+      }
+      logInfo("Done decommissioning self.")
+      // Return true since we are handling a signal
+      true
+    } catch {
+      case e: Exception =>
+        logError(s"Error ${e} during attempt to decommission self")
+        false
+    }
+  }
 }

 private[spark] object CoarseGrainedExecutorBackend extends Logging {
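
Two RPC styles appear in this backend change: the signal handler uses a blocking askSync so the executor only proceeds once the driver has acknowledged the decommission, while the LaunchTask path fires a non-blocking send as a reminder. A hedged sketch of the difference; DecommissionExecutor is defined elsewhere on this branch, and the object name, driverRef, and executorId are placeholders:

package org.apache.spark.executor

import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.DecommissionExecutor

// Sketch only: blocking vs fire-and-forget notification of the driver.
object DecommissionNotifySketch {
  def notifyDriver(driverRef: RpcEndpointRef, executorId: String, blocking: Boolean): Unit = {
    if (blocking) {
      driverRef.askSync[Boolean](DecommissionExecutor(executorId)) // wait for the driver's ack
    } else {
      driverRef.send(DecommissionExecutor(executorId)) // best-effort reminder
    }
  }
}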

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 16 additions & 0 deletions
@@ -216,16 +216,32 @@ private[spark] class Executor(
    */
   private var heartbeatFailures = 0

+  /**
+   * Flag to prevent launching new tasks while decommissioned. There could be a race condition
+   * accessing this, but decommissioning is only intended to help not be a hard stop.
+   */
+  private var decommissioned = false
+
   heartbeater.start()

   metricsPoller.start()

   private[executor] def numRunningTasks: Int = runningTasks.size()

+  /**
+   * Mark an executor for decommissioning and avoid launching new tasks.
+   */
+  private[spark] def decommission(): Unit = {
+    decommissioned = true
+  }
+
   def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
     val tr = new TaskRunner(context, taskDescription)
     runningTasks.put(taskDescription.taskId, tr)
     threadPool.execute(tr)
+    if (decommissioned) {
+      log.error(s"Launching a task while in decommissioned state.")
+    }
   }

   def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {

core/src/main/scala/org/apache/spark/internal/config/Worker.scala

Lines changed: 5 additions & 0 deletions
@@ -71,4 +71,9 @@ private[spark] object Worker {
     ConfigBuilder("spark.worker.ui.compressedLogFileLengthCacheSize")
       .intConf
       .createWithDefault(100)
+
+  private[spark] val WORKER_DECOMMISSION_ENABLED =
+    ConfigBuilder("spark.worker.decommission.enabled")
+      .booleanConf
+      .createWithDefault(false)
 }
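
The flag defaults to false, so decommissioning is opt-in. A hedged usage sketch using only the public SparkConf API; in a real standalone deployment the property would normally be set for the worker daemon (for example via spark-defaults.conf or SPARK_WORKER_OPTS) rather than in application code:

import org.apache.spark.SparkConf

object DecommissionConfigSketch {
  def main(args: Array[String]): Unit = {
    // Key taken from the ConfigBuilder above; everything else is illustrative.
    val conf = new SparkConf().set("spark.worker.decommission.enabled", "true")
    assert(conf.getBoolean("spark.worker.decommission.enabled", false))
  }
}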

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 2 additions & 0 deletions
@@ -361,6 +361,7 @@ abstract class RDD[T: ClassTag](
         readCachedBlock = false
         computeOrReadCheckpoint(partition, context)
       }) match {
+        // Block hit.
         case Left(blockResult) =>
           if (readCachedBlock) {
             val existingMetrics = context.taskMetrics().inputMetrics
@@ -374,6 +375,7 @@ abstract class RDD[T: ClassTag](
           } else {
             new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
           }
+        // Need to compute the block.
         case Right(iter) =>
           new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
       }
