@@ -58,6 +58,19 @@ private[spark] trait ExecutorAllocationClient {
*/
def killExecutors(executorIds: Seq[String]): Seq[String]

/**
* Request that the cluster manager kill the specified executors, optionally forcing the
* kill even if the executors are busy, and optionally replacing them with new executors.
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean): Seq[String]
Review comment: I don't think you need this in the api, do you?


/**
* Request that the cluster manager kill every executor on the specified host.
* @return the ids of the executors acknowledged by the cluster manager to be removed
*/
def killExecutorsOnHost(host: String): Seq[String]

/**
* Request that the cluster manager kill the specified executor.
* @return whether the request is acknowledged by the cluster manager.
@@ -139,6 +139,11 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val BLACKLIST_KILL_ENABLED =
ConfigBuilder("spark.blacklist.kill")
.booleanConf
.createOptional
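For orientation, turning the new flag on from user code is an ordinary SparkConf setting; a minimal sketch (the app name is made up, and the kill behaviour itself is wired up in BlacklistTracker further down this diff):

```scala
import org.apache.spark.SparkConf

// Blacklisting must be enabled for the new kill flag to have any effect.
val conf = new SparkConf()
  .setAppName("blacklist-kill-example") // hypothetical app name
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.kill", "true")
```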

private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF =
ConfigBuilder("spark.scheduler.executorTaskBlacklistTime")
.internal()
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -50,10 +50,11 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
private[scheduler] class BlacklistTracker (
private val listenerBus: LiveListenerBus,
conf: SparkConf,
scheduler: Option[ExecutorAllocationClient],
clock: Clock = new SystemClock()) extends Logging {

def this(sc: SparkContext) = {
this(sc.listenerBus, sc.getConf)
def this(sc: SparkContext, scheduler: Option[ExecutorAllocationClient]) = {
this(sc.listenerBus, sc.getConf, scheduler)
}

BlacklistTracker.validateBlacklistConfs(conf)
@@ -169,6 +170,21 @@ private[scheduler] class BlacklistTracker (
if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToBlacklistStatus.contains(exec)) {
logInfo(s"Blacklisting executor id: $exec because it has $newTotal" +
s" task failures in successful task sets")
conf.get(config.BLACKLIST_ENABLED) match {
Review comment: BlacklistTracker.isBlacklistEnabled

case Some(enabled) =>
if (enabled) {
scheduler match {
case Some(scheduler) =>
logInfo(s"Killing blacklisted executor id $exec" +
s"since spark.blacklist.kill is set.")
scheduler.killExecutors(Seq(exec), true, true)
case None =>
logWarning(s"Not attempting to kill blacklisted executor id $exec" +
s"since scheduler is not defined.")
}
}
case None =>
}
val node = failuresInTaskSet.node
executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTime))
listenerBus.post(
@@ -183,6 +199,21 @@
if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE) {
logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
s"executors blacklisted: ${blacklistedExecsOnNode}")
conf.get(config.BLACKLIST_ENABLED) match {
Review comment: BlacklistTracker.isBlacklistEnabled

case Some(enabled) =>
if (enabled) {
scheduler match {
case Some(scheduler) =>
logInfo(s"Killing all executors on blacklisted host $node" +
s"since spark.blacklist.kill is set.")
scheduler.killExecutorsOnHost(node)
case None =>
logWarning(s"Not attempting to kill executors on blacklisted host $node" +
s"since scheduler is not defined.")
}
}
case None =>
}
nodeIdToBlacklistExpiryTime.put(node, expiryTime)
listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
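Both review comments above point at the same pattern: the branch re-checks BLACKLIST_ENABLED via a match on the Option, while the log message talks about spark.blacklist.kill. If the intent is to gate the kill on the new flag, a condensed sketch — reusing the names from the hunk above, not the code as submitted — might look like this:

```scala
// Hypothetical condensed form, gated on the new BLACKLIST_KILL_ENABLED flag.
if (conf.get(config.BLACKLIST_KILL_ENABLED).getOrElse(false)) {
  scheduler match {
    case Some(allocationClient) =>
      logInfo(s"Killing blacklisted executor id $exec since spark.blacklist.kill is set.")
      allocationClient.killExecutors(Seq(exec), replace = true, force = true)
    case None =>
      logWarning(s"Not attempting to kill blacklisted executor id $exec " +
        "since the scheduler backend cannot kill executors.")
  }
}
```

The node-level branch would follow the same shape with killExecutorsOnHost(node).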
@@ -703,7 +703,11 @@ private[spark] object TaskSchedulerImpl {

private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = {
if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
Some(new BlacklistTracker(sc))
val executorAllocClient: Option[ExecutorAllocationClient] = sc.schedulerBackend match {
case b: ExecutorAllocationClient => Some(b)
case _ => None
Review comment: Maybe its best to just fail fast right here if blacklist.kill is enabled, but you don't have an ExecutorAllocationClient?

}
Some(new BlacklistTracker(sc, executorAllocClient))
} else {
None
}
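A sketch of the fail-fast idea from the review comment above, reusing the names from this hunk and assuming org.apache.spark.SparkException is in scope (the error message is made up); it would sit just before the BlacklistTracker is constructed:

```scala
// Hypothetical fail-fast check: refuse to start if the user asked for blacklisted
// executors to be killed but the scheduler backend cannot kill executors.
if (sc.conf.get(config.BLACKLIST_KILL_ENABLED).getOrElse(false) && executorAllocClient.isEmpty) {
  throw new SparkException("spark.blacklist.kill is enabled, but the scheduler backend " +
    "is not an ExecutorAllocationClient and cannot kill executors.")
}
```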
@@ -38,6 +38,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
extends CoarseGrainedClusterMessage

case class KillExecutorsOnHost(host: String) extends CoarseGrainedClusterMessage

sealed trait RegisterExecutorResponse

case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse
@@ -145,6 +145,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}

case KillExecutorsOnHost(host) =>
scheduler.getExecutorsAliveOnHost(host).foreach(exec =>
killExecutors(exec.toSeq, replace = true, force = true)
)
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -547,7 +552,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* @return whether the kill request is acknowledged. If list to kill is empty, it will return
* false.
*/
final def killExecutors(
final override def killExecutors(
executorIds: Seq[String],
replace: Boolean,
force: Boolean): Seq[String] = {
@@ -603,6 +608,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*/
protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
Future.successful(false)

/**
* Request that the cluster manager kill all executors on a given host.
* @return the ids of the executors known to be alive on the given host when the kill
* request was submitted.
*/
final override def killExecutorsOnHost(host: String): Seq[String] = {
logInfo(s"Requesting to kill any and all executors on host ${host}")
driverEndpoint.send(KillExecutorsOnHost(host))
scheduler.getExecutorsAliveOnHost(host).getOrElse(Set.empty).toSeq
Review comment: I'd include a comment here on why you do delegate this to driver endpoint, rather than doing it immediately here, something along the lines of:

"We have to be careful that there isn't a race between killing all executors on the bad host, and another executor getting registered on the same host. We do that by doing it within the DriverEndpoint, which is guaranteed to handle one message at a time since its a ThreadSafeRPCEndpoint."

}
}
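If the review suggestion were taken, the explanation could live directly in the method as a comment; a rough sketch (same body as the method above, with the comment paraphrasing the reviewer):

```scala
final override def killExecutorsOnHost(host: String): Seq[String] = {
  logInfo(s"Requesting to kill any and all executors on host $host")
  // Be careful of the race between killing all executors on a bad host and a new executor
  // registering on that same host: delegate to the DriverEndpoint, which is a
  // ThreadSafeRpcEndpoint and therefore handles one message at a time.
  driverEndpoint.send(KillExecutorsOnHost(host))
  scheduler.getExecutorsAliveOnHost(host).getOrElse(Set.empty).toSeq
}
```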

private[spark] object CoarseGrainedSchedulerBackend {
@@ -1154,4 +1154,11 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend
override def reviveOffers(): Unit = sb.reviveOffers()

override def defaultParallelism(): Int = sb.defaultParallelism()

// Unused.
override def killExecutorsOnHost(host: String): Seq[String] = { Seq.empty[String] }

// Unused.
override def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean)
: Seq[String] = Seq.empty[String]
}
@@ -466,6 +466,23 @@ class StandaloneDynamicAllocationSuite
}
}

test("kill all executors on localhost") {
Review comment: this doesn't really have anything to do with Dynamic Allocation, so this is a strange suite to put this test in. Though it does seem to have a useful framework -- perhaps there is a base class to pull out?

Author reply: I'll refactor these tests when I start adding more tests for configuration parameters and the like.

sc = new SparkContext(appConf)
val appId = sc.applicationId
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(apps.size === 1)
assert(apps.head.id === appId)
assert(apps.head.executors.size === 2)
assert(apps.head.getExecutorLimit === Int.MaxValue)
}
// kill all executors
assert(killExecutorsOnHost(sc, "localhost").size == 2)
val apps = getApplications()
assert(apps.head.executors.size === 0)
assert(apps.head.getExecutorLimit === 0)
}
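Since the kills are processed asynchronously, the post-kill assertions may be less flaky if wrapped in the same eventually pattern used earlier in the test; a sketch reusing the suite's helpers:

```scala
eventually(timeout(10.seconds), interval(10.millis)) {
  val apps = getApplications()
  assert(apps.head.executors.size === 0)
  assert(apps.head.getExecutorLimit === 0)
}
```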

// ===============================
// | Utility methods for testing |
// ===============================
@@ -527,6 +544,16 @@ class StandaloneDynamicAllocationSuite
}
}

/** Kill the executors on a given host. */
private def killExecutorsOnHost(sc: SparkContext, host: String): Seq[String] = {
syncExecutors(sc)
sc.schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutorsOnHost(host)
case _ => fail("expected coarse grained scheduler")
}
}

/**
* Return a list of executor IDs belonging to this application.
*
@@ -41,7 +41,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
scheduler = mockTaskSchedWithConf(conf)

clock.setTime(0)
blacklist = new BlacklistTracker(null, conf, clock)
blacklist = new BlacklistTracker(null, conf, None, clock)
}

override def afterEach(): Unit = {
@@ -90,7 +90,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M

clock.setTime(0)
listenerBusMock = mock[LiveListenerBus]
blacklist = new BlacklistTracker(listenerBusMock, conf, clock)
blacklist = new BlacklistTracker(listenerBusMock, conf, None, clock)
}

test("Blacklisting individual tasks and checking for SparkListenerEvents") {
@@ -426,7 +426,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val clock = new ManualClock
// We don't directly use the application blacklist, but its presence triggers blacklisting
// within the taskset.
val blacklistTrackerOpt = Some(new BlacklistTracker(null, conf, clock))
val blacklistTrackerOpt = Some(new BlacklistTracker(null, conf, None, clock))
val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock)

{
docs/configuration.md
@@ -1334,6 +1334,15 @@ Apart from these, the following properties are also available, and may be useful
may get marked as idle and be reclaimed by the cluster manager.
</td>
</tr>
<tr>
<td><code>spark.blacklist.kill</code></td>
<td>false</td>
<td>
(Experimental) If set to "true", allow Spark to automatically kill, and attempt to re-create,
executors when they are blacklisted. Note that, when an entire node is added to the blacklist,
all of the executors on that node will be killed.
</td>
</tr>
<tr>
<td><code>spark.speculation</code></td>
<td>false</td>