Skip to content

Commit 17b72d3

Browse files
Dhruve AsharMarcelo Vanzin
authored andcommitted
[SPARK-17365][CORE] Remove/Kill multiple executors together to reduce RPC call time.
## What changes were proposed in this pull request? We are killing multiple executors together instead of iterating over expensive RPC calls to kill a single executor at a time. ## How was this patch tested? Executed a sample Spark job to observe executors being killed/removed with dynamic allocation enabled. Author: Dhruve Ashar <[email protected]> Author: Dhruve Ashar <[email protected]> Closes #15152 from dhruve/impr/SPARK-17365.
1 parent 8a02410 commit 17b72d3

File tree

11 files changed

+239
-55
lines changed

11 files changed

+239
-55
lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,16 @@ private[spark] trait ExecutorAllocationClient {
5454

5555
/**
5656
* Request that the cluster manager kill the specified executors.
57-
* @return whether the request is acknowledged by the cluster manager.
57+
* @return the ids of the executors acknowledged by the cluster manager to be removed.
5858
*/
59-
def killExecutors(executorIds: Seq[String]): Boolean
59+
def killExecutors(executorIds: Seq[String]): Seq[String]
6060

6161
/**
6262
* Request that the cluster manager kill the specified executor.
6363
* @return whether the request is acknowledged by the cluster manager.
6464
*/
65-
def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
65+
def killExecutor(executorId: String): Boolean = {
  // Delegate to the batch call; the request counts as acknowledged only when the first id
  // reported back by the cluster manager is the executor we asked to kill.
  killExecutors(Seq(executorId)).headOption.exists(_.equals(executorId))
}
6669
}

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 64 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark
2020
import java.util.concurrent.TimeUnit
2121

2222
import scala.collection.mutable
23+
import scala.collection.mutable.ArrayBuffer
2324
import scala.util.control.ControlThrowable
2425

2526
import com.codahale.metrics.{Gauge, MetricRegistry}
@@ -279,14 +280,18 @@ private[spark] class ExecutorAllocationManager(
279280

280281
updateAndSyncNumExecutorsTarget(now)
281282

283+
val executorIdsToBeRemoved = ArrayBuffer[String]()
282284
removeTimes.retain { case (executorId, expireTime) =>
283285
val expired = now >= expireTime
284286
if (expired) {
285287
initializing = false
286-
removeExecutor(executorId)
288+
executorIdsToBeRemoved += executorId
287289
}
288290
!expired
289291
}
292+
if (executorIdsToBeRemoved.nonEmpty) {
293+
removeExecutors(executorIdsToBeRemoved)
294+
}
290295
}
291296

292297
/**
@@ -391,11 +396,67 @@ private[spark] class ExecutorAllocationManager(
391396
}
392397
}
393398

399+
/**
 * Request the cluster manager to remove the given executors.
 *
 * Candidates are considered in the order given. A candidate is skipped when removing it would
 * take the number of executors (excluding those already pending removal) below
 * `minNumExecutors`, or when `canBeKilled` rejects it. The remaining candidates are sent to
 * the cluster manager in a single `killExecutors` call.
 *
 * @param executors ids of the executors requested to be removed
 * @return the ids of the executors whose removal was acknowledged (in testing mode, every
 *         eligible candidate); empty if no candidate was eligible or nothing was acknowledged
 */
