Skip to content

Commit 01d4137

Browse files
committed
CR feedback, move adjustExecutors to a common utility function
1 parent 3fa3313 commit 01d4137

File tree

5 files changed

+31
-37
lines changed

5 files changed

+31
-37
lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ private[spark] trait ExecutorAllocationClient {
9393
* @return the ids of the executors acknowledged by the cluster manager to be removed.
9494
*/
9595
def decommissionExecutors(
96-
executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
96+
executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
9797
adjustTargetNumExecutors: Boolean): Seq[String] = {
9898
killExecutors(executorsAndDecomInfo.map(_._1),
9999
adjustTargetNumExecutors,
@@ -103,19 +103,18 @@ private[spark] trait ExecutorAllocationClient {
103103

104104
/**
105105
* Request that the cluster manager decommission the specified executor.
106-
* Default implementation delegates to decommissionExecutors, scheduler can override
107-
* if it supports graceful decommissioning.
106+
* Delegates to decommissionExecutors.
108107
*
109108
* @param executorId identifiers of executor to decommission
110109
* @param decommissionInfo information about the decommission (reason, host loss)
111110
* @param adjustTargetNumExecutors if we should adjust the target number of executors.
112111
* @return whether the request is acknowledged by the cluster manager.
113112
*/
114-
def decommissionExecutor(executorId: String,
113+
final def decommissionExecutor(executorId: String,
115114
decommissionInfo: ExecutorDecommissionInfo,
116115
adjustTargetNumExecutors: Boolean): Boolean = {
117116
val decommissionedExecutors = decommissionExecutors(
118-
Seq((executorId, decommissionInfo)),
117+
Array((executorId, decommissionInfo)),
119118
adjustTargetNumExecutors = adjustTargetNumExecutors)
120119
decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId)
121120
}

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ private[spark] class ExecutorAllocationManager(
575575
// when the task backlog decreased.
576576
if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
577577
val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
578-
id => (id, ExecutorDecommissionInfo("spark scale down", false)))
578+
id => (id, ExecutorDecommissionInfo("spark scale down", false))).toArray
579579
client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
580580
} else {
581581
client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 24 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
513513
* @return the ids of the executors acknowledged by the cluster manager to be removed.
514514
*/
515515
override def decommissionExecutors(
516-
executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
516+
executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
517517
adjustTargetNumExecutors: Boolean): Seq[String] = {
518518

519519
val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
@@ -530,21 +530,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
530530

531531
// If we don't want to replace the executors we are decommissioning
532532
if (adjustTargetNumExecutors) {
533-
executorsToDecommission.foreach { case (exec, _) =>
534-
val rpId = withLock {
535-
executorDataMap(exec).resourceProfileId
536-
}
537-
val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
538-
if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
539-
// Assume that we are killing an executor that was started by default and
540-
// not through the request api
541-
requestedTotalExecutorsPerResourceProfile(rp) = 0
542-
} else {
543-
val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
544-
requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
545-
}
546-
}
547-
doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
533+
adjustExecutors(executorsToDecommission.map(_._1))
548534
}
549535

550536
val decommissioned = executorsToDecommission.filter { case (executorId, decomInfo) =>
@@ -846,6 +832,27 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
846832
resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] =
847833
Future.successful(false)
848834

835+
/**
836+
* Adjust the number of executors being requested to no longer include the provided executors.
837+
*/
838+
private def adjustExecutors(executorIds: Seq[String]) = {
839+
executorIds.foreach { exec =>
840+
withLock {
841+
val rpId = executorDataMap(exec).resourceProfileId
842+
val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
843+
if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
844+
// Assume that we are killing an executor that was started by default and
845+
// not through the request api
846+
requestedTotalExecutorsPerResourceProfile(rp) = 0
847+
} else {
848+
val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
849+
requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
850+
}
851+
}
852+
}
853+
doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
854+
}
855+
849856
/**
850857
* Request that the cluster manager kill the specified executors.
851858
*
@@ -884,19 +891,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
884891
// take into account executors that are pending to be added or removed.
885892
val adjustTotalExecutors =
886893
if (adjustTargetNumExecutors) {
887-
executorsToKill.foreach { exec =>
888-
val rpId = executorDataMap(exec).resourceProfileId
889-
val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
890-
if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
891-
// Assume that we are killing an executor that was started by default and
892-
// not through the request api
893-
requestedTotalExecutorsPerResourceProfile(rp) = 0
894-
} else {
895-
val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp)
896-
requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0)
897-
}
898-
}
899-
doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
894+
adjustExecutors(executorsToKill)
900895
} else {
901896
Future.successful(true)
902897
}

core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ private[spark] class StandaloneSchedulerBackend(
177177
override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) {
178178
logInfo("Asked to decommission executor")
179179
val execId = fullId.split("/")(1)
180-
decommissionExecutors(Seq((execId, decommissionInfo)), adjustTargetNumExecutors = false)
180+
decommissionExecutors(Array((execId, decommissionInfo)), adjustTargetNumExecutors = false)
181181
logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo))
182182
}
183183

core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
7777
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
7878
val execs = sched.getExecutorIds()
7979
// Make the executors decommission, finish, exit, and not be replaced.
80-
val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false)))
80+
val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false))).toArray
8181
sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = true)
8282
val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
8383
assert(asyncCountResult === 10)

0 commit comments

Comments
 (0)