@@ -57,6 +57,9 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       .set(config.STORAGE_DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist)
       .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
+      .set(config.EXECUTOR_CORES.key, "1")
+      .set(config.CPUS_PER_TASK.key, "1")
+      .set(config.EXECUTOR_MEMORY.key, "1024m")
       // Just replicate blocks as fast as we can during testing, there isn't another
       // workload we need to worry about.
       .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
@@ -90,6 +93,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val taskStartSem = new Semaphore(0)
     val broadcastSem = new Semaphore(0)
     val executorRemovedSem = new Semaphore(0)
+    val taskStartEvents = ArrayBuffer.empty[SparkListenerTaskStart]
     val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
     val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
     sc.addSparkListener(new SparkListener {
@@ -99,6 +103,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       }
 
       override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        taskStartEvents.append(taskStart)
         taskStartSem.release()
       }
 
@@ -145,8 +150,11 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     assert(execs.size == numExecs, s"Expected ${numExecs} executors but found ${execs.size}")
 
     val execToDecommission = execs.head
-    logDebug(s"Decommissioning executor ${execToDecommission}")
-    sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))
+    val execsWithTasks = taskStartEvents.map(_.taskInfo.executorId).toSet
+    val decomContext = s"Decommissioning executor ${execToDecommission} of $execs " +
+      s"(with tasks: $execsWithTasks)"
+    logInfo(decomContext)
+    sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo(decomContext, false))
 
     // Wait for job to finish.
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
@@ -178,7 +186,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
       assert(!blocksToManagers.filter(_._2 > 1).isEmpty,
         s"We should have a block that has been on multiple BMs in rdds:\n ${rddUpdates} from:\n" +
-        s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}")
+        s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}\n$decomContext")
     }
     // If we're migrating shuffles we look for any shuffle block updates
     // as there is no block update on the initial shuffle block write.
@@ -191,8 +199,10 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
         val blockId = update.blockUpdatedInfo.blockId
         blockId.isInstanceOf[ShuffleIndexBlockId]
       }.size
-      assert(numDataLocs === 1, s"Expect shuffle data block updates in ${blocksUpdated}")
-      assert(numIndexLocs === 1, s"Expect shuffle index block updates in ${blocksUpdated}")
+      assert(numDataLocs === 1,
+        s"Expect shuffle data block updates in ${blocksUpdated}\n$decomContext")
+      assert(numIndexLocs === 1,
+        s"Expect shuffle index block updates in ${blocksUpdated}\n$decomContext")
     }
   }
 