private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
  val executorIdsToBeRemoved = new ArrayBuffer[String]

  logInfo("Request to remove executorIds: " + executors.mkString(", "))
  val numExistingExecutors = allocationManager.executorIds.size - executorsPendingToRemove.size

  // Running total used to enforce the lower bound while selecting candidates.
  var newExecutorTotal = numExistingExecutors
  executors.foreach { executorIdToBeRemoved =>
    if (newExecutorTotal - 1 < minNumExecutors) {
      logDebug(s"Not removing idle executor $executorIdToBeRemoved because there are only " +
        s"$newExecutorTotal executor(s) left (limit $minNumExecutors)")
    } else if (canBeKilled(executorIdToBeRemoved)) {
      executorIdsToBeRemoved += executorIdToBeRemoved
      newExecutorTotal -= 1
    }
  }

  if (executorIdsToBeRemoved.isEmpty) {
    return Seq.empty[String]
  }

  // Send a request to the backend to kill this executor(s)
  val executorsRemoved = if (testing) {
    executorIdsToBeRemoved
  } else {
    client.killExecutors(executorIdsToBeRemoved)
  }
  // reset the newExecutorTotal to the existing number of executors, so that the totals logged
  // below reflect only the executors that were actually acknowledged
  newExecutorTotal = numExistingExecutors
  if (testing || executorsRemoved.nonEmpty) {
    executorsRemoved.foreach { removedExecutorId =>
      newExecutorTotal -= 1
      logInfo(s"Removing executor $removedExecutorId because it has been idle for " +
        s"$executorIdleTimeoutS seconds (new desired total will be $newExecutorTotal)")
      executorsPendingToRemove.add(removedExecutorId)
    }
    executorsRemoved
  } else {
    // The ids must be interpolated; the previous message embedded the expression's source text
    // as a literal, so the warning never said which executors were affected.
    logWarning(s"Unable to reach the cluster manager to kill executor/s " +
      s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!")
    Seq.empty[String]
  }
}
446+
394447
/**
395448
* Request the cluster manager to remove the given executor.
396-
* Return whether the request is received.
449+
* Return whether the request is acknowledged.
397450
*/
398451
private def removeExecutor(executorId: String): Boolean = synchronized {
  // Route the single id through the batch path; the removal is acknowledged only when the
  // first id reported back is the one requested.
  removeExecutors(Seq(executorId)).headOption.exists(_ == executorId)
}
455+
456+
/**
457+
* Determine if the given executor can be killed.
458+
*/
459+
private def canBeKilled(executorId: String): Boolean = synchronized {
399460
// Do not kill the executor if we are not aware of it (should never happen)
400461
if (!executorIds.contains(executorId)) {
401462
logWarning(s"Attempted to remove unknown executor $executorId!")
@@ -409,26 +470,7 @@ private[spark] class ExecutorAllocationManager(
409470
return false
410471
}
411472

412-
// Do not kill the executor if we have already reached the lower bound
413-
val numExistingExecutors = executorIds.size - executorsPendingToRemove.size
414-
if (numExistingExecutors - 1 < minNumExecutors) {
415-
logDebug(s"Not removing idle executor $executorId because there are only " +
416-
s"$numExistingExecutors executor(s) left (limit $minNumExecutors)")
417-
return false
418-
}
419-
420-
// Send a request to the backend to kill this executor
421-
val removeRequestAcknowledged = testing || client.killExecutor(executorId)
422-
if (removeRequestAcknowledged) {
423-
logInfo(s"Removing executor $executorId because it has been idle for " +
424-
s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
425-
executorsPendingToRemove.add(executorId)
426-
true
427-
} else {
428-
logWarning(s"Unable to reach the cluster manager to kill executor $executorId," +
429-
s"or no executor eligible to kill!")
430-
false
431-
}
473+
true
432474
}
433475

434476
/**

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ import org.apache.spark.util._
7373
* @param config a Spark Config object describing the application configuration. Any settings in
7474
* this config overrides the default configs as well as system properties.
7575
*/
76-
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
76+
class SparkContext(config: SparkConf) extends Logging {
7777

7878
// The call site where this SparkContext was constructed.
7979
private val creationSite: CallSite = Utils.getCallSite()
@@ -534,7 +534,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
534534
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
535535
_executorAllocationManager =
536536
if (dynamicAllocationEnabled) {
537-
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
537+
schedulerBackend match {
538+
case b: ExecutorAllocationClient =>
539+
Some(new ExecutorAllocationManager(
540+
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
541+
case _ =>
542+
None
543+
}
538544
} else {
539545
None
540546
}
@@ -1473,7 +1479,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
14731479
listenerBus.addListener(listener)
14741480
}
14751481

1476-
private[spark] override def getExecutorIds(): Seq[String] = {
1482+
private[spark] def getExecutorIds(): Seq[String] = {
14771483
schedulerBackend match {
14781484
case b: CoarseGrainedSchedulerBackend =>
14791485
b.getExecutorIds()
@@ -1498,7 +1504,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
14981504
* @return whether the request is acknowledged by the cluster manager.
14991505
*/
15001506
@DeveloperApi
1501-
override def requestTotalExecutors(
1507+
def requestTotalExecutors(
15021508
numExecutors: Int,
15031509
localityAwareTasks: Int,
15041510
hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
@@ -1518,7 +1524,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
15181524
* @return whether the request is received.
15191525
*/
15201526
@DeveloperApi
1521-
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
1527+
def requestExecutors(numAdditionalExecutors: Int): Boolean = {
15221528
schedulerBackend match {
15231529
case b: CoarseGrainedSchedulerBackend =>
15241530
b.requestExecutors(numAdditionalExecutors)
@@ -1540,10 +1546,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
15401546
* @return whether the request is received.
15411547
*/
15421548
@DeveloperApi
1543-
override def killExecutors(executorIds: Seq[String]): Boolean = {
1549+
def killExecutors(executorIds: Seq[String]): Boolean = {
15441550
schedulerBackend match {
15451551
case b: CoarseGrainedSchedulerBackend =>
1546-
b.killExecutors(executorIds, replace = false, force = true)
1552+
b.killExecutors(executorIds, replace = false, force = true).nonEmpty
15471553
case _ =>
15481554
logWarning("Killing executors is only supported in coarse-grained mode")
15491555
false
@@ -1562,7 +1568,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
15621568
* @return whether the request is received.
15631569
*/
15641570
@DeveloperApi
1565-
override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
1571+
def killExecutor(executorId: String): Boolean = {
  // Single-executor convenience form: wrap the id and reuse the batch kill request.
  val singleExecutor = Seq(executorId)
  killExecutors(singleExecutor)
}
15661572

15671573
/**
15681574
* Request that the cluster manager kill the specified executor without adjusting the
@@ -1581,7 +1587,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
15811587
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
15821588
schedulerBackend match {
15831589
case b: CoarseGrainedSchedulerBackend =>
1584-
b.killExecutors(Seq(executorId), replace = true, force = true)
1590+
b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
15851591
case _ =>
15861592
logWarning("Killing executors is only supported in coarse-grained mode")
15871593
false

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
528528
* @return whether the kill request is acknowledged. If list to kill is empty, it will return
529529
* false.
530530
*/
531-
final override def killExecutors(executorIds: Seq[String]): Boolean = {
531+
// Plain kill request: executors are not replaced and the kill is not forced.
final override def killExecutors(executorIds: Seq[String]): Seq[String] =
  killExecutors(executorIds, replace = false, force = false)
534534

@@ -548,7 +548,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
548548
final def killExecutors(
549549
executorIds: Seq[String],
550550
replace: Boolean,
551-
force: Boolean): Boolean = {
551+
force: Boolean): Seq[String] = {
552552
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
553553

554554
val response = synchronized {
@@ -564,6 +564,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
564564
.filter { id => force || !scheduler.isExecutorBusy(id) }
565565
executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
566566

567+
logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")
568+
567569
// If we do not wish to replace the executors we kill, sync the target number of executors
568570
// with the cluster manager to avoid allocating new ones. When computing the new target,
569571
// take into account executors that are pending to be added or removed.
@@ -583,7 +585,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
583585
_ => Future.successful(false)
584586
}
585587

586-
adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
588+
val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
589+
590+
killResponse.flatMap(killSuccessful =>
591+
Future.successful (if (killSuccessful) executorsToKill else Seq.empty[String])
592+
)(ThreadUtils.sameThread)
587593
}
588594

589595
defaultAskTimeout.awaitResult(response)
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
org.apache.spark.scheduler.DummyExternalClusterManager
2-
org.apache.spark.scheduler.MockExternalClusterManager
2+
org.apache.spark.scheduler.MockExternalClusterManager
3+
org.apache.spark.DummyLocalExternalClusterManager

0 commit comments

Comments
 (0)