
Commit cc76ff5

More CR feedback
1 parent 995ffa9 commit cc76ff5


4 files changed: +35, -85 lines changed


core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 5 additions & 3 deletions
@@ -128,6 +128,8 @@ private[spark] class ExecutorAllocationManager(
   private val executorAllocationRatio =
     conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)

+  private val decommissionEnabled = conf.get(WORKER_DECOMMISSION_ENABLED)
+
   private val defaultProfileId = resourceProfileManager.defaultResourceProfile.id

   validateSettings()
@@ -209,7 +211,7 @@ private[spark] class ExecutorAllocationManager(
       // storage shuffle decommissioning is enabled we have *experimental* support for
       // decommissioning without a shuffle service.
       if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
-          (conf.get(WORKER_DECOMMISSION_ENABLED) &&
+          (decommissionEnabled &&
           conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
         logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
       } else if (!testing) {
@@ -573,7 +575,7 @@ private[spark] class ExecutorAllocationManager(
     } else {
       // We don't want to change our target number of executors, because we already did that
       // when the task backlog decreased.
-      if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+      if (decommissionEnabled) {
         val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
           id => (id, ExecutorDecommissionInfo("spark scale down", false))).toArray
         client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
@@ -592,7 +594,7 @@ private[spark] class ExecutorAllocationManager(

     // reset the newExecutorTotal to the existing number of executors
     if (testing || executorsRemoved.nonEmpty) {
-      if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+      if (decommissionEnabled) {
         executorMonitor.executorsDecommissioned(executorsRemoved)
       } else {
         executorMonitor.executorsKilled(executorsRemoved.toSeq)

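The ExecutorAllocationManager change reads WORKER_DECOMMISSION_ENABLED once into a decommissionEnabled field and branches on the cached value at every scale-down site. A minimal sketch of that pattern, assuming hypothetical Conf and Client traits in place of Spark's SparkConf and ExecutorAllocationClient:

// Sketch only: Conf, Client and the config key are illustrative stand-ins, not Spark's APIs.
object DecommissionFlagSketch {
  trait Conf { def getBoolean(key: String, default: Boolean): Boolean }

  trait Client {
    def decommissionExecutors(ids: Seq[String]): Unit
    def killExecutors(ids: Seq[String]): Unit
  }

  final class Allocator(conf: Conf, client: Client) {
    // Read the flag once at construction, like `decommissionEnabled` in the diff,
    // instead of re-reading the configuration at each call site.
    private val decommissionEnabled =
      conf.getBoolean("worker.decommission.enabled", default = false)

    def scaleDown(ids: Seq[String]): Unit =
      if (decommissionEnabled) client.decommissionExecutors(ids)
      else client.killExecutors(ids)
  }
}

Caching the flag keeps the several call sites in the diff consistent and avoids repeated configuration lookups.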
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 3 additions & 57 deletions
@@ -193,7 +193,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

       case DecommissionExecutor(executorId, decommissionInfo) =>
         logError(s"Received decommission executor message ${executorId}: $decommissionInfo")
-        decommissionExecutor(executorId, decommissionInfo)
+        decommissionExecutor(executorId, decommissionInfo, adjustTargetNumExecutors = false)

       case RemoveWorker(workerId, host, message) =>
         removeWorker(workerId, host, message)
@@ -274,8 +274,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

       case DecommissionExecutor(executorId, decommissionInfo) =>
         logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.")
-        decommissionExecutor(executorId, decommissionInfo)
-        context.reply(true)
+        context.reply(decommissionExecutor(executorId, decommissionInfo,
+          adjustTargetNumExecutors = false))

       case RetrieveSparkAppConfig(resourceProfileId) =>
         val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId)
@@ -419,60 +419,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       scheduler.workerRemoved(workerId, host, message)
     }

-    /**
-     * Mark a given executor as decommissioned and stop making resource offers for it.
-     *
-     */
-    private def decommissionExecutor(
-        executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = {
-      val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
-        // Only bother decommissioning executors which are alive.
-        if (isExecutorActive(executorId)) {
-          executorsPendingDecommission += executorId
-          true
-        } else {
-          false
-        }
-      }
-
-      if (shouldDisable) {
-        logInfo(s"Starting decommissioning executor $executorId.")
-        try {
-          scheduler.executorDecommission(executorId, decommissionInfo)
-        } catch {
-          case e: Exception =>
-            logError(s"Unexpected error during decommissioning ${e.toString}", e)
-        }
-        // Send decommission message to the executor, this may be a duplicate since the executor
-        // could have been the one to notify us. But it's also possible the notification came from
-        // elsewhere and the executor does not yet know.
-        executorDataMap.get(executorId) match {
-          case Some(executorInfo) =>
-            executorInfo.executorEndpoint.send(DecommissionSelf)
-          case None =>
-            // Ignoring the executor since it is not registered.
-            logWarning(s"Attempted to decommission unknown executor $executorId.")
-        }
-        logInfo(s"Finished decommissioning executor $executorId.")
-
-        if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-          try {
-            logInfo("Starting decommissioning block manager corresponding to " +
-              s"executor $executorId.")
-            scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
-          } catch {
-            case e: Exception =>
-              logError("Unexpected error during block manager " +
-                s"decommissioning for executor $executorId: ${e.toString}", e)
-          }
-          logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
-        }
-      } else {
-        logInfo(s"Skipping decommissioning of executor $executorId.")
-      }
-      shouldDisable
-    }
-
     /**
      * Stop making resource offers for the given executor. The executor is marked as lost with
      * the loss reason still pending.

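In CoarseGrainedSchedulerBackend the endpoint-local decommissionExecutor helper is removed and both RPC handlers now call a variant that takes adjustTargetNumExecutors; receiveAndReply replies with that call's Boolean result instead of an unconditional true. A minimal sketch of the reply-with-result pattern, using hypothetical ReplyContext and message types rather than Spark's RPC classes:

// Sketch only: ReplyContext and DecommissionExecutor are simplified stand-ins.
object ReplyWithResultSketch {
  final case class DecommissionExecutor(executorId: String, reason: String)

  trait ReplyContext { def reply(response: Any): Unit }

  // `decommission` stands in for the three-argument decommissionExecutor call.
  final class Backend(decommission: (String, String, Boolean) => Boolean) {
    def receiveAndReply(context: ReplyContext): PartialFunction[Any, Unit] = {
      case DecommissionExecutor(executorId, reason) =>
        // Reply with whether the executor was actually marked for decommission,
        // so the sender can distinguish a no-op (unknown or dead executor) from
        // a successful request.
        context.reply(decommission(executorId, reason, /* adjustTargetNumExecutors = */ false))
    }
  }
}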
core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala

Lines changed: 15 additions & 13 deletions
@@ -363,22 +363,24 @@
       UNKNOWN_RESOURCE_PROFILE_ID)

     // Check if it is a shuffle file, or RDD to pick the correct codepath for update
-    if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && shuffleTrackingEnabled) {
-      /**
-       * The executor monitor keeps track of locations of cache and shuffle blocks and this can be
-       * used to decide which executor(s) Spark should shutdown first. Since we move shuffle blocks
-       * around now this wires it up so that it keeps track of it. We only do this for data blocks
-       * as index and other blocks blocks do not necessarily mean the entire block has been
-       * committed.
-       */
-      event.blockUpdatedInfo.blockId match {
-        case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId)
-        case _ => // For now we only update on data blocks
+    if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
+      if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] &&
+          shuffleTrackingEnabled) {
+        /**
+         * The executor monitor keeps track of locations of cache and shuffle blocks and this can
+         * be used to decide which executor(s) Spark should shutdown first. Since we move shuffle
+         * blocks around now this wires it up so that it keeps track of it. We only do this for
+         * data blocks as index and other blocks blocks do not necessarily mean the entire block
+         * has been committed.
+         */
+        event.blockUpdatedInfo.blockId match {
+          case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId)
+          case _ => // For now we only update on data blocks
+        }
       }
       return
-    } else if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
-      return
     }
+
     val storageLevel = event.blockUpdatedInfo.storageLevel
     val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId]

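The ExecutorMonitor change inverts the branching in the block-update handler: any non-RDD block update is handled first (recording shuffle data blocks when shuffle tracking is enabled) and then returns early, while RDD block updates fall through to the existing storage-level bookkeeping. A minimal sketch of that control flow, rewritten as a pattern match over simplified stand-in block-id types rather than Spark's listener API:

// Sketch only: the BlockId hierarchy and callbacks are illustrative stand-ins.
object BlockUpdateFlowSketch {
  sealed trait BlockId
  final case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId
  final case class ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
  final case class OtherBlockId(name: String) extends BlockId

  def onBlockUpdated(
      blockId: BlockId,
      shuffleTrackingEnabled: Boolean,
      addShuffle: Int => Unit,
      trackRddBlock: RDDBlockId => Unit): Unit = {
    blockId match {
      case rdd: RDDBlockId =>
        // RDD blocks fall through to the cache/storage-level bookkeeping.
        trackRddBlock(rdd)
      case ShuffleDataBlockId(shuffleId, _, _) if shuffleTrackingEnabled =>
        // Only shuffle *data* blocks are recorded: an index or other block does
        // not by itself mean the whole shuffle output has been committed.
        addShuffle(shuffleId)
      case _ =>
        // All other non-RDD updates are ignored (the early return in the diff).
    }
  }
}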
streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala

Lines changed: 12 additions & 12 deletions
@@ -97,17 +97,17 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
     }

     /** Verify that a particular executor was scaled down. */
-    def verifyKilledExec(expectedKilledExec: Option[String]): Unit = {
-      if (expectedKilledExec.nonEmpty) {
+    def verifyScaledDownExec(expectedExec: Option[String]): Unit = {
+      if (expectedExec.nonEmpty) {
         val decomInfo = ExecutorDecommissionInfo("spark scale down", false)
         if (decommissioning) {
           verify(allocationClient, times(1)).decommissionExecutor(
-            meq(expectedKilledExec.get), meq(decomInfo), meq(true))
-          verify(allocationClient, never).killExecutor(meq(expectedKilledExec.get))
+            meq(expectedExec.get), meq(decomInfo), meq(true))
+          verify(allocationClient, never).killExecutor(meq(expectedExec.get))
         } else {
-          verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get))
+          verify(allocationClient, times(1)).killExecutor(meq(expectedExec.get))
           verify(allocationClient, never).decommissionExecutor(
-            meq(expectedKilledExec.get), meq(decomInfo), meq(true))
+            meq(expectedExec.get), meq(decomInfo), meq(true))
         }
       } else {
         if (decommissioning) {
@@ -122,41 +122,41 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
     // Batch proc time = batch interval, should increase allocation by 1
     addBatchProcTimeAndVerifyAllocation(batchDurationMillis) {
       verifyTotalRequestedExecs(Some(3)) // one already allocated, increase allocation by 1
-      verifyKilledExec(None)
+      verifyScaledDownExec(None)
     }

     // Batch proc time = batch interval * 2, should increase allocation by 2
     addBatchProcTimeAndVerifyAllocation(batchDurationMillis * 2) {
       verifyTotalRequestedExecs(Some(4))
-      verifyKilledExec(None)
+      verifyScaledDownExec(None)
     }

     // Batch proc time slightly more than the scale up ratio, should increase allocation by 1
     addBatchProcTimeAndVerifyAllocation(
       batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get + 1) {
       verifyTotalRequestedExecs(Some(3))
-      verifyKilledExec(None)
+      verifyScaledDownExec(None)
     }

     // Batch proc time slightly less than the scale up ratio, should not change allocation
     addBatchProcTimeAndVerifyAllocation(
       batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get - 1) {
       verifyTotalRequestedExecs(None)
-      verifyKilledExec(None)
+      verifyScaledDownExec(None)
     }

     // Batch proc time slightly more than the scale down ratio, should not change allocation
     addBatchProcTimeAndVerifyAllocation(
       batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get + 1) {
       verifyTotalRequestedExecs(None)
-      verifyKilledExec(None)
+      verifyScaledDownExec(None)
     }

     // Batch proc time slightly more than the scale down ratio, should not change allocation
     addBatchProcTimeAndVerifyAllocation(
       batchDurationMillis * STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get - 1) {
       verifyTotalRequestedExecs(None)
-      verifyKilledExec(Some("2"))
+      verifyScaledDownExec(Some("2"))
     }
   }
 }

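The test helper is renamed from verifyKilledExec to verifyScaledDownExec because a scale-down may now decommission an executor rather than kill it, depending on the suite's decommissioning flag. A minimal sketch of the Mockito verification pattern the helper uses, assuming mockito-core on the test classpath and a hypothetical Client trait in place of the mocked ExecutorAllocationClient:

import org.mockito.ArgumentMatchers.{eq => meq}
import org.mockito.Mockito.{never, times, verify}

object ScaleDownVerificationSketch {
  // Hypothetical stand-in for the allocation client mocked by the suite;
  // `client` below is expected to be a Mockito mock of this trait.
  trait Client {
    def decommissionExecutor(id: String, reason: String, adjust: Boolean): Boolean
    def killExecutor(id: String): Boolean
  }

  def verifyScaledDownExec(client: Client, decommissioning: Boolean, exec: String): Unit = {
    if (decommissioning) {
      // Exactly one decommission call with the expected arguments, and no kill.
      verify(client, times(1)).decommissionExecutor(meq(exec), meq("spark scale down"), meq(true))
      verify(client, never).killExecutor(meq(exec))
    } else {
      // Exactly one kill call, and no decommission.
      verify(client, times(1)).killExecutor(meq(exec))
      verify(client, never).decommissionExecutor(meq(exec), meq("spark scale down"), meq(true))
    }
  }
}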