Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,16 @@ private[spark] trait ExecutorAllocationClient {

/**
* Request that the cluster manager kill the specified executors.
* @return whether the request is acknowledged by the cluster manager.
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
def killExecutors(executorIds: Seq[String]): Boolean
def killExecutors(executorIds: Seq[String]): Seq[String]

/**
* Request that the cluster manager kill the specified executor.
* @return whether the request is acknowledged by the cluster manager.
*/
def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
def killExecutor(executorId: String): Boolean = {
val killedExecutors = killExecutors(Seq(executorId))
killedExecutors.nonEmpty && killedExecutors(0).equals(executorId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I seriously hope the backend is not killing some random executor instead of the one that was asked...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't. Just added an additional check here to be sure. I need to change the short circuit operator here to &.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark
import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.ControlThrowable

import com.codahale.metrics.{Gauge, MetricRegistry}
Expand Down Expand Up @@ -279,14 +280,18 @@ private[spark] class ExecutorAllocationManager(

updateAndSyncNumExecutorsTarget(now)

val executorIdsToBeRemoved = ArrayBuffer[String]()
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
if (expired) {
initializing = false
removeExecutor(executorId)
executorIdsToBeRemoved += executorId
}
!expired
}
if (executorIdsToBeRemoved.nonEmpty) {
removeExecutors(executorIdsToBeRemoved)
}
}

/**
Expand Down Expand Up @@ -391,11 +396,67 @@ private[spark] class ExecutorAllocationManager(
}
}

/**
* Request the cluster manager to remove the given executors.
* Returns the list of executors which are removed.
*/
private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
val executorIdsToBeRemoved = new ArrayBuffer[String]

logInfo("Request to remove executorIds: " + executors.mkString(", "))
val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size

var newExecutorTotal = numExistingExecutors
executors.foreach { executorIdToBeRemoved =>
if (newExecutorTotal - 1 < minNumExecutors) {
logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
s"$newExecutorTotal executor(s) left (limit $minNumExecutors)")
} else if (canBeKilled(executorIdToBeRemoved)) {
executorIdsToBeRemoved += executorIdToBeRemoved
newExecutorTotal -= 1
}
}

if (executorIdsToBeRemoved.isEmpty) {
return Seq.empty[String]
}

// Send a request to the backend to kill this executor(s)
val executorsRemoved = if (testing) {
executorIdsToBeRemoved
} else {
client.killExecutors(executorIdsToBeRemoved)
}
// reset the newExecutorTotal to the existing number of executors
newExecutorTotal = numExistingExecutors
if (testing || executorsRemoved.nonEmpty) {
executorsRemoved.foreach { removedExecutorId =>
newExecutorTotal -= 1
logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)")
executorsPendingToRemove.add(removedExecutorId)
}
executorsRemoved
} else {
logWarning(s"Unable to reach the cluster manager to kill executor/s " +
"executorIdsToBeRemoved.mkString(\",\") or no executor eligible to kill!")
Seq.empty[String]
}
}

/**
* Request the cluster manager to remove the given executor.
* Return whether the request is received.
* Return whether the request is acknowledged.
*/
private def removeExecutor(executorId: String): Boolean = synchronized {
val executorsRemoved = removeExecutors(Seq(executorId))
executorsRemoved.nonEmpty && executorsRemoved(0) == executorId
}

/**
* Determine if the given executor can be killed.
*/
private def canBeKilled(executorId: String): Boolean = synchronized {
// Do not kill the executor if we are not aware of it (should never happen)
if (!executorIds.contains(executorId)) {
logWarning(s"Attempted to remove unknown executor $executorId!")
Expand All @@ -409,26 +470,7 @@ private[spark] class ExecutorAllocationManager(
return false
}

// Do not kill the executor if we have already reached the lower bound
val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
if (numExistingExecutors - 1 < minNumExecutors) {
logDebug(s"Not removing idle executor $executorId because there are only " +
s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
return false
}

// Send a request to the backend to kill this executor
val removeRequestAcknowledged = testing || client.killExecutor(executorId)
if (removeRequestAcknowledged) {
logInfo(s"Removing executor $executorId because it has been idle for " +
s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
executorsPendingToRemove.add(executorId)
true
} else {
logWarning(s"Unable to reach the cluster manager to kill executor $executorId," +
s"or no executor eligible to kill!")
false
}
true
}

/**
Expand Down
24 changes: 15 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ import org.apache.spark.util._
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
*/
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
class SparkContext(config: SparkConf) extends Logging {

// The call site where this SparkContext was constructed.
private val creationSite: CallSite = Utils.getCallSite()
Expand Down Expand Up @@ -533,7 +533,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
case _ =>
None
}
} else {
None
}
Expand Down Expand Up @@ -1472,7 +1478,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
listenerBus.addListener(listener)
}

private[spark] override def getExecutorIds(): Seq[String] = {
private[spark] def getExecutorIds(): Seq[String] = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.getExecutorIds()
Expand All @@ -1497,7 +1503,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is acknowledged by the cluster manager.
*/
@DeveloperApi
override def requestTotalExecutors(
def requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
Expand All @@ -1517,7 +1523,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is received.
*/
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
def requestExecutors(numAdditionalExecutors: Int): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
Expand All @@ -1539,10 +1545,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is received.
*/
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
def killExecutors(executorIds: Seq[String]): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds, replace = false, force = true)
b.killExecutors(executorIds, replace = false, force = true).nonEmpty
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
Expand All @@ -1561,7 +1567,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is received.
*/
@DeveloperApi
override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))

/**
* Request that the cluster manager kill the specified executor without adjusting the
Expand All @@ -1580,7 +1586,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(Seq(executorId), replace = true, force = true)
b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* @return whether the kill request is acknowledged. If list to kill is empty, it will return
* false.
*/
final override def killExecutors(executorIds: Seq[String]): Boolean = {
final override def killExecutors(executorIds: Seq[String]): Seq[String] = {
killExecutors(executorIds, replace = false, force = false)
}

Expand All @@ -548,7 +548,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
final def killExecutors(
executorIds: Seq[String],
replace: Boolean,
force: Boolean): Boolean = {
force: Boolean): Seq[String] = {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")

val response = synchronized {
Expand All @@ -564,6 +564,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
.filter { id => force || !scheduler.isExecutorBusy(id) }
executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }

logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")

// If we do not wish to replace the executors we kill, sync the target number of executors
// with the cluster manager to avoid allocating new ones. When computing the new target,
// take into account executors that are pending to be added or removed.
Expand All @@ -583,7 +585,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
_ => Future.successful(false)
}

adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)

killResponse.flatMap(killSuccessful =>
Future.successful (if (killSuccessful) executorsToKill else Seq.empty[String])
)(ThreadUtils.sameThread)
}

defaultAskTimeout.awaitResult(response)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
org.apache.spark.scheduler.DummyExternalClusterManager
org.apache.spark.scheduler.MockExternalClusterManager
org.apache.spark.scheduler.MockExternalClusterManager
org.apache.spark.DummyLocalExternalClusterManager
Loading