@@ -18,6 +18,7 @@
 package org.apache.spark.storage

 import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicReference

 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -57,6 +58,11 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       .set(config.STORAGE_DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist)
       .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
+      // Force exactly one executor per worker such that all block managers
+      // get the shuffle and RDD blocks.
+      .set(config.EXECUTOR_CORES.key, "1")
+      .set(config.CPUS_PER_TASK.key, "1")
+      .set(config.EXECUTOR_MEMORY.key, "1024m")
       // Just replicate blocks as fast as we can during testing, there isn't another
       // workload we need to worry about.
       .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
@@ -90,15 +96,19 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val taskStartSem = new Semaphore(0)
     val broadcastSem = new Semaphore(0)
     val executorRemovedSem = new Semaphore(0)
+    val taskStartEvents = ArrayBuffer.empty[SparkListenerTaskStart]
     val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
     val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
+    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
+    val execToDecommission = new AtomicReference[String](null)
     sc.addSparkListener(new SparkListener {

       override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
         executorRemovedSem.release()
       }

       override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        taskStartEvents.append(taskStart)
         taskStartSem.release()
       }

@@ -107,6 +117,21 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       }

       override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+        if (blockUpdated.blockUpdatedInfo.blockId.isRDD && persist) {
+          // Persisted RDD blocks are a bit weirder than shuffle blocks: even though
+          // the tasks run on, say, executors (0, 1, 2), the RDD blocks might end up
+          // only on executors 0 and 1, so we cannot indiscriminately decommission
+          // any executor. Instead we must decommission one that actually has an RDD
+          // block. Fortunately, shuffle blocks are present on all executors, so any
+          // executor can be decommissioned when `persist` is false.
+          val candidateExecToDecom = blockUpdated.blockUpdatedInfo.blockManagerId.executorId
+          if (execToDecommission.compareAndSet(null, candidateExecToDecom)) {
+            val decomContext = s"Decommissioning executor ${candidateExecToDecom} for persist"
+            logInfo(decomContext)
+            sched.decommissionExecutor(candidateExecToDecom,
+              ExecutorDecommissionInfo(decomContext, false))
+          }
+        }
         // Once broadcast blocks start landing on the executors we're good to proceed.
         // We don't use only task start, as it can occur before the work is on the executor.
         if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
@@ -116,7 +141,6 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       }
     })

-
     // Cache the RDD lazily
     if (persist) {
       testRdd.persist()
@@ -139,14 +163,15 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       ThreadUtils.awaitResult(asyncCount, 15.seconds)
     }

-    // Decommission one of the executors.
-    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     val execs = sched.getExecutorIds()
     assert(execs.size == numExecs, s"Expected ${numExecs} executors but found ${execs.size}")

-    val execToDecommission = execs.head
-    logDebug(s"Decommissioning executor ${execToDecommission}")
-    sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))
+    if (!persist && execToDecommission.compareAndSet(null, execs.head)) {
+      val decomContext = s"Decommissioning executor ${execToDecommission.get()}"
+      logInfo(decomContext)
+      sched.decommissionExecutor(execToDecommission.get(),
+        ExecutorDecommissionInfo(decomContext, false))
+    }

     // Wait for job to finish.
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
@@ -191,8 +216,10 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
         val blockId = update.blockUpdatedInfo.blockId
         blockId.isInstanceOf[ShuffleIndexBlockId]
       }.size
-      assert(numDataLocs === 1, s"Expect shuffle data block updates in ${blocksUpdated}")
-      assert(numIndexLocs === 1, s"Expect shuffle index block updates in ${blocksUpdated}")
+      assert(numDataLocs === 1,
+        s"Expect shuffle data block updates in ${blocksUpdated}")
+      assert(numIndexLocs === 1,
+        s"Expect shuffle index block updates in ${blocksUpdated}")
     }
   }

@@ -206,15 +233,15 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val execIdToBlocksMapping = storageStatus.map(
       status => (status.blockManagerId.executorId, status.blocks)).toMap
     // No cached blocks should be present on the executor that was decommissioned
-    assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(),
+    assert(execIdToBlocksMapping(execToDecommission.get()).keys.filter(_.isRDD).toSeq === Seq(),
       "Cache blocks should be migrated")
     if (persist) {
       // There should still be all the RDD blocks cached
       assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts)
     }

     // Make the executor we decommissioned exit
-    sched.client.killExecutors(List(execToDecommission))
+    sched.client.killExecutors(List(execToDecommission.get()))

     // Wait for the executor to be removed
     executorRemovedSem.acquire(1)
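
As an aside for reviewers, the change hinges on two small concurrency patterns: a Semaphore that lets the listener thread signal the test thread that an event has landed, and an AtomicReference whose compareAndSet picks exactly one executor to decommission even when the listener thread (persist = true) and the main test thread (persist = false) race to choose one. Below is a minimal, self-contained Scala sketch of the combined pattern. The object name, the decommission stub, and the executor IDs are hypothetical stand-ins for the suite's sched.decommissionExecutor call, not code from this change.

import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicReference

object DecommissionOnceSketch {
  // Released by the "listener" thread, acquired by the "test" thread.
  private val blockLandedSem = new Semaphore(0)
  // First compareAndSet from null wins; later attempts are no-ops.
  private val execToDecommission = new AtomicReference[String](null)

  // Hypothetical stand-in for sched.decommissionExecutor(...).
  private def decommission(execId: String, reason: String): Unit =
    println(s"decommissioning executor $execId: $reason")

  def tryDecommissionOnce(candidate: String, reason: String): Boolean = {
    val won = execToDecommission.compareAndSet(null, candidate)
    if (won) decommission(candidate, reason)
    won
  }

  def main(args: Array[String]): Unit = {
    // Simulate a listener thread that sees an RDD block land on executor "1".
    val listener = new Thread(() => {
      tryDecommissionOnce("1", "holds an RDD block")
      blockLandedSem.release() // signal the test thread that the event arrived
    })
    listener.start()

    blockLandedSem.acquire() // wait for the event, like the suite's semaphores
    // This second attempt loses the CAS and does nothing.
    tryDecommissionOnce("0", "head of executor list")
    listener.join()
    println(s"decommissioned exactly one executor: ${execToDecommission.get()}")
  }
}

The CAS guard is what makes it safe for onBlockUpdated to fire on every block update: only the first RDD-block update triggers a decommission, and the main thread's fallback path only fires when no listener thread has already claimed the slot.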