From 414f99f23cf180294df3be0bc0ff1b615cec36a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Wed, 30 Nov 2016 16:45:06 -0600 Subject: [PATCH 01/11] BlacklistTracker takes a SchedulerBackend as input --- .../spark/scheduler/BlacklistTracker.scala | 8 ++++--- .../spark/scheduler/TaskSchedulerImpl.scala | 23 ++++++++++++------- .../CoarseGrainedSchedulerBackend.scala | 4 ++-- .../scheduler/BlacklistTrackerSuite.scala | 4 ++-- .../spark/scheduler/TaskSetManagerSuite.scala | 2 +- 5 files changed, 25 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 21a22e92576ca..06d460a8eb4f9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -21,9 +21,10 @@ import java.util.concurrent.atomic.AtomicReference import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.internal.config +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.{Clock, SystemClock, Utils} /** @@ -50,10 +51,11 @@ import org.apache.spark.util.{Clock, SystemClock, Utils} private[scheduler] class BlacklistTracker ( private val listenerBus: LiveListenerBus, conf: SparkConf, + scheduler: Option[CoarseGrainedSchedulerBackend], clock: Clock = new SystemClock()) extends Logging { - def this(sc: SparkContext) = { - this(sc.listenerBus, sc.getConf) + def this(sched: CoarseGrainedSchedulerBackend) = { + this(sched.listenerBus, sched.conf, Some(sched)) } BlacklistTracker.validateBlacklistConfs(conf) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 086c8b0a0837c..55bf404e95fd6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -32,6 +32,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.TaskLocality.TaskLocality +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.local.LocalSchedulerBackend import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} @@ -54,7 +55,7 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} private[spark] class TaskSchedulerImpl private[scheduler]( val sc: SparkContext, val maxTaskFailures: Int, - blacklistTrackerOpt: Option[BlacklistTracker], + var blacklistTrackerOpt: Option[BlacklistTracker], isLocal: Boolean = false) extends TaskScheduler with Logging { @@ -63,14 +64,14 @@ private[spark] class TaskSchedulerImpl private[scheduler]( this( sc, sc.conf.get(config.MAX_TASK_FAILURES), - TaskSchedulerImpl.maybeCreateBlacklistTracker(sc)) + null) } def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = { this( sc, maxTaskFailures, - TaskSchedulerImpl.maybeCreateBlacklistTracker(sc), + null, isLocal = isLocal) } @@ -148,6 +149,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( def initialize(backend: SchedulerBackend) { this.backend = backend + 
blacklistTrackerOpt = TaskSchedulerImpl.maybeCreateBlacklistTracker(backend) // temporarily set rootPool name to empty rootPool = new Pool("", schedulingMode, 0, 0) schedulableBuilder = { @@ -701,11 +703,16 @@ private[spark] object TaskSchedulerImpl { retval.toList } - private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = { - if (BlacklistTracker.isBlacklistEnabled(sc.conf)) { - Some(new BlacklistTracker(sc)) - } else { - None + private def maybeCreateBlacklistTracker(sched: SchedulerBackend): Option[BlacklistTracker] = { + sched match { + case backend: CoarseGrainedSchedulerBackend => + if (BlacklistTracker.isBlacklistEnabled(backend.conf)) { + Some(new BlacklistTracker(backend)) + } else { + None + } + case _ => + None } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 10d55c87fb8de..0d5976a7b3dc3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -49,7 +49,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp protected val totalCoreCount = new AtomicInteger(0) // Total number of executors that are currently registered protected val totalRegisteredExecutors = new AtomicInteger(0) - protected val conf = scheduler.sc.conf + val conf = scheduler.sc.conf private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) private val defaultAskTimeout = RpcUtils.askRpcTimeout(conf) // Submit tasks only after (registered resources / total expected resources) @@ -73,7 +73,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp @GuardedBy("CoarseGrainedSchedulerBackend.this") private var numPendingExecutors = 0 - private val listenerBus = scheduler.sc.listenerBus + val listenerBus = scheduler.sc.listenerBus // Executors we have requested the cluster manager to kill that have not died yet; maps // the executor ID to whether it was explicitly killed by the driver (and thus shouldn't diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala index 1df20a0a3bbf5..9a9c904c19235 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -41,7 +41,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M scheduler = mockTaskSchedWithConf(conf) clock.setTime(0) - blacklist = new BlacklistTracker(null, conf, clock) + blacklist = new BlacklistTracker(null, conf, null, clock) } override def afterEach(): Unit = { @@ -90,7 +90,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M clock.setTime(0) listenerBusMock = mock[LiveListenerBus] - blacklist = new BlacklistTracker(listenerBusMock, conf, clock) + blacklist = new BlacklistTracker(listenerBusMock, conf, null, clock) } test("Blacklisting individual tasks and checking for SparkListenerEvents") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 1fbfb4756e725..981b636a00e7b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -426,7 +426,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val clock = new ManualClock // We don't directly use the application blacklist, but its presence triggers blacklisting // within the taskset. - val blacklistTrackerOpt = Some(new BlacklistTracker(null, conf, clock)) + val blacklistTrackerOpt = Some(new BlacklistTracker(null, conf, null, clock)) val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock) { From 719f93c128b418134c1cd4cd8966d7edd8532c0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Wed, 30 Nov 2016 21:03:40 -0600 Subject: [PATCH 02/11] Revert "BlacklistTracker takes a SchedulerBackend as input" This reverts commit 414f99f23cf180294df3be0bc0ff1b615cec36a3. --- .../spark/scheduler/BlacklistTracker.scala | 8 +++---- .../spark/scheduler/TaskSchedulerImpl.scala | 23 +++++++------------ .../CoarseGrainedSchedulerBackend.scala | 4 ++-- .../scheduler/BlacklistTrackerSuite.scala | 4 ++-- .../spark/scheduler/TaskSetManagerSuite.scala | 2 +- 5 files changed, 16 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 06d460a8eb4f9..21a22e92576ca 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -21,10 +21,9 @@ import java.util.concurrent.atomic.AtomicReference import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.{Clock, SystemClock, Utils} /** @@ -51,11 +50,10 @@ import org.apache.spark.util.{Clock, SystemClock, Utils} private[scheduler] class BlacklistTracker ( private val listenerBus: LiveListenerBus, conf: SparkConf, - scheduler: Option[CoarseGrainedSchedulerBackend], clock: Clock = new SystemClock()) extends Logging { - def this(sched: CoarseGrainedSchedulerBackend) = { - this(sched.listenerBus, sched.conf, Some(sched)) + def this(sc: SparkContext) = { + this(sc.listenerBus, sc.getConf) } BlacklistTracker.validateBlacklistConfs(conf) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 55bf404e95fd6..086c8b0a0837c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -32,7 +32,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.TaskLocality.TaskLocality -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.local.LocalSchedulerBackend import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} @@ -55,7 +54,7 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} private[spark] class TaskSchedulerImpl private[scheduler]( val sc: SparkContext, val maxTaskFailures: Int, - var blacklistTrackerOpt: Option[BlacklistTracker], + 
blacklistTrackerOpt: Option[BlacklistTracker], isLocal: Boolean = false) extends TaskScheduler with Logging { @@ -64,14 +63,14 @@ private[spark] class TaskSchedulerImpl private[scheduler]( this( sc, sc.conf.get(config.MAX_TASK_FAILURES), - null) + TaskSchedulerImpl.maybeCreateBlacklistTracker(sc)) } def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = { this( sc, maxTaskFailures, - null, + TaskSchedulerImpl.maybeCreateBlacklistTracker(sc), isLocal = isLocal) } @@ -149,7 +148,6 @@ private[spark] class TaskSchedulerImpl private[scheduler]( def initialize(backend: SchedulerBackend) { this.backend = backend - blacklistTrackerOpt = TaskSchedulerImpl.maybeCreateBlacklistTracker(backend) // temporarily set rootPool name to empty rootPool = new Pool("", schedulingMode, 0, 0) schedulableBuilder = { @@ -703,16 +701,11 @@ private[spark] object TaskSchedulerImpl { retval.toList } - private def maybeCreateBlacklistTracker(sched: SchedulerBackend): Option[BlacklistTracker] = { - sched match { - case backend: CoarseGrainedSchedulerBackend => - if (BlacklistTracker.isBlacklistEnabled(backend.conf)) { - Some(new BlacklistTracker(backend)) - } else { - None - } - case _ => - None + private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = { + if (BlacklistTracker.isBlacklistEnabled(sc.conf)) { + Some(new BlacklistTracker(sc)) + } else { + None } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 0d5976a7b3dc3..10d55c87fb8de 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -49,7 +49,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp protected val totalCoreCount = new AtomicInteger(0) // Total number of executors that are currently registered protected val totalRegisteredExecutors = new AtomicInteger(0) - val conf = scheduler.sc.conf + protected val conf = scheduler.sc.conf private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) private val defaultAskTimeout = RpcUtils.askRpcTimeout(conf) // Submit tasks only after (registered resources / total expected resources) @@ -73,7 +73,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp @GuardedBy("CoarseGrainedSchedulerBackend.this") private var numPendingExecutors = 0 - val listenerBus = scheduler.sc.listenerBus + private val listenerBus = scheduler.sc.listenerBus // Executors we have requested the cluster manager to kill that have not died yet; maps // the executor ID to whether it was explicitly killed by the driver (and thus shouldn't diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala index 9a9c904c19235..1df20a0a3bbf5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -41,7 +41,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M scheduler = mockTaskSchedWithConf(conf) clock.setTime(0) - blacklist = new BlacklistTracker(null, conf, null, clock) + blacklist = new BlacklistTracker(null, conf, clock) } override def afterEach(): Unit = { @@ -90,7 +90,7 @@ class 
BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M clock.setTime(0) listenerBusMock = mock[LiveListenerBus] - blacklist = new BlacklistTracker(listenerBusMock, conf, null, clock) + blacklist = new BlacklistTracker(listenerBusMock, conf, clock) } test("Blacklisting individual tasks and checking for SparkListenerEvents") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 981b636a00e7b..1fbfb4756e725 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -426,7 +426,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val clock = new ManualClock // We don't directly use the application blacklist, but its presence triggers blacklisting // within the taskset. - val blacklistTrackerOpt = Some(new BlacklistTracker(null, conf, null, clock)) + val blacklistTrackerOpt = Some(new BlacklistTracker(null, conf, clock)) val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock) { From 2566ce7dd91e59291220dd0a121e657eab37fe54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Wed, 30 Nov 2016 21:43:30 -0600 Subject: [PATCH 03/11] Add killExecutorsOnHost. Needs a unit test. --- .../spark/ExecutorAllocationClient.scala | 6 +++++ .../CoarseGrainedSchedulerBackend.scala | 24 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 5d47f624ac8a3..6b7af441a3800 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -58,6 +58,12 @@ private[spark] trait ExecutorAllocationClient { */ def killExecutors(executorIds: Seq[String]): Seq[String] + /** + * Request that the cluster manager kill every executor on the specified host. + * @return the ids of the executors acknowledged by the cluster manager to be removed + */ + def killExecutorsOnHost(host: String): Seq[String] + /** * Request that the cluster manager kill the specified executor. * @return whether the request is acknowledged by the cluster manager. diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 10d55c87fb8de..34ad4696a8acd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -92,6 +92,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 + // The set of executors we have on each host. 
+ protected val hostToExecutors = new HashMap[String, HashSet[String]] + class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging { @@ -182,6 +185,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorRef.send(RegisteredExecutor) // Note: some tests expect the reply to come after we put the executor in the map context.reply(true) + if (!hostToExecutors.contains(hostname)) { + hostToExecutors(hostname) = new HashSet[String]() + } + hostToExecutors(hostname) += executorId listenerBus.post( SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data)) makeOffers() @@ -204,6 +211,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // automatically, so try to tell the executor to stop itself. See SPARK-13519. executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor)) removeExecutor(executorId, reason) + hostToExecutors.foreach( e => { + val execs = e._2 + if (execs.contains(executorId)) execs -= executorId + if (execs.isEmpty) { + val host = e._1 + hostToExecutors -= host + } + }) context.reply(true) case RetrieveSparkProps => @@ -603,6 +618,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful(false) + + /** + * Request that the cluster manager kill all executors on a given host. + * @return whether the kill request is acknowledged + */ + final override def killExecutorsOnHost(host: String): Seq[String] = { + logInfo(s"Requesting to kill any and all executors on host ${host}") + killExecutors(hostToExecutors(host).toSeq, replace = false, force = true) + } } private[spark] object CoarseGrainedSchedulerBackend { From 44cf4d8b11a03900c16e4cc4d7ca620af1beac10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Wed, 30 Nov 2016 22:01:42 -0600 Subject: [PATCH 04/11] Add test case for killExecutorsOnHost --- .../ExecutorAllocationManagerSuite.scala | 2 ++ .../StandaloneDynamicAllocationSuite.scala | 27 +++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index ec409712b953c..87b022fb37cb8 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -1147,6 +1147,8 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend } } + override def killExecutorsOnHost(host: String): Seq[String] = { Seq.empty[String] } + override def start(): Unit = sb.start() override def stop(): Unit = sb.stop() diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index e29eb8552e134..5d31bce342b4d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -466,6 +466,23 @@ class StandaloneDynamicAllocationSuite } } + test("kill all executors on localhost") { + sc = new SparkContext(appConf) + val appId = sc.applicationId + eventually(timeout(10.seconds), interval(10.millis)) { + val apps = getApplications() + assert(apps.size === 1) + 
assert(apps.head.id === appId) + assert(apps.head.executors.size === 2) + assert(apps.head.getExecutorLimit === Int.MaxValue) + } + // kill all executors + assert(killExecutorsOnHost(sc, "localhost").size == 2) + var apps = getApplications() + assert(apps.head.executors.size === 0) + assert(apps.head.getExecutorLimit === 0) + } + // =============================== // | Utility methods for testing | // =============================== @@ -527,6 +544,16 @@ class StandaloneDynamicAllocationSuite } } + /** Kill the executors on a given host. */ + private def killExecutorsOnHost(sc: SparkContext, host: String): Seq[String] = { + syncExecutors(sc) + sc.schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => + b.killExecutorsOnHost(host) + case _ => fail("expected coarse grained scheduler") + } + } + /** * Return a list of executor IDs belonging to this application. * From 63da9a37d1d08d0850bb339200ffcd837b75871c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Wed, 30 Nov 2016 22:36:26 -0600 Subject: [PATCH 05/11] BlacklistTracker can ask the SparkContext to kill executors on a host. Still need to wire in configuration. --- .../scala/org/apache/spark/SparkContext.scala | 22 +++++++++++++++++++ .../spark/scheduler/BlacklistTracker.scala | 5 ++++- .../scheduler/BlacklistTrackerSuite.scala | 4 ++-- .../spark/scheduler/TaskSetManagerSuite.scala | 2 +- 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4694790c72cd8..0a87bac39312c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1570,6 +1570,28 @@ class SparkContext(config: SparkConf) extends Logging { @DeveloperApi def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId)) + /** + * :: DeveloperApo :: + * Request that the cluster manager kill all executors on the specified host. + * + * Note: This is an indication to the cluster manager that the application wishes to adjust + * its resource usage downwards. If the application wishes to replace the executor it kills + * through this method with a new one, it should follow up explicitly with a call to + * {{SparkContext#requestExecutors}}. + * + * @return whether the request is received. + */ + @DeveloperApi + def killExecutorsOnHost(host: String): Boolean = { + schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => + b.killExecutorsOnHost(host).nonEmpty + case _ => + logWarning("Killing executors is only supported in coarse-grained mode") + false + } + } + /** * Request that the cluster manager kill the specified executor without adjusting the * application resource requirements. 
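For context on the developer API added to SparkContext above, here is a minimal usage sketch. The master URL, app name, and host name are placeholders invented for illustration, and the call only has an effect under a CoarseGrainedSchedulerBackend (otherwise it logs "Killing executors is only supported in coarse-grained mode" and returns false). Note that this SparkContext entry point is dropped again later in the series (patch 09), which keeps the kill path on the scheduler side instead.

    import org.apache.spark.{SparkConf, SparkContext}

    object KillExecutorsOnHostSketch {
      def main(args: Array[String]): Unit = {
        // Placeholder master URL and app name, assumed for illustration only.
        val conf = new SparkConf()
          .setAppName("blacklist-kill-sketch")
          .setMaster("spark://master:7077")
        val sc = new SparkContext(conf)
        // Returns true only if at least one executor on the host was asked to die.
        val acknowledged = sc.killExecutorsOnHost("worker-1.example.com") // placeholder host
        if (!acknowledged) {
          // false, e.g., when the backend is not coarse-grained (local mode).
        }
        sc.stop()
      }
    }
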
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 21a22e92576ca..8c7d6c4b62858 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -50,10 +50,11 @@ import org.apache.spark.util.{Clock, SystemClock, Utils} private[scheduler] class BlacklistTracker ( private val listenerBus: LiveListenerBus, conf: SparkConf, + sc: Option[SparkContext], clock: Clock = new SystemClock()) extends Logging { def this(sc: SparkContext) = { - this(sc.listenerBus, sc.getConf) + this(sc.listenerBus, sc.getConf, Some(sc)) } BlacklistTracker.validateBlacklistConfs(conf) @@ -184,6 +185,8 @@ private[scheduler] class BlacklistTracker ( logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " + s"executors blacklisted: ${blacklistedExecsOnNode}") nodeIdToBlacklistExpiryTime.put(node, expiryTime) + // TODO Only do this if a config value is set. + sc.foreach(context => context.killExecutorsOnHost(node)) listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size)) _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala index 1df20a0a3bbf5..7148e78218ca2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -41,7 +41,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M scheduler = mockTaskSchedWithConf(conf) clock.setTime(0) - blacklist = new BlacklistTracker(null, conf, clock) + blacklist = new BlacklistTracker(null, conf, None, clock) } override def afterEach(): Unit = { @@ -90,7 +90,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M clock.setTime(0) listenerBusMock = mock[LiveListenerBus] - blacklist = new BlacklistTracker(listenerBusMock, conf, clock) + blacklist = new BlacklistTracker(listenerBusMock, conf, None, clock) } test("Blacklisting individual tasks and checking for SparkListenerEvents") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 1fbfb4756e725..521de85f9a63c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -426,7 +426,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val clock = new ManualClock // We don't directly use the application blacklist, but its presence triggers blacklisting // within the taskset. 
- val blacklistTrackerOpt = Some(new BlacklistTracker(null, conf, clock)) + val blacklistTrackerOpt = Some(new BlacklistTracker(null, conf, None, clock)) val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock) { From f4ad62dfed603e0d85c41f8f0d3b203c134f00f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Fri, 2 Dec 2016 11:14:12 -0600 Subject: [PATCH 06/11] Respond to review feedback: basic changes --- .../scala/org/apache/spark/SparkContext.scala | 7 +------ .../spark/scheduler/BlacklistTracker.scala | 10 +++++----- .../spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../cluster/CoarseGrainedSchedulerBackend.scala | 17 +---------------- 4 files changed, 8 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0a87bac39312c..76b5acab1a54c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1571,14 +1571,9 @@ class SparkContext(config: SparkConf) extends Logging { def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId)) /** - * :: DeveloperApo :: + * :: DeveloperApi :: * Request that the cluster manager kill all executors on the specified host. * - * Note: This is an indication to the cluster manager that the application wishes to adjust - * its resource usage downwards. If the application wishes to replace the executor it kills - * through this method with a new one, it should follow up explicitly with a call to - * {{SparkContext#requestExecutors}}. - * * @return whether the request is received. */ @DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 8c7d6c4b62858..556a657039237 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config import org.apache.spark.util.{Clock, SystemClock, Utils} @@ -50,11 +50,11 @@ import org.apache.spark.util.{Clock, SystemClock, Utils} private[scheduler] class BlacklistTracker ( private val listenerBus: LiveListenerBus, conf: SparkConf, - sc: Option[SparkContext], + scheduler: ExecutorAllocationClient, clock: Clock = new SystemClock()) extends Logging { - def this(sc: SparkContext) = { - this(sc.listenerBus, sc.getConf, Some(sc)) + def this(sc: SparkContext, scheduler: ExecutorAllocationClient) = { + this(sc.listenerBus, sc.getConf, scheduler) } BlacklistTracker.validateBlacklistConfs(conf) @@ -186,7 +186,7 @@ private[scheduler] class BlacklistTracker ( s"executors blacklisted: ${blacklistedExecsOnNode}") nodeIdToBlacklistExpiryTime.put(node, expiryTime) // TODO Only do this if a config value is set. 
- sc.foreach(context => context.killExecutorsOnHost(node)) + scheduler.killExecutorsOnHost(node) listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size)) _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 086c8b0a0837c..6ee5a30ff6e2a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -703,7 +703,7 @@ private[spark] object TaskSchedulerImpl { private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = { if (BlacklistTracker.isBlacklistEnabled(sc.conf)) { - Some(new BlacklistTracker(sc)) + Some(new BlacklistTracker(sc, scheduler)) } else { None } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 34ad4696a8acd..4d8fe12c91642 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -92,9 +92,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 - // The set of executors we have on each host. - protected val hostToExecutors = new HashMap[String, HashSet[String]] - class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging { @@ -185,10 +182,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorRef.send(RegisteredExecutor) // Note: some tests expect the reply to come after we put the executor in the map context.reply(true) - if (!hostToExecutors.contains(hostname)) { - hostToExecutors(hostname) = new HashSet[String]() - } - hostToExecutors(hostname) += executorId listenerBus.post( SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data)) makeOffers() @@ -211,14 +204,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // automatically, so try to tell the executor to stop itself. See SPARK-13519. 
executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor)) removeExecutor(executorId, reason) - hostToExecutors.foreach( e => { - val execs = e._2 - if (execs.contains(executorId)) execs -= executorId - if (execs.isEmpty) { - val host = e._1 - hostToExecutors -= host - } - }) context.reply(true) case RetrieveSparkProps => @@ -625,7 +610,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ final override def killExecutorsOnHost(host: String): Seq[String] = { logInfo(s"Requesting to kill any and all executors on host ${host}") - killExecutors(hostToExecutors(host).toSeq, replace = false, force = true) + killExecutors(scheduler.getExecutorsAliveOnHost(host).get.toSeq, replace = true, force = true) } } From 4f4d21d439d8691aa18f22355a18a7fb86ded36a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Fri, 2 Dec 2016 15:13:07 -0600 Subject: [PATCH 07/11] Add documentation for configuration.md --- docs/configuration.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index e527a268ce14d..6bc9b14907e40 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1334,6 +1334,15 @@ Apart from these, the following properties are also available, and may be useful may get marked as idle and be reclaimed by the cluster manager. + + spark.blacklist.kill + false + + (Experimental) If set to "true", allow Spark to automatically kill, and attempt to re-create, + executors when they are blacklisted. Note that, when an entire node is added to the blacklist, + all of the executors on that node will be killed. + + spark.speculation false From 314001783c83e02e3c6da3db2b965d2888fad319 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Fri, 2 Dec 2016 16:33:13 -0600 Subject: [PATCH 08/11] First implementation of actual executor killing in BlacklistTracker --- .../spark/ExecutorAllocationClient.scala | 7 ++++ .../spark/internal/config/package.scala | 5 +++ .../spark/scheduler/BlacklistTracker.scala | 37 +++++++++++++++++-- .../spark/scheduler/TaskSchedulerImpl.scala | 6 ++- .../CoarseGrainedSchedulerBackend.scala | 2 +- .../ExecutorAllocationManagerSuite.scala | 9 ++++- 6 files changed, 58 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 6b7af441a3800..a35338cb64a86 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -58,6 +58,13 @@ private[spark] trait ExecutorAllocationClient { */ def killExecutors(executorIds: Seq[String]): Seq[String] + /** + * Request that the cluster manager try harder to kill the specified executors, + * and maybe replace them. + * @return whether the request is acknowledged by the cluster manager. + */ + def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean): Seq[String] + /** * Request that the cluster manager kill every executor on the specified host. 
* @return the ids of the executors acknowledged by the cluster manager to be removed diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 3a6cb2dd87f19..3730f0fe6c980 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -139,6 +139,11 @@ package object config { .timeConf(TimeUnit.MILLISECONDS) .createOptional + private[spark] val BLACKLIST_KILL_ENABLED = + ConfigBuilder("spark.blacklist.kill") + .booleanConf + .createOptional + private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF = ConfigBuilder("spark.scheduler.executorTaskBlacklistTime") .internal() diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 556a657039237..e822d17cac7f0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -50,10 +50,10 @@ import org.apache.spark.util.{Clock, SystemClock, Utils} private[scheduler] class BlacklistTracker ( private val listenerBus: LiveListenerBus, conf: SparkConf, - scheduler: ExecutorAllocationClient, + scheduler: Option[ExecutorAllocationClient], clock: Clock = new SystemClock()) extends Logging { - def this(sc: SparkContext, scheduler: ExecutorAllocationClient) = { + def this(sc: SparkContext, scheduler: Option[ExecutorAllocationClient]) = { this(sc.listenerBus, sc.getConf, scheduler) } @@ -170,6 +170,21 @@ private[scheduler] class BlacklistTracker ( if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToBlacklistStatus.contains(exec)) { logInfo(s"Blacklisting executor id: $exec because it has $newTotal" + s" task failures in successful task sets") + conf.get(config.BLACKLIST_ENABLED) match { + case Some(enabled) => + if (enabled) { + scheduler match { + case Some(scheduler) => + logInfo(s"Killing blacklisted executor id: $exec" + + s"since spark.blacklist.kill is set.") + scheduler.killExecutors(Seq(exec), true, true) + case None => + logWarning(s"Not attempting to kill blacklisted executor id $exec" + + s"since scheduler is not defined.") + } + } + case None => + } val node = failuresInTaskSet.node executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTime)) listenerBus.post( @@ -184,9 +199,23 @@ private[scheduler] class BlacklistTracker ( if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE) { logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " + s"executors blacklisted: ${blacklistedExecsOnNode}") + // TODO Prevent the scheduler from offering executors on this host. + conf.get(config.BLACKLIST_ENABLED) match { + case Some(enabled) => + if (enabled) { + scheduler match { + case Some(scheduler) => + logInfo(s"Killing blacklisted executor id: $exec" + + s"since spark.blacklist.kill is set.") + scheduler.killExecutorsOnHost(node) + case None => + logWarning(s"Not attempting to kill blacklisted executor id $exec" + + s"since scheduler is not defined.") + } + } + case None => + } nodeIdToBlacklistExpiryTime.put(node, expiryTime) - // TODO Only do this if a config value is set. 
- scheduler.killExecutorsOnHost(node) listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size)) _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 6ee5a30ff6e2a..15d237f3b73c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -703,7 +703,11 @@ private[spark] object TaskSchedulerImpl { private def maybeCreateBlacklistTracker(sc: SparkContext): Option[BlacklistTracker] = { if (BlacklistTracker.isBlacklistEnabled(sc.conf)) { - Some(new BlacklistTracker(sc, scheduler)) + val executorAllocClient: Option[ExecutorAllocationClient] = sc.schedulerBackend match { + case b: ExecutorAllocationClient => Some(b.asInstanceOf[ExecutorAllocationClient]) + case _ => None + } + Some(new BlacklistTracker(sc, executorAllocClient)) } else { None } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 4d8fe12c91642..93329eab16faa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -547,7 +547,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * @return whether the kill request is acknowledged. If list to kill is empty, it will return * false. */ - final def killExecutors( + final override def killExecutors( executorIds: Seq[String], replace: Boolean, force: Boolean): Seq[String] = { diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 87b022fb37cb8..81b5b51f7a0fa 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -1147,8 +1147,6 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend } } - override def killExecutorsOnHost(host: String): Seq[String] = { Seq.empty[String] } - override def start(): Unit = sb.start() override def stop(): Unit = sb.stop() @@ -1156,4 +1154,11 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend override def reviveOffers(): Unit = sb.reviveOffers() override def defaultParallelism(): Int = sb.defaultParallelism() + + // Unused. + override def killExecutorsOnHost(host: String): Seq[String] = { Seq.empty[String] } + + // Unused. + override def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean) + : Seq[String] = { Seq.empty[String] } } From e605eaf624fb4ef4f16a55c7dc18803f7e262180 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Fri, 2 Dec 2016 17:14:35 -0600 Subject: [PATCH 09/11] Additional updates. Not sure if this killing is thread or race safe. 
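The concern raised here is that the blacklist decision is made outside the DriverEndpoint, so killing executors directly from that code path can interleave with the executor registration and removal messages the endpoint processes. Patch 11 below addresses this by sending a KillExecutorsOnHost RPC message to the DriverEndpoint, which is a ThreadSafeRpcEndpoint, so the kill is handled on the same message loop as RegisterExecutor/RemoveExecutor. A self-contained sketch of that pattern in plain Scala (not Spark internals; every name other than the message case classes is invented for illustration):

    import java.util.concurrent.Executors
    import scala.collection.mutable

    // Toy "driver endpoint": a single-threaded loop processing messages in order,
    // so a kill can never observe half-updated executor bookkeeping.
    object SingleThreadedEndpointSketch {
      sealed trait Msg
      case class RegisterExecutor(id: String, host: String) extends Msg
      case class KillExecutorsOnHost(host: String) extends Msg

      private val hostToExecutors = mutable.Map.empty[String, mutable.Set[String]]
      private val loop = Executors.newSingleThreadExecutor()

      def send(msg: Msg): Unit = loop.submit(new Runnable {
        override def run(): Unit = msg match {
          case RegisterExecutor(id, host) =>
            hostToExecutors.getOrElseUpdate(host, mutable.Set.empty) += id
          case KillExecutorsOnHost(host) =>
            // Runs on the same thread as registrations, mirroring what handling
            // the RPC inside DriverEndpoint.receive buys the real code.
            hostToExecutors.remove(host).foreach(_.foreach(id => println(s"killing executor $id")))
        }
      })
    }
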
--- .../scala/org/apache/spark/SparkContext.scala | 17 ----------------- .../spark/scheduler/BlacklistTracker.scala | 5 ++++- .../cluster/CoarseGrainedSchedulerBackend.scala | 4 +++- 3 files changed, 7 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 76b5acab1a54c..4694790c72cd8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1570,23 +1570,6 @@ class SparkContext(config: SparkConf) extends Logging { @DeveloperApi def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId)) - /** - * :: DeveloperApi :: - * Request that the cluster manager kill all executors on the specified host. - * - * @return whether the request is received. - */ - @DeveloperApi - def killExecutorsOnHost(host: String): Boolean = { - schedulerBackend match { - case b: CoarseGrainedSchedulerBackend => - b.killExecutorsOnHost(host).nonEmpty - case _ => - logWarning("Killing executors is only supported in coarse-grained mode") - false - } - } - /** * Request that the cluster manager kill the specified executor without adjusting the * application resource requirements. diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index e822d17cac7f0..58b4dc3f753f6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -177,6 +177,9 @@ private[scheduler] class BlacklistTracker ( case Some(scheduler) => logInfo(s"Killing blacklisted executor id: $exec" + s"since spark.blacklist.kill is set.") + // TODO Do this killing in the driver via an RPC message? + // TODO Update the coarseGrainedSchedulerBackend's list of executors and hosts + // TODO to fail fast and not attempt to allocate this executor? scheduler.killExecutors(Seq(exec), true, true) case None => logWarning(s"Not attempting to kill blacklisted executor id $exec" + @@ -199,7 +202,6 @@ private[scheduler] class BlacklistTracker ( if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE) { logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " + s"executors blacklisted: ${blacklistedExecsOnNode}") - // TODO Prevent the scheduler from offering executors on this host. conf.get(config.BLACKLIST_ENABLED) match { case Some(enabled) => if (enabled) { @@ -207,6 +209,7 @@ private[scheduler] class BlacklistTracker ( case Some(scheduler) => logInfo(s"Killing blacklisted executor id: $exec" + s"since spark.blacklist.kill is set.") + // TODO Same as above. 
scheduler.killExecutorsOnHost(node) case None => logWarning(s"Not attempting to kill blacklisted executor id $exec" + diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 93329eab16faa..b64697653e7d6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -610,7 +610,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ final override def killExecutorsOnHost(host: String): Seq[String] = { logInfo(s"Requesting to kill any and all executors on host ${host}") - killExecutors(scheduler.getExecutorsAliveOnHost(host).get.toSeq, replace = true, force = true) + scheduler.getExecutorsAliveOnHost(host).foreach(exec => + killExecutors(exec.toSeq, replace = true, force = true) + ) } } From 4fb6b1266465cb2ca938fc612aa833d542067e5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Fri, 2 Dec 2016 17:25:21 -0600 Subject: [PATCH 10/11] Add some implementation thoughts in comments to BlacklistTracker --- .../scala/org/apache/spark/ExecutorAllocationClient.scala | 2 +- .../org/apache/spark/scheduler/BlacklistTracker.scala | 8 ++++++++ .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +- .../org/apache/spark/ExecutorAllocationManagerSuite.scala | 2 +- 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index a35338cb64a86..84a46da6e0262 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -63,7 +63,7 @@ private[spark] trait ExecutorAllocationClient { * and maybe replace them. * @return whether the request is acknowledged by the cluster manager. */ - def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean): Seq[String] + def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean): Unit /** * Request that the cluster manager kill every executor on the specified host. diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index 58b4dc3f753f6..efc27202d8518 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -202,6 +202,14 @@ private[scheduler] class BlacklistTracker ( if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE) { logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " + s"executors blacklisted: ${blacklistedExecsOnNode}") + // TODO: + // As soon as this decision has been made, a couple of things need to happen. + // First, as quickly as possible, we need to tell the scheduler backend to: + // not create any additional executors on this host + // (attempt to) fail to create any executors being created. + // not schedule any additional tasks on the executors on this host. + // + // Then, we kill and re-create all the executors on this host. 
conf.get(config.BLACKLIST_ENABLED) match { case Some(enabled) => if (enabled) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index b64697653e7d6..0a6da3d39de69 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -608,7 +608,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * Request that the cluster manager kill all executors on a given host. * @return whether the kill request is acknowledged */ - final override def killExecutorsOnHost(host: String): Seq[String] = { + final override def killExecutorsOnHost(host: String): Unit = { logInfo(s"Requesting to kill any and all executors on host ${host}") scheduler.getExecutorsAliveOnHost(host).foreach(exec => killExecutors(exec.toSeq, replace = true, force = true) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 81b5b51f7a0fa..491c5d2897c9d 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -1160,5 +1160,5 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend // Unused. override def killExecutors(executorIds: Seq[String], replace: Boolean, force: Boolean) - : Seq[String] = { Seq.empty[String] } + : Unit = {} } From 322a2328e9ff483b0dea5421445f9749d90dca53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Hiram=20Soltren?= Date: Mon, 12 Dec 2016 21:30:11 -0600 Subject: [PATCH 11/11] Update killing of nodes to use an RPC method for synchronization --- .../spark/scheduler/BlacklistTracker.scala | 18 +++--------------- .../cluster/CoarseGrainedClusterMessage.scala | 2 ++ .../CoarseGrainedSchedulerBackend.scala | 9 ++++++--- 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index efc27202d8518..788a05ce7ee73 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -175,11 +175,8 @@ private[scheduler] class BlacklistTracker ( if (enabled) { scheduler match { case Some(scheduler) => - logInfo(s"Killing blacklisted executor id: $exec" + + logInfo(s"Killing blacklisted executor id $exec" + s"since spark.blacklist.kill is set.") - // TODO Do this killing in the driver via an RPC message? - // TODO Update the coarseGrainedSchedulerBackend's list of executors and hosts - // TODO to fail fast and not attempt to allocate this executor? scheduler.killExecutors(Seq(exec), true, true) case None => logWarning(s"Not attempting to kill blacklisted executor id $exec" + @@ -202,25 +199,16 @@ private[scheduler] class BlacklistTracker ( if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE) { logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " + s"executors blacklisted: ${blacklistedExecsOnNode}") - // TODO: - // As soon as this decision has been made, a couple of things need to happen. 
- // First, as quickly as possible, we need to tell the scheduler backend to: - // not create any additional executors on this host - // (attempt to) fail to create any executors being created. - // not schedule any additional tasks on the executors on this host. - // - // Then, we kill and re-create all the executors on this host. conf.get(config.BLACKLIST_ENABLED) match { case Some(enabled) => if (enabled) { scheduler match { case Some(scheduler) => - logInfo(s"Killing blacklisted executor id: $exec" + + logInfo(s"Killing all executors on blacklisted host $node" + s"since spark.blacklist.kill is set.") - // TODO Same as above. scheduler.killExecutorsOnHost(node) case None => - logWarning(s"Not attempting to kill blacklisted executor id $exec" + + logWarning(s"Not attempting to kill executors on blacklisted host $node" + s"since scheduler is not defined.") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 684bf75c03d80..ed80a55606288 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -38,6 +38,8 @@ private[spark] object CoarseGrainedClusterMessages { case class KillTask(taskId: Long, executor: String, interruptThread: Boolean) extends CoarseGrainedClusterMessage + case class KillExecutorsOnHost(host: String) extends CoarseGrainedClusterMessage + sealed trait RegisterExecutorResponse case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 0a6da3d39de69..c519e71d6905e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -145,6 +145,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Ignoring the task kill since the executor is not registered. logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.") } + + case KillExecutorsOnHost(host) => + scheduler.getExecutorsAliveOnHost(host).foreach(exec => + killExecutors(exec.toSeq, replace = true, force = true) + ) } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -610,9 +615,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ final override def killExecutorsOnHost(host: String): Unit = { logInfo(s"Requesting to kill any and all executors on host ${host}") - scheduler.getExecutorsAliveOnHost(host).foreach(exec => - killExecutors(exec.toSeq, replace = true, force = true) - ) + driverEndpoint.send(KillExecutorsOnHost(host)) } }