Commit fea00dd

Code review feedback, various cleanups

1 parent: ba51033

6 files changed: 14 additions & 19 deletions

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 4 additions & 6 deletions
@@ -79,7 +79,6 @@ private[spark] class CoarseGrainedExecutorBackend(
    */
   private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]

-  // Track our decommissioning status internally.
   @volatile private var decommissioned = false

   override def onStart(): Unit = {
@@ -291,7 +290,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     // is viewed as acceptable to minimize introduction of any new locking structures in critical
     // code paths.

-    val shutdownThread = new Thread() {
+    val shutdownThread = ThreadUtils.runInNewThread("wait-for-blocks-to-migrate") {
       var lastTaskRunningTime = System.nanoTime()
       val sleep_time = 1000 // 1s

@@ -300,11 +299,10 @@ private[spark] class CoarseGrainedExecutorBackend(
         if (executor == null || executor.numRunningTasks == 0) {
           if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
             logInfo("No running tasks, checking migrations")
-            val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+            val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
             // We can only trust allBlocksMigrated boolean value if there were no tasks running
             // since the start of computing it.
-            if (allBlocksMigrated._2 &&
-              (allBlocksMigrated._1 > lastTaskRunningTime)) {
+            if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) {
               logInfo("No running tasks, all blocks migrated, stopping.")
               exitExecutor(0, "Finished decommissioning", notifyDriver = true)
             } else {
@@ -316,7 +314,7 @@ private[spark] class CoarseGrainedExecutorBackend(
             }
             Thread.sleep(sleep_time)
           } else {
-            logInfo("Blocked from shutdown by running task")
+            logInfo(s"Blocked from shutdown by running ${executor.numRunningTasks} tasks")
             // If there is a running task it could store blocks, so make sure we wait for a
             // migration loop to complete after the last task is done.
             Thread.sleep(sleep_time)
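
Note: the heart of this change is the shutdown-wait loop, which now destructures the `(Long, Boolean)` pair returned by `lastMigrationInfo()` instead of reaching into `._1`/`._2`, and only exits once all blocks were migrated after the last task stopped running. Below is a minimal, self-contained sketch of that polling pattern; the `MigrationSource` trait and `waitForMigration` helper are illustrative stand-ins, not Spark's actual API.

// Illustrative stand-in for the part of the block manager the shutdown loop consults.
trait MigrationSource {
  // (time the last migration round finished, whether every block has been migrated)
  def lastMigrationInfo(): (Long, Boolean)
  def numRunningTasks: Int
}

object DecommissionWaitSketch {
  // Polls until all blocks have migrated strictly after the last running task finished.
  def waitForMigration(source: MigrationSource, sleepMillis: Long = 1000L): Unit = {
    var lastTaskRunningTime = System.nanoTime()
    var done = false
    while (!done) {
      if (source.numRunningTasks == 0) {
        // Destructuring the tuple reads better than allBlocksMigrated._1 / ._2.
        val (migrationTime, allBlocksMigrated) = source.lastMigrationInfo()
        // Only trust the flag if migration finished after the last task stopped.
        done = allBlocksMigrated && migrationTime > lastTaskRunningTime
      } else {
        // A running task may still produce blocks, so push the reference time forward.
        lastTaskRunningTime = System.nanoTime()
      }
      if (!done) Thread.sleep(sleepMillis)
    }
  }
}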

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 1 addition & 1 deletion
@@ -137,6 +137,6 @@ private[spark] object CoarseGrainedClusterMessages {
   // The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
   case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage

-  // Used to ask an executor to decommission it's self.
+  // Used to ask an executor to decommission itself. (Can be an internal message)
   case object DecommissionSelf extends CoarseGrainedClusterMessage
 }
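
Since `DecommissionSelf` carries no payload, a `case object` is the natural encoding. A minimal sketch of how such messages are typically defined and dispatched, using a simplified message hierarchy rather than Spark's real RPC classes:

// Simplified stand-in for the message hierarchy; not Spark's actual types.
sealed trait ClusterMessage
case object DecommissionSelf extends ClusterMessage
case class IsExecutorAlive(executorId: String) extends ClusterMessage

object MessageHandlerSketch {
  // Matching on a sealed trait lets the compiler check the handler is exhaustive.
  def handle(msg: ClusterMessage): String = msg match {
    case DecommissionSelf => "begin decommissioning this executor"
    case IsExecutorAlive(id) => s"report liveness of executor $id"
  }
}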

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 3 additions & 2 deletions
@@ -442,8 +442,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       case e: Exception =>
         logError(s"Unexpected error during decommissioning ${e.toString}", e)
     }
-    // Send decommission message to the executor (it could have originated on the executor
-    // but not necessarily.
+    // Send decommission message to the executor, this may be a duplicate since the executor
+    // could have been the one to notify us. But it's also possible the notification came from
+    // elsewhere and the executor does not yet know.
     executorDataMap.get(executorId) match {
       case Some(executorInfo) =>
         executorInfo.executorEndpoint.send(DecommissionSelf)
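
As the new comment spells out, the executor may receive `DecommissionSelf` even though it already began decommissioning on its own, so the receiving side has to treat the message as idempotent. A minimal sketch of one way to do that with a `@volatile` flag; `ExecutorLike` is a hypothetical class, not the real backend (which tracks this with its own `decommissioned` field):

// Hypothetical receiver used only to illustrate idempotent handling.
class ExecutorLike {
  @volatile private var decommissioned = false

  // Safe to call more than once: a duplicate DecommissionSelf becomes a no-op.
  // (An AtomicBoolean would make the check-and-set atomic if that mattered here.)
  def decommissionSelf(): Unit = {
    if (!decommissioned) {
      decommissioned = true
      // ... stop accepting new tasks and start block migration here.
    }
  }
}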

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion
@@ -1823,7 +1823,7 @@ private[spark] class BlockManager(
   }

   /*
-   * Returns the last migration time and a boolean for if all blocks have been migrated.
+   * Returns the last migration time and a boolean denoting if all the blocks have been migrated.
    * If there are any tasks running since that time the boolean may be incorrect.
    */
   private[spark] def lastMigrationInfo(): (Long, Boolean) = {
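
The bare `(Long, Boolean)` return type works, but the comment has to explain which element means what. If the result ever grew, a small case class would make the contract self-describing; a hedged sketch of that alternative (not what the commit does, and `MigrationInfo` is a hypothetical name):

// Illustrative alternative only; Spark's lastMigrationInfo() keeps the bare (Long, Boolean).
case class MigrationInfo(lastMigrationTimeNs: Long, allBlocksMigrated: Boolean)

object MigrationInfoSketch {
  def lastMigrationInfo(): MigrationInfo = {
    // Placeholder values; a real implementation would read the migration state.
    MigrationInfo(lastMigrationTimeNs = System.nanoTime(), allBlocksMigrated = false)
  }
}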

core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala

Lines changed: 4 additions & 7 deletions
@@ -102,7 +102,6 @@ private[storage] class BlockManagerDecommissioner(
         } else {
           logError(s"Skipping block ${shuffleBlockInfo} because it has failed ${retryCount}")
         }
-        logInfo(s"Migrated ${shuffleBlockInfo}")
         numMigratedShuffles.incrementAndGet()
       }
     }
@@ -135,8 +134,10 @@ private[storage] class BlockManagerDecommissioner(

   // Set if we encounter an error attempting to migrate and stop.
   @volatile private var stopped = false
-  @volatile private var stoppedRDD = false
-  @volatile private var stoppedShuffle = false
+  @volatile private var stoppedRDD =
+    !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
+  @volatile private var stoppedShuffle =
+    !conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)

   private val migrationPeers =
     mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()
@@ -301,13 +302,9 @@ private[storage] class BlockManagerDecommissioner(
     logInfo("Starting block migration thread")
     if (conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
       rddBlockMigrationExecutor.submit(rddBlockMigrationRunnable)
-    } else {
-      stoppedRDD = true
     }
     if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
       shuffleBlockMigrationRefreshExecutor.submit(shuffleBlockMigrationRefreshRunnable)
-    } else {
-      stoppedShuffle = true
     }
     if (!conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) &&
       !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
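
The cleanup here drops the `else { stoppedX = true }` branches from `start()` and instead initializes the flags directly from the config, so a disabled migration type counts as "stopped" from construction. A minimal sketch of that shape, using plain constructor booleans instead of Spark's typed config entries:

// Hypothetical, simplified decommissioner; Spark reads these from its config entries.
class DecommissionerSketch(rddEnabled: Boolean, shuffleEnabled: Boolean) {
  // A disabled migration type starts out already "stopped", so start() needs no else branches.
  @volatile private var stoppedRDD = !rddEnabled
  @volatile private var stoppedShuffle = !shuffleEnabled

  def start(): Unit = {
    if (rddEnabled) { /* submit the RDD block migration runnable */ }
    if (shuffleEnabled) { /* submit the shuffle block migration runnable */ }
  }

  // Called by the migration threads when they finish or hit a fatal error.
  def markRDDStopped(): Unit = { stoppedRDD = true }
  def markShuffleStopped(): Unit = { stoppedShuffle = true }

  // Both flags feed into the "is migration done?" decision elsewhere.
  def allStopped: Boolean = stoppedRDD && stoppedShuffle
}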

core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala

Lines changed: 1 addition & 2 deletions
@@ -38,7 +38,7 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
   private val sparkConf = new SparkConf(false)
     .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
     .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true)
-    // Just replicate blocks as fast as we can during testing, there isn't another
+    // Just replicate blocks quickly during testing, as there isn't another
     // workload we need to worry about.
     .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)

@@ -92,6 +92,5 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
     } finally {
       bmDecomManager.stop()
     }
-    bmDecomManager.stop()
   }
 }
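
The second hunk removes a redundant `bmDecomManager.stop()` that ran after the `finally` block had already stopped the manager; the `finally` alone is enough. A minimal sketch of the intended cleanup pattern, with a hypothetical `Stoppable` resource standing in for the decommission manager:

// Hypothetical resource type used only for illustration.
trait Stoppable { def stop(): Unit }

object CleanupSketch {
  def withResource[T](resource: Stoppable)(body: => T): T = {
    try {
      body
    } finally {
      // Runs exactly once, whether body succeeds or throws; no second stop() needed.
      resource.stop()
    }
  }
}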
