diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 5d47f624ac8a3..84a46da6e0262 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -58,6 +58,19 @@ private[spark] trait ExecutorAllocationClient {
    */
   def killExecutors(executorIds: Seq[String]): Seq[String]
 
+  /**
+   * Request that the cluster manager kill the specified executors, optionally replacing them
+   * and optionally forcing the kill even if the executors are running tasks.
+   * @return the ids of the executors acknowledged by the cluster manager to be removed.
+   */
+  def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean): Seq[String]
+
+  /**
+   * Request that the cluster manager kill every executor on the specified host.
+   * @return whether the request is acknowledged by the cluster manager.
+   */
+  def killExecutorsOnHost(host: String): Boolean
+
   /**
    * Request that the cluster manager kill the specified executor.
    * @return whether the request is acknowledged by the cluster manager.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 3a6cb2dd87f19..3730f0fe6c980 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -139,6 +139,11 @@ package object config {
     .timeConf(TimeUnit.MILLISECONDS)
     .createOptional
 
+  private[spark] val BLACKLIST_KILL_ENABLED =
+    ConfigBuilder("spark.blacklist.kill")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF =
     ConfigBuilder("spark.scheduler.executorTaskBlacklistTime")
       .internal()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index 21a22e92576ca..788a05ce7ee73 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -50,10 +50,11 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
 private[scheduler] class BlacklistTracker (
     private val listenerBus: LiveListenerBus,
     conf: SparkConf,
+    scheduler: Option[ExecutorAllocationClient],
     clock: Clock = new SystemClock()) extends Logging {
 
-  def this(sc: SparkContext) = {
-    this(sc.listenerBus, sc.getConf)
+  def this(sc: SparkContext, scheduler: Option[ExecutorAllocationClient]) = {
+    this(sc.listenerBus, sc.getConf, scheduler)
   }
 
   BlacklistTracker.validateBlacklistConfs(conf)
@@ -169,6 +170,17 @@ private[scheduler] class BlacklistTracker (
       if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToBlacklistStatus.contains(exec)) {
         logInfo(s"Blacklisting executor id: $exec because it has $newTotal" +
           s" task failures in successful task sets")
+        if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+          scheduler match {
+            case Some(scheduler) =>
+              logInfo(s"Killing blacklisted executor id $exec " +
+                s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
+              scheduler.killExecutors(Seq(exec), replace = true, force = true)
+            case None =>
+              logWarning(s"Not attempting to kill blacklisted executor id $exec " +
+                s"since the scheduler is not defined.")
+          }
+        }
         val node = failuresInTaskSet.node
         executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTime))
         listenerBus.post(
@@ -183,6 +195,17 @@ private[scheduler] class BlacklistTracker (
         if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE) {
           logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
             s"executors blacklisted: ${blacklistedExecsOnNode}")
+          if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+            scheduler match {
+              case Some(scheduler) =>
+                logInfo(s"Killing all executors on blacklisted host $node " +
+                  s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
+                scheduler.killExecutorsOnHost(node)
+              case None =>
+                logWarning(s"Not attempting to kill executors on blacklisted host $node " +
+                  s"since the scheduler is not defined.")
+            }
+          }
           nodeIdToBlacklistExpiryTime.put(node, expiryTime)
           listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
           _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 086c8b0a0837c..15d237f3b73c9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -703,7 +703,11 @@ private[spark] object TaskSchedulerImpl {
 
   private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = {
     if (BlacklistTracker.isBlacklistEnabled(sc.conf)) {
-      Some(new BlacklistTracker(sc))
+      val executorAllocClient: Option[ExecutorAllocationClient] = sc.schedulerBackend match {
+        case b: ExecutorAllocationClient => Some(b)
+        case _ => None
+      }
+      Some(new BlacklistTracker(sc, executorAllocClient))
     } else {
       None
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 684bf75c03d80..ed80a55606288 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -38,6 +38,8 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage
 
+  case class KillExecutorsOnHost(host: String) extends CoarseGrainedClusterMessage
+
   sealed trait RegisterExecutorResponse
 
   case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 10d55c87fb8de..c519e71d6905e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -145,6 +145,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           // Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.") } + + case KillExecutorsOnHost(host) => + scheduler.getExecutorsAliveOnHost(host).foreach(exec => + killExecutors(exec.toSeq, replace = true, force = true) + ) } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -547,7 +552,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * @return whether the kill request is acknowledged. If list to kill is empty, it will return * false. */ - final def killExecutors( + final override def killExecutors( executorIds: Seq[String], replace: Boolean, force: Boolean): Seq[String] = { @@ -603,6 +608,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful(false) + + /** + * Request that the cluster manager kill all executors on a given host. + * @return whether the kill request is acknowledged + */ + final override def killExecutorsOnHost(host: String): Unit = { + logInfo(s"Requesting to kill any and all executors on host ${host}") + driverEndpoint.send(KillExecutorsOnHost(host)) + } } private[spark] object CoarseGrainedSchedulerBackend { diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index ec409712b953c..491c5d2897c9d 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -1154,4 +1154,11 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend override def reviveOffers(): Unit = sb.reviveOffers() override def defaultParallelism(): Int = sb.defaultParallelism() + + // Unused. + override def killExecutorsOnHost(host: String): Seq[String] = { Seq.empty[String] } + + // Unused. + override def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean) + : Unit = {} } diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index e29eb8552e134..5d31bce342b4d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -466,6 +466,23 @@ class StandaloneDynamicAllocationSuite } } + test("kill all executors on localhost") { + sc = new SparkContext(appConf) + val appId = sc.applicationId + eventually(timeout(10.seconds), interval(10.millis)) { + val apps = getApplications() + assert(apps.size === 1) + assert(apps.head.id === appId) + assert(apps.head.executors.size === 2) + assert(apps.head.getExecutorLimit === Int.MaxValue) + } + // kill all executors + assert(killExecutorsOnHost(sc, "localhost").size == 2) + var apps = getApplications() + assert(apps.head.executors.size === 0) + assert(apps.head.getExecutorLimit === 0) + } + // =============================== // | Utility methods for testing | // =============================== @@ -527,6 +544,16 @@ class StandaloneDynamicAllocationSuite } } + /** Kill the executors on a given host. 
+  private def killExecutorsOnHost(sc: SparkContext, host: String): Boolean = {
+    syncExecutors(sc)
+    sc.schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        b.killExecutorsOnHost(host)
+      case _ => fail("expected coarse grained scheduler")
+    }
+  }
+
   /**
    * Return a list of executor IDs belonging to this application.
    *
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index 1df20a0a3bbf5..7148e78218ca2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -41,7 +41,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     scheduler = mockTaskSchedWithConf(conf)
 
     clock.setTime(0)
-    blacklist = new BlacklistTracker(null, conf, clock)
+    blacklist = new BlacklistTracker(null, conf, None, clock)
   }
 
   override def afterEach(): Unit = {
@@ -90,7 +90,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     clock.setTime(0)
 
     listenerBusMock = mock[LiveListenerBus]
-    blacklist = new BlacklistTracker(listenerBusMock, conf, clock)
+    blacklist = new BlacklistTracker(listenerBusMock, conf, None, clock)
   }
 
   test("Blacklisting individual tasks and checking for SparkListenerEvents") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 1fbfb4756e725..521de85f9a63c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -426,7 +426,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val clock = new ManualClock
     // We don't directly use the application blacklist, but its presence triggers blacklisting
     // within the taskset.
-    val blacklistTrackerOpt = Some(new BlacklistTracker(null, conf, clock))
+    val blacklistTrackerOpt = Some(new BlacklistTracker(null, conf, None, clock))
     val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock) {
diff --git a/docs/configuration.md b/docs/configuration.md
index e527a268ce14d..6bc9b14907e40 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1334,6 +1334,15 @@ Apart from these, the following properties are also available, and may be useful
     may get marked as idle and be reclaimed by the cluster manager.
   </td>
 </tr>
+<tr>
+  <td><code>spark.blacklist.kill</code></td>
+  <td>false</td>
+  <td>
+    (Experimental) If set to "true", allow Spark to automatically kill, and attempt to re-create,
+    executors when they are blacklisted. Note that, when an entire node is added to the blacklist,
+    all of the executors on that node will be killed.
+  </td>
+</tr>
 <tr>
   <td><code>spark.speculation</code></td>
   <td>false</td>
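
---

For reviewers trying the patch out, here is a minimal sketch of how an application would opt in to the new behavior, assuming the config keys above ship as-is. The `local-cluster` master string and application name are illustrative only and not part of this change:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Blacklisting must already be enabled for the new kill behavior to matter:
// the BlacklistTracker is only created when spark.blacklist.enabled is true.
val conf = new SparkConf()
  .setMaster("local-cluster[2, 1, 1024]") // illustrative two-executor cluster
  .setAppName("BlacklistKillDemo")        // illustrative name
  .set("spark.blacklist.enabled", "true") // existing flag; activates the BlacklistTracker
  .set("spark.blacklist.kill", "true")    // new flag added by this patch

val sc = new SparkContext(conf)
// Executors (or whole hosts) that cross the spark.blacklist.* failure thresholds
// are now killed, and potentially replaced, instead of only being avoided.
```

The same two keys can equivalently be passed with `--conf` on `spark-submit`.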