
Commit 780b00b

CR feedback/cleanup
1 parent 38a413e commit 780b00b

12 files changed: +44 -42 lines changed


core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala

Lines changed: 7 additions & 5 deletions

@@ -87,15 +87,15 @@ private[spark] trait ExecutorAllocationClient {
    * Default implementation delegates to kill, scheduler must override
    * if it supports graceful decommissioning.
    *
-   * @param executors identifiers of executors & decom info.
+   * @param executorsAndDecomInfo identifiers of executors & decom info.
    * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
    *                                 after these executors have been decommissioned.
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   def decommissionExecutors(
-      executors: Seq[(String, ExecutorDecommissionInfo)],
+      executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
       adjustTargetNumExecutors: Boolean): Seq[String] = {
-    killExecutors(executors.map(_._1),
+    killExecutors(executorsAndDecomInfo.map(_._1),
       adjustTargetNumExecutors,
       countFailures = false)
   }

@@ -108,13 +108,15 @@ private[spark] trait ExecutorAllocationClient {
    *
    * @param executorId identifiers of executor to decommission
    * @param decommissionInfo information about the decommission (reason, host loss)
+   * @param adjustTargetNumExecutors if we should adjust the target number of executors.
    * @return whether the request is acknowledged by the cluster manager.
    */
   def decommissionExecutor(executorId: String,
-      decommissionInfo: ExecutorDecommissionInfo): Boolean = {
+      decommissionInfo: ExecutorDecommissionInfo,
+      adjustTargetNumExecutors: Boolean): Boolean = {
     val decommissionedExecutors = decommissionExecutors(
       Seq((executorId, decommissionInfo)),
-      adjustTargetNumExecutors = true)
+      adjustTargetNumExecutors = adjustTargetNumExecutors)
     decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId)
   }
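
With the renamed parameter and the new adjustTargetNumExecutors flag, callers now state explicitly whether decommissioned executors should also lower the executor target. A minimal sketch of the resulting call pattern, assuming client is some ExecutorAllocationClient; the executor ids and reason strings are made up for illustration:

  // Decommission one executor but keep the executor target unchanged,
  // so the cluster manager may bring up a replacement.
  val acked: Boolean = client.decommissionExecutor(
    "1",
    ExecutorDecommissionInfo("node being drained", isHostDecommissioned = false),
    adjustTargetNumExecutors = false)

  // Decommission several executors and shrink the target so they are not replaced.
  val removedIds: Seq[String] = client.decommissionExecutors(
    Seq(("1", ExecutorDecommissionInfo("scale down", false)),
        ("2", ExecutorDecommissionInfo("scale down", false))),
    adjustTargetNumExecutors = true)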

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 1 addition & 1 deletion

@@ -546,7 +546,7 @@ private[spark] class ExecutorAllocationManager(
         val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId,
           (executorMonitor.executorCountWithResourceProfile(rpId) -
             executorMonitor.pendingRemovalCountPerResourceProfileId(rpId) -
-            executorMonitor.pendingDecommissioningPerResourceProfileId(rpId)
+            executorMonitor.decommissioningPerResourceProfileId(rpId)
           ))
         if (newExecutorTotal - 1 < minNumExecutors) {
           logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " +

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 14 additions & 13 deletions

@@ -193,8 +193,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

       case DecommissionExecutor(executorId, decommissionInfo) =>
         logError(s"Received decommission executor message ${executorId}: $decommissionInfo")
-        decommissionExecutors(Seq((executorId, decommissionInfo)),
-          adjustTargetNumExecutors = false)
+        decommissionExecutor(executorId, decommissionInfo)

       case RemoveWorker(workerId, host, message) =>
         removeWorker(workerId, host, message)

@@ -275,8 +274,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

       case DecommissionExecutor(executorId, decommissionInfo) =>
         logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.")
-        decommissionExecutors(Seq((executorId, decommissionInfo)),
-          adjustTargetNumExecutors = false)
+        decommissionExecutor(executorId, decommissionInfo)
         context.reply(true)

       case RetrieveSparkAppConfig(resourceProfileId) =>

@@ -423,6 +421,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

   /**
    * Mark a given executor as decommissioned and stop making resource offers for it.
+   *
    */
   private def decommissionExecutor(
       executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = {

@@ -508,16 +507,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Request that the cluster manager decommission the specified executors.
    *
-   * @param executors Identifiers of executors & decommission info.
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
    * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
    *                                 after these executors have been decommissioned.
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   override def decommissionExecutors(
-      executors: Seq[(String, ExecutorDecommissionInfo)],
+      executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
       adjustTargetNumExecutors: Boolean): Seq[String] = {

-    val executorsToDecommission = executors.filter{case (executorId, _) =>
+    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
       CoarseGrainedSchedulerBackend.this.synchronized {
         // Only bother decommissioning executors which are alive.
         if (isExecutorActive(executorId)) {

@@ -532,7 +531,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // If we don't want to replace the executors we are decommissioning
     if (adjustTargetNumExecutors) {
       executorsToDecommission.foreach { case (exec, _) =>
-        val rpId = executorDataMap(exec).resourceProfileId
+        val rpId = withLock {
+          executorDataMap(exec).resourceProfileId
+        }
         val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
         if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
           // Assume that we are killing an executor that was started by default and

@@ -546,7 +547,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
     }

-    val decommissioned = executorsToDecommission.filter{case (executorId, decomInfo) =>
+    val decommissioned = executorsToDecommission.filter { case (executorId, decomInfo) =>
       doDecommission(executorId, decomInfo)
     }.map(_._1)
     decommissioned

@@ -556,7 +557,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   private def doDecommission(executorId: String,
       decomInfo: ExecutorDecommissionInfo): Boolean = {

-    logInfo(s"Starting decommissioning executor $executorId.")
+    logInfo(s"Asking executor $executorId to decommission.")
     try {
       scheduler.executorDecommission(executorId, decomInfo)
       if (driverEndpoint != null) {

@@ -580,12 +581,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         return false
       }
     }
-    logInfo(s"Finished decommissioning executor $executorId.")
+    logInfo(s"Asked executor $executorId to decommission.")

     if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
       try {
-        logInfo("Starting decommissioning block manager corresponding to " +
-          s"executor $executorId.")
+        logInfo(s"Asking block manager corresponding to executor $executorId to decommission.")
         scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
       } catch {
         case e: Exception =>

@@ -595,6 +595,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       }
       logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
     }
+
     true
   }
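
One note on the withLock change above: the resource-profile lookup now reads executorDataMap only while holding the appropriate lock, matching how other reads in this class are guarded. A rough sketch of that guarded-read pattern, with resourceProfileIdFor as a hypothetical helper and assuming withLock evaluates its body under the scheduler/backend locks as elsewhere in this file:

  // Hypothetical helper: take the lock before touching executorDataMap.
  private def resourceProfileIdFor(execId: String): Int = withLock {
    executorDataMap(execId).resourceProfileId
  }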

core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala

Lines changed: 9 additions & 9 deletions

@@ -115,7 +115,7 @@ private[spark] class ExecutorMonitor(
     var newNextTimeout = Long.MaxValue
     timedOutExecs = executors.asScala
       .filter { case (_, exec) =>
-        !exec.pendingRemoval && !exec.hasActiveShuffle && !exec.pendingDecommissioning}
+        !exec.pendingRemoval && !exec.hasActiveShuffle && !exec.decommissioning}
       .filter { case (_, exec) =>
         val deadline = exec.timeoutAt
         if (deadline > now) {

@@ -155,7 +155,7 @@ private[spark] class ExecutorMonitor(
     ids.foreach { id =>
       val tracker = executors.get(id)
       if (tracker != null) {
-        tracker.pendingDecommissioning = true
+        tracker.decommissioning = true
       }
     }

@@ -186,13 +186,13 @@ private[spark] class ExecutorMonitor(
     executors.asScala.filter { case (k, v) => v.resourceProfileId == id && v.pendingRemoval }.size
   }

-  def pendingDecommissioningCount: Int = executors.asScala.count { case (_, exec) =>
-    exec.pendingDecommissioning
+  def decommissioningCount: Int = executors.asScala.count { case (_, exec) =>
+    exec.decommissioning
   }

-  def pendingDecommissioningPerResourceProfileId(id: Int): Int = {
+  def decommissioningPerResourceProfileId(id: Int): Int = {
     executors.asScala.filter { case (k, v) =>
-      v.resourceProfileId == id && v.pendingDecommissioning
+      v.resourceProfileId == id && v.decommissioning
     }.size
   }

@@ -352,7 +352,7 @@ private[spark] class ExecutorMonitor(
     val removed = executors.remove(event.executorId)
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
-      if (!removed.pendingRemoval || !removed.pendingDecommissioning) {
+      if (!removed.pendingRemoval || !removed.decommissioning) {
         nextTimeout.set(Long.MinValue)
       }
     }

@@ -457,7 +457,7 @@ private[spark] class ExecutorMonitor(

   // Visible for testing
   private[spark] def executorsDecommissioning(): Set[String] = {
-    executors.asScala.filter { case (_, exec) => exec.pendingDecommissioning }.keys.toSet
+    executors.asScala.filter { case (_, exec) => exec.decommissioning }.keys.toSet
   }

   /**

@@ -512,7 +512,7 @@ private[spark] class ExecutorMonitor(
     @volatile var timedOut: Boolean = false

     var pendingRemoval: Boolean = false
-    var pendingDecommissioning: Boolean = false
+    var decommissioning: Boolean = false
     var hasActiveShuffle: Boolean = false

     private var idleStart: Long = -1
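
Together with the ExecutorAllocationManager hunk above, the renamed accessors feed the idle-executor headroom check. A rough restatement of that arithmetic (monitor stands in for the ExecutorMonitor; rpId and minNumExecutors come from the allocation manager):

  // Executors still usable for scheduling: total for the profile, minus those
  // pending removal and those already decommissioning.
  val usableExecutors =
    monitor.executorCountWithResourceProfile(rpId) -
      monitor.pendingRemovalCountPerResourceProfileId(rpId) -
      monitor.decommissioningPerResourceProfileId(rpId)

  // An idle executor is only removed if the minimum would still be satisfied.
  val mayRemoveIdleExecutor = usableExecutors - 1 >= minNumExecutors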

core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala

Lines changed: 0 additions & 4 deletions

@@ -314,13 +314,9 @@ private[storage] class BlockManagerDecommissioner(
     logInfo("Starting block migration thread")
     if (conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
       rddBlockMigrationExecutor.submit(rddBlockMigrationRunnable)
-    } else {
-      stoppedRDD = true
     }
     if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
       shuffleBlockMigrationRefreshExecutor.submit(shuffleBlockMigrationRefreshRunnable)
-    } else {
-      stoppedShuffle = true
     }
     if (!conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) &&
       !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 0 additions & 1 deletion

@@ -48,7 +48,6 @@ class BlockManagerMaster(
    */
   def decommissionBlockManagers(executorIds: Seq[String]): Unit = {
     driverEndpoint.ask[Boolean](DecommissionBlockManagers(executorIds))
-    logInfo(s"Decommissioning block managers on ${executorIds}")
   }

   /** Get Replication Info for all the RDD blocks stored in given blockManagerId */

core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala

Lines changed: 2 additions & 1 deletion

@@ -65,7 +65,8 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte

       val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
       sc.getExecutorIds().tail.foreach { id =>
-        sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false))
+        sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false),
+          adjustTargetNumExecutors = false)
         assert(rdd3.sortByKey().collect().length === 100)
       }
     }

core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala

Lines changed: 3 additions & 1 deletion

@@ -76,7 +76,9 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
     // decom.sh message passing is tested manually.
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     val execs = sched.getExecutorIds()
-    execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false)))
+    // Make the executors decommission, finish, exit, and not be replaced.
+    val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false)))
+    sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = true)
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
     assert(asyncCountResult === 10)
   }

core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala

Lines changed: 2 additions & 1 deletion

@@ -190,7 +190,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     logInfo(s"Decommissioning executor ${execToDecommission}")
     sched.decommissionExecutor(
       execToDecommission,
-      ExecutorDecommissionInfo("", isHostDecommissioned = false))
+      ExecutorDecommissionInfo("", isHostDecommissioned = false),
+      adjustTargetNumExecutors = true)
     val decomTime = new SystemClock().getTimeMillis()

     // Wait for job to finish.

core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala

Lines changed: 0 additions & 2 deletions

@@ -221,7 +221,5 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
     } finally {
       bmDecomManager.stop()
     }
-
-    bmDecomManager.stop()
   }
 }
