Closed
Changes from all commits
54 commits
82c3c9a
Add the concept of a Decommissioning state which nodes can enter if t…
holdenk Sep 4, 2019
c935a6e
Add the decom script and test python
holdenk Sep 5, 2019
16b3880
Cleanup and reduce time a bit
holdenk Sep 5, 2019
8f7ed26
Add decom script was previously untracked
holdenk Sep 13, 2019
55b6f9e
Add some more docstring
holdenk Oct 10, 2019
bfa06ce
Style fix (long line)
holdenk Nov 8, 2019
bdda06d
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Nov 11, 2019
a63b68f
Minor style and comment fixes
holdenk Nov 12, 2019
9476e22
Now we ACK the decom msg
holdenk Nov 12, 2019
317c76b
Launch a task even in decom state to avoid a race condition.
holdenk Nov 12, 2019
86c0ff6
Allow tasks to launch in the executor base class as well
holdenk Nov 13, 2019
bdd7df3
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Nov 13, 2019
545e4d9
Merge in master
holdenk Jan 1, 2020
88c89b1
Finish logical merge
holdenk Jan 1, 2020
0eb089f
Start working on vanzin's feedback on the name and sync to async.
holdenk Jan 1, 2020
8bda4c7
Code review feedback, improve test to use listener instead of sleep, …
holdenk Jan 9, 2020
dcebf8c
Cleanup stray newline and fix wording in comment.
holdenk Jan 9, 2020
f312348
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Jan 10, 2020
7c02a63
Look into failures in K8s jenkins job
holdenk Jan 13, 2020
41dc954
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Jan 13, 2020
f0aafa8
We need ps to find the pid of the java process
holdenk Jan 15, 2020
5f65e27
Simplify decom script to not depend on strings
holdenk Jan 15, 2020
271984d
Temporarily boosy decom time for manual debugging.
holdenk Jan 15, 2020
2141df0
Revert "Temporarily boosy decom time for manual debugging."
holdenk Jan 15, 2020
d4dcb82
We skip errors since we don't care.
holdenk Jan 15, 2020
eea0da0
Try and debug the weird new behaviour
holdenk Jan 15, 2020
940f4c3
Debug decomm failures
holdenk Jan 15, 2020
2c7c1ba
Figure out why the early exit
holdenk Jan 15, 2020
c72f676
Handle whitespace from ps
holdenk Jan 18, 2020
47e9107
Make sure decom job can finish too after decom
holdenk Jan 19, 2020
95f24ab
Cleanup some debugging and re-enable the tests
holdenk Jan 19, 2020
e29cce9
Merge in master
holdenk Jan 25, 2020
cf3a6d6
PR feedback don't use a stringly typed true for the config in thest
holdenk Jan 25, 2020
1b9f83d
Use string because of overloaded methods
holdenk Jan 25, 2020
556191b
Log when we receive unexpected messages
holdenk Jan 25, 2020
d07770d
Give us a bit more time.
holdenk Jan 25, 2020
5dee0dd
Ask for decom rather than send incase that is what is making the logg…
holdenk Jan 25, 2020
0c616fc
Like remove worker support ask OR send for Decomm
holdenk Jan 25, 2020
28566fa
spelling
holdenk Jan 25, 2020
9377eb0
spelling
holdenk Jan 25, 2020
4a55f3d
Forgot to take the .key off when I made the last change.
holdenk Jan 25, 2020
0c7182a
Add some debugging messages.
holdenk Jan 25, 2020
018e3a8
Looking for the wrong magic string
holdenk Jan 25, 2020
da7faf2
Revert "Ask for decom rather than send incase that is what is making …
holdenk Jan 25, 2020
5bdfbfa
Revert "Revert "Ask for decom rather than send incase that is what is…
holdenk Jan 25, 2020
46d26c8
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Jan 29, 2020
15179e5
Remove debugging printlns
holdenk Jan 30, 2020
ed45999
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Jan 30, 2020
02d1668
[TEMPORARY] - Debug the failure in Jenkins
holdenk Jan 30, 2020
0a7d084
Revert "[TEMPORARY] - Debug the failure in Jenkins"
holdenk Jan 31, 2020
c8161c3
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Jan 31, 2020
bcd6d4f
Merge branch 'master' into SPARK-20628-keep-track-of-nodes-which-are-…
holdenk Feb 5, 2020
ca27da0
Code feedback from rberenguel, don't need future import anymore remov…
holdenk Feb 5, 2020
af55030
Merge in master
holdenk Feb 13, 2020
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -60,6 +60,15 @@ private[deploy] object DeployMessages {
assert (port > 0)
}

/**
* @param id the worker id
* @param worker the worker endpoint ref
*/
case class WorkerDecommission(
id: String,
worker: RpcEndpointRef)
extends DeployMessage

case class ExecutorStateChanged(
appId: String,
execId: Int,
@@ -149,6 +158,8 @@ private[deploy] object DeployMessages {

case object ReregisterWithMaster // used when a worker attempts to reconnect to a master

case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future.

// AppClient to Master

case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,9 +19,13 @@ package org.apache.spark.deploy

private[deploy] object ExecutorState extends Enumeration {

val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED, DECOMMISSIONED = Value

type ExecutorState = Value

def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
// DECOMMISSIONED isn't listed as finished since we don't want to remove the executor from
// the worker and the executor still exists - but we do want to avoid scheduling new tasks on it.
private val finishedStates = Seq(KILLED, FAILED, LOST, EXITED)

def isFinished(state: ExecutorState): Boolean = finishedStates.contains(state)
}
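To make the new state concrete: a tiny illustrative check (not part of this diff, and it has to live inside the org.apache.spark.deploy package since ExecutorState is package-private) of how isFinished treats DECOMMISSIONED versus the terminal states, following the finishedStates list above.

// Illustrative only: DECOMMISSIONED is deliberately non-terminal, so the Master keeps
// the executor record around while the scheduler simply stops giving it new tasks.
assert(!ExecutorState.isFinished(ExecutorState.DECOMMISSIONED))
assert(ExecutorState.isFinished(ExecutorState.KILLED))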
core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -180,6 +180,8 @@ private[spark] class StandaloneAppClient(
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
} else if (state == ExecutorState.DECOMMISSIONED) {
listener.executorDecommissioned(fullId, message.getOrElse(""))
}

case WorkerRemoved(id, host, message) =>
core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -39,5 +39,7 @@
def executorRemoved(
fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit

def executorDecommissioned(fullId: String, message: String): Unit

def workerRemoved(workerId: String, host: String, message: String): Unit
}
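The concrete implementations of this new callback are outside this section of the diff; below is a minimal sketch of how a listener such as StandaloneSchedulerBackend might forward it to the scheduler. The wiring and names are assumptions rather than the PR's exact code, apart from TaskSchedulerImpl.executorDecommission, which this PR does add (see further down).

// Hypothetical listener implementation: hand the decommission off to the task scheduler
// so no new tasks are offered on that executor.
override def executorDecommissioned(fullId: String, message: String): Unit = {
  logInfo("Executor %s decommissioned: %s".format(fullId, message))
  // fullId has the form "appId/execId"; the scheduler tracks executors by execId.
  scheduler.executorDecommission(fullId.split("/")(1))
}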
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -243,6 +243,15 @@ private[deploy] class Master(
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)

case WorkerDecommission(id, workerRef) =>
logInfo("Recording worker %s decommissioning".format(id))
if (state == RecoveryState.STANDBY) {
workerRef.send(MasterInStandby)
} else {
// We use foreach since get gives us an option and we can skip the failures.
idToWorker.get(id).foreach(decommissionWorker)
}

case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl,
masterAddress, resources) =>
@@ -313,7 +322,9 @@
// Only retry certain number of times so we don't go into an infinite loop.
// Important note: this code path is not exercised by tests, so be very careful when
// changing this `if` condition.
// We also don't count failures from decommissioned workers since they are "expected."
if (!normalExit
&& oldState != ExecutorState.DECOMMISSIONED
&& appInfo.incrementRetryCount() >= maxExecutorRetries
&& maxExecutorRetries >= 0) { // < 0 disables this application-killing path
val execs = appInfo.executors.values
@@ -850,6 +861,26 @@ private[deploy] class Master(
true
}

private def decommissionWorker(worker: WorkerInfo): Unit = {
if (worker.state != WorkerState.DECOMMISSIONED) {
logInfo("Decommissioning worker %s on %s:%d".format(worker.id, worker.host, worker.port))
worker.setState(WorkerState.DECOMMISSIONED)
for (exec <- worker.executors.values) {
logInfo("Telling app of decommission executors")
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.DECOMMISSIONED,
Some("worker decommissioned"), None, workerLost = false))
exec.state = ExecutorState.DECOMMISSIONED
exec.application.removeExecutor(exec)
}
// On recovery do not add a decommissioned executor
persistenceEngine.removeWorker(worker)
} else {
logWarning("Skipping decommissioning worker %s on %s:%d as worker is already decommissioned".
format(worker.id, worker.host, worker.port))
}
}

private def removeWorker(worker: WorkerInfo, msg: String): Unit = {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
worker.setState(WorkerState.DEAD)
26 changes: 26 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -67,6 +67,14 @@ private[deploy] class Worker(
Utils.checkHost(host)
assert (port > 0)

// If worker decommissioning is enabled register a handler on PWR to shutdown.
if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
logInfo("Registering SIGPWR handler to trigger decommissioning.")
SignalUtils.register("PWR")(decommissionSelf)
} else {
logInfo("Worker decommissioning not enabled, SIGPWR will result in exiting.")
}

// A scheduled executor used to send messages at the specified time.
private val forwardMessageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
@@ -128,6 +136,7 @@
private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
private var registered = false
private var connected = false
private var decommissioned = false
private val workerId = generateWorkerId()
private val sparkHome =
if (sys.props.contains(IS_TESTING.key)) {
@@ -549,6 +558,8 @@
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, resources_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else if (decommissioned) {
logWarning("Asked to launch an executor while decommissioned. Not launching executor.")
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
@@ -672,6 +683,9 @@
case ApplicationFinished(id) =>
finishedApps += id
maybeCleanupApplication(id)

case DecommissionSelf =>
decommissionSelf()
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -771,6 +785,18 @@
}
}

private[deploy] def decommissionSelf(): Boolean = {
if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
logDebug("Decommissioning self")
decommissioned = true
sendToMaster(WorkerDecommission(workerId, self))
} else {
logWarning("Asked to decommission self, but decommissioning not enabled")
}
// Return true since can be called as a signal handler
true
}

private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
val driverId = driverStateChanged.driverId
val exception = driverStateChanged.exception
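The worker registers the SIGPWR handler shown above; the decommission script added elsewhere in this PR (a shell script, not part of this section) triggers it by finding the JVM's pid with ps and sending the signal. A rough, illustrative Scala equivalent of that trigger, assuming a single java process on the node:

import scala.sys.process._

// Illustrative only: mimic the decommission script's trigger.
object TriggerDecommission {
  def main(args: Array[String]): Unit = {
    // Locate the java process via ps; the output needs whitespace trimming.
    val pid = Seq("ps", "-C", "java", "-o", "pid=").!!.trim.split("\\s+").head
    // PWR is the signal the Worker and executor backend register handlers for.
    Seq("kill", "-s", "PWR", pid).!
  }
}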
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -43,7 +43,7 @@ import org.apache.spark.rpc._
import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, ThreadUtils, Utils}
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}

private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
@@ -64,6 +64,7 @@

private[this] val stopping = new AtomicBoolean(false)
var executor: Executor = null
@volatile private var decommissioned = false
@volatile var driver: Option[RpcEndpointRef] = None

// If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
@@ -80,6 +81,9 @@
private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]

override def onStart(): Unit = {
logInfo("Registering PWR handler.")
SignalUtils.register("PWR")(decommissionSelf)

logInfo("Connecting to driver: " + driverUrl)
try {
_resources = parseOrFindResources(resourcesFileOpt)
@@ -160,6 +164,16 @@
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
if (decommissioned) {
logError("Asked to launch a task while decommissioned.")
driver match {
case Some(endpoint) =>
Contributor:

I think that instead of doing this here, it should be done in onStart where the driver reference is created. That means the decommission message is sent to the driver as soon as possible after the signal arrives, instead of waiting for the driver to try to use the executor for something.

(That also means this block can go away and you can just keep the log message in Executor.scala.)

Contributor Author:

So what about when we are scaling down after the driver reference is created?

Reviewer:

https://docs.google.com/document/d/1xVO1b6KAwdUhjEJBolVPl9C6sLj7oOveErwDSYdT-pE/edit?disco=AAAAI73a0FM
I have marked a comment on this in the design doc as well. I think it can be handled by the driver by not allocating tasks to the executor in the first place.
When the driver is aware of the possible decommission of the node, it can stop allocating tasks to this executor. A small code change in the driver's org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#makeOffers:

        // Filter out executors on a decommissioning worker
        val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
          .filter(x => !isNodeDecommissioning(x._2.executorHost))

For this there can be a DecommissionTracker (along the same lines as BlacklistTracker). The DT is filled when the driver is informed of decommissioning on the host. In a comment on the design doc I have tried to elaborate the flow of populating the DT.

  private def isNodeDecommissioning(hostname: String): Boolean = {
    decommissionTracker match {
      case None => false
      case Some(tracker) => tracker.isNodeDecommissioning(hostname)
    }
  }

Contributor Author:

So @itskals, on Kubernetes and in standalone the driver won't know first (and those are the only parts implemented in this PR). Certainly, when we go to implement YARN support we could try and short-circuit talking to the executor -- however, to enable later things like migrations I'd still want to send the message.

This could make a difference when we implement YARN support and I've added a note in the design doc so that we don't skip it then.

logInfo("Sending DecommissionExecutor to driver.")
endpoint.send(DecommissionExecutor(executorId))
case _ =>
logError("No registered driver to send Decommission to.")
}
}
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
taskResources(taskDesc.taskId) = taskDesc.resources
@@ -242,6 +256,29 @@

System.exit(code)
}

private def decommissionSelf(): Boolean = {
logInfo("Decommissioning self w/sync")
try {
decommissioned = true
// Tell master we are decommissioned so it stops trying to schedule us
if (driver.nonEmpty) {
driver.get.askSync[Boolean](DecommissionExecutor(executorId))
Contributor @SaurabhChawla100 (Feb 10, 2020):

Instead of decommissioning a single executor, can we have entire-node decommission?
e.g. driver.get.askSync[Boolean](AddNodeToDecommission(hostname, terminationTime, NodeLossReason))

Contributor Author:

Same as the previous comment: in standalone, sure, but in YARN/K8s we could see individual executors decommission.

} else {
logError("No driver to message decommissioning.")
}
if (executor != null) {
executor.decommission()
}
logInfo("Done decommissioning self.")
// Return true since we are handling a signal
true
} catch {
case e: Exception =>
logError(s"Error ${e} during attempt to decommission self")
false
}
}
}

private[spark] object CoarseGrainedExecutorBackend extends Logging {
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -216,16 +216,32 @@ private[spark] class Executor(
*/
private var heartbeatFailures = 0

/**
* Flag to prevent launching new tasks while decommissioned. There could be a race condition
* accessing this, but decommissioning is only intended to help not be a hard stop.
*/
private var decommissioned = false

heartbeater.start()

metricsPoller.start()

private[executor] def numRunningTasks: Int = runningTasks.size()

/**
* Mark an executor for decommissioning and avoid launching new tasks.
*/
private[spark] def decommission(): Unit = {
decommissioned = true
}

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
val tr = new TaskRunner(context, taskDescription)
runningTasks.put(taskDescription.taskId, tr)
threadPool.execute(tr)
if (decommissioned) {
log.error(s"Launching a task while in decommissioned state.")
}
}

def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {
core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -71,4 +71,9 @@ private[spark] object Worker {
ConfigBuilder("spark.worker.ui.compressedLogFileLengthCacheSize")
.intConf
.createWithDefault(100)

private[spark] val WORKER_DECOMMISSION_ENABLED =
ConfigBuilder("spark.worker.decommission.enabled")
.booleanConf
.createWithDefault(false)
}
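Worker decommissioning ships disabled by default. As an illustrative example (not taken from this diff) of turning it on, using the key defined above:

import org.apache.spark.SparkConf

// Enable worker decommissioning; spark.worker.decommission.enabled defaults to false.
val conf = new SparkConf()
  .set("spark.worker.decommission.enabled", "true")

The commit history above shows some back and forth over whether the test sets this with a typed value or a string key (see the "Use string because of overloaded methods" commit).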
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -361,6 +361,7 @@ abstract class RDD[T: ClassTag](
readCachedBlock = false
computeOrReadCheckpoint(partition, context)
}) match {
// Block hit.
case Left(blockResult) =>
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().inputMetrics
@@ -374,6 +375,7 @@
} else {
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
}
// Need to compute the block.
case Right(iter) =>
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
}
core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -58,3 +58,11 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los
private[spark]
case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
extends ExecutorLossReason(_message)

/**
* A loss reason that means the executor is marked for decommissioning.
*
* This is used by the task scheduler to remove state associated with the executor, but
* not yet fail any tasks that were running in the executor before the executor is "fully" lost.
*/
private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.")
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -88,6 +88,10 @@ private[spark] class Pool(
schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason))
}

override def executorDecommission(executorId: String): Unit = {
schedulableQueue.asScala.foreach(_.executorDecommission(executorId))
}

override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
var shouldRevive = false
for (schedulable <- schedulableQueue.asScala) {
core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -43,6 +43,7 @@ private[spark] trait Schedulable {
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
def executorDecommission(executorId: String): Unit
def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}
core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -27,6 +27,9 @@ private[spark] trait SchedulerBackend {

def start(): Unit
def stop(): Unit
/**
* Update the current offers and schedule tasks
*/
def reviveOffers(): Unit
def defaultParallelism(): Int

core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -98,6 +98,11 @@
*/
def applicationId(): String = appId

/**
* Process a decommissioning executor.
*/
def executorDecommission(executorId: String): Unit

/**
* Process a lost executor
*/
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -734,6 +734,11 @@ private[spark] class TaskSchedulerImpl(
}
}

override def executorDecommission(executorId: String): Unit = {
rootPool.executorDecommission(executorId)
backend.reviveOffers()
}

override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
var failedExecutor: Option[String] = None

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1083,6 +1083,12 @@ private[spark] class TaskSetManager(
levels.toArray
}

def executorDecommission(execId: String): Unit = {
recomputeLocality()
// Future consideration: if an executor is decommissioned it may make sense to add the current
// tasks to the spec exec queue.
}

def recomputeLocality(): Unit = {
// A zombie TaskSetManager may reach here while executorLost happens
if (isZombie) return
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessages.scala
@@ -94,6 +94,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
extends CoarseGrainedClusterMessage

case class DecommissionExecutor(executorId: String) extends CoarseGrainedClusterMessage

case class RemoveWorker(workerId: String, host: String, message: String)
extends CoarseGrainedClusterMessage

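DecommissionExecutor is sent to the driver from the executor backend above (and the standalone app client reacts to decommissioned executors through its listener), but the driver-side handling in CoarseGrainedSchedulerBackend is outside this section of the diff. A rough sketch of what that handling might look like; the helper and the executorsPendingDecommission bookkeeping are assumptions, while scheduler.executorDecommission is the TaskScheduler hook this PR adds.

// Hypothetical driver-side handling, sketched as a helper the RPC endpoint could call
// for both the send (receive) and askSync (receiveAndReply) forms of DecommissionExecutor.
private def decommissionExecutor(executorId: String): Unit = {
  logInfo("Marking executor %s as decommissioned".format(executorId))
  // Assumed bookkeeping so makeOffers can skip this executor; not shown in this diff.
  executorsPendingDecommission += executorId
  // Added by this PR: recomputes locality in the task sets and revives offers.
  scheduler.executorDecommission(executorId)
}

Because the executor's decommissionSelf uses askSync, the receiveAndReply path would do the same work and then context.reply(true).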