 package org.apache.spark.storage

 import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicReference

 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -57,6 +58,11 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       .set(config.STORAGE_DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist)
       .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
+      // Force exactly one executor per worker so that every block manager
+      // receives shuffle and RDD blocks.
+      .set(config.EXECUTOR_CORES.key, "1")
+      .set(config.CPUS_PER_TASK.key, "1")
+      .set(config.EXECUTOR_MEMORY.key, "1024m")
       // Just replicate blocks as fast as we can during testing; there isn't another
       // workload we need to worry about.
       .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
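As a side note on what these settings buy us: with one core per executor and one CPU per task, each worker hosts exactly one executor and each executor runs one task at a time, so every block manager ends up holding blocks. A minimal sketch of the equivalent raw-key configuration (the standalone master URL and app name here are hypothetical, not part of this suite):

    import org.apache.spark.SparkConf

    // Raw-key equivalent of the typed config entries above. One core per
    // executor plus one CPU per task means each worker hosts a single
    // executor running a single task at a time.
    val conf = new SparkConf()
      .setMaster("spark://master:7077") // assumed standalone master URL
      .setAppName("decommission-sketch")
      .set("spark.executor.cores", "1")
      .set("spark.task.cpus", "1")
      .set("spark.executor.memory", "1024m")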
@@ -92,6 +98,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val executorRemovedSem = new Semaphore(0)
     val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
     val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
+    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
+    val execToDecommission = new AtomicReference[String](null)
     sc.addSparkListener(new SparkListener {

       override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
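The `AtomicReference` introduced here is what lets the listener thread and the main test thread race to pick an executor without locking: whichever side's `compareAndSet` observes `null` first wins, and every later attempt becomes a no-op. A self-contained sketch of that pattern in plain Scala (no Spark involved):

    import java.util.concurrent.atomic.AtomicReference

    // Several racing candidates; only the first compareAndSet(null, ...) wins.
    val chosen = new AtomicReference[String](null)
    val threads = Seq("exec-0", "exec-1", "exec-2").map { id =>
      new Thread(() => {
        if (chosen.compareAndSet(null, id)) {
          println(s"$id won the race")
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    assert(chosen.get() != null) // exactly one winner, the rest are no-ops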
@@ -107,6 +115,21 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       }

       override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+        if (blockUpdated.blockUpdatedInfo.blockId.isRDD && persist) {
+          // Persisted RDD blocks are a bit weirder than shuffle blocks: even though
+          // the tasks run on, say, executors (0, 1, 2), the RDD blocks might end up
+          // only on executors 0 and 1. So we cannot indiscriminately decommission
+          // any executor; we must decommission one that actually holds an RDD block.
+          // Shuffle blocks, by contrast, are present on all executors, so any
+          // executor can be decommissioned when `persist` is false.
+          val candidateExecToDecom = blockUpdated.blockUpdatedInfo.blockManagerId.executorId
+          if (execToDecommission.compareAndSet(null, candidateExecToDecom)) {
+            val decomContext = s"Decommissioning executor ${candidateExecToDecom} for persist"
+            logInfo(decomContext)
+            sched.decommissionExecutor(candidateExecToDecom,
+              ExecutorDecommissionInfo(decomContext, false))
+          }
+        }
         // Once broadcasts start landing on the executors we're good to proceed.
         // We don't rely on task start alone, since it can occur before the work is on the executor.
         if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
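The `isRDD`/`isBroadcast` dispatch above works because block IDs encode their type in their names. A rough illustration, assuming the usual `rdd_<rddId>_<split>` and `broadcast_<id>` name formats that `BlockId.apply` parses:

    import org.apache.spark.storage.BlockId

    // Block names encode their type; BlockId.apply parses them back.
    val rddBlock = BlockId("rdd_0_1")           // rddId = 0, split = 1
    val broadcastBlock = BlockId("broadcast_2") // broadcast variable 2
    assert(rddBlock.isRDD && !rddBlock.isBroadcast)
    assert(broadcastBlock.isBroadcast && !broadcastBlock.isRDD)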
@@ -139,14 +162,17 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       ThreadUtils.awaitResult(asyncCount, 15.seconds)
     }

-    // Decommission one of the executors.
-    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     val execs = sched.getExecutorIds()
     assert(execs.size == numExecs, s"Expected ${numExecs} executors but found ${execs.size}")

-    val execToDecommission = execs.head
-    logDebug(s"Decommissioning executor ${execToDecommission}")
-    sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))
+    if (!persist && shuffle && execToDecommission.compareAndSet(null, execs.head)) {
+      // For shuffle blocks we can decommission any executor, since shuffle
+      // blocks are present on every executor.
+      val decomContext = s"Decommissioning executor ${execToDecommission.get()}"
+      logInfo(decomContext)
+      sched.decommissionExecutor(execToDecommission.get(),
+        ExecutorDecommissionInfo(decomContext, false))
+    }

     // Wait for the job to finish.
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
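After this hunk there are two call sites that may decommission an executor: the RDD path inside `onBlockUpdated` and this shuffle path on the main thread. Both are guarded by the same `AtomicReference`, so at most one decommission request is ever issued. A hypothetical helper distilling that guard (the `tryDecommission` name is ours, not part of the patch):

    import java.util.concurrent.atomic.AtomicReference

    // Only the first caller to install a candidate triggers the callback;
    // every later caller sees a non-null reference and does nothing.
    def tryDecommission(ref: AtomicReference[String], candidate: String)
                       (decommission: String => Unit): Boolean = {
      val won = ref.compareAndSet(null, candidate)
      if (won) decommission(candidate)
      won
    }

    val ref = new AtomicReference[String](null)
    assert(tryDecommission(ref, "0")(id => println(s"decommissioning $id"))) // wins
    assert(!tryDecommission(ref, "1")(_ => ()))                              // no-op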
@@ -206,15 +232,15 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val execIdToBlocksMapping = storageStatus.map(
       status => (status.blockManagerId.executorId, status.blocks)).toMap
     // No cached blocks should be present on the executor that was decommissioned
-    assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(),
+    assert(execIdToBlocksMapping(execToDecommission.get()).keys.filter(_.isRDD).toSeq === Seq(),
       "Cache blocks should be migrated")
     if (persist) {
       // All of the RDD blocks should still be cached somewhere
       assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts)
     }

     // Make the executor we decommissioned exit
-    sched.client.killExecutors(List(execToDecommission))
+    sched.client.killExecutors(List(execToDecommission.get()))

     // Wait for the executor to be removed
     executorRemovedSem.acquire(1)
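To make the final assertions concrete, here is a plain-Scala sketch of the shape of the check with hypothetical data: after migration, the decommissioned executor holds no `rdd_*` blocks while the total RDD block count across executors still equals `numParts`:

    // Hypothetical per-executor block layout after migration, with executor
    // "0" decommissioned and numParts = 3.
    val execIdToBlocks: Map[String, Set[String]] = Map(
      "0" -> Set("broadcast_0"),            // RDD blocks migrated away
      "1" -> Set("rdd_0_0", "rdd_0_2"),
      "2" -> Set("rdd_0_1"))
    val decommissioned = "0"
    assert(execIdToBlocks(decommissioned).forall(!_.startsWith("rdd_")))
    assert(execIdToBlocks.values.flatten.count(_.startsWith("rdd_")) == 3)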