@@ -57,6 +57,9 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
5757 .set(config.STORAGE_DECOMMISSION_ENABLED , true )
5858 .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED , persist)
5959 .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED , shuffle)
60+ .set(config.EXECUTOR_CORES .key, " 1" )
61+ .set(config.CPUS_PER_TASK .key, " 1" )
62+ .set(config.EXECUTOR_MEMORY .key, " 1024m" )
6063 // Just replicate blocks as fast as we can during testing, there isn't another
6164 // workload we need to worry about.
6265 .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL , 1L )
@@ -90,6 +93,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
9093 val taskStartSem = new Semaphore (0 )
9194 val broadcastSem = new Semaphore (0 )
9295 val executorRemovedSem = new Semaphore (0 )
96+ val taskStartEvents = ArrayBuffer .empty[SparkListenerTaskStart ]
9397 val taskEndEvents = ArrayBuffer .empty[SparkListenerTaskEnd ]
9498 val blocksUpdated = ArrayBuffer .empty[SparkListenerBlockUpdated ]
9599 sc.addSparkListener(new SparkListener {
@@ -99,6 +103,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
99103 }
100104
101105 override def onTaskStart (taskStart : SparkListenerTaskStart ): Unit = {
106+ taskStartEvents.append(taskStart)
102107 taskStartSem.release()
103108 }
104109
@@ -145,8 +150,10 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
145150 assert(execs.size == numExecs, s " Expected ${numExecs} executors but found ${execs.size}" )
146151
147152 val execToDecommission = execs.head
148- logDebug(s " Decommissioning executor ${execToDecommission}" )
149- sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo (" " , false ))
153+ val withTasks = taskStartEvents.map(_.taskInfo.executorId).toSet
154+ val msg = s " Decommissioning executor ${execToDecommission} of $execs $withTasks"
155+ logInfo(msg)
156+ sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo (msg, false ))
150157
151158 // Wait for job to finish.
152159 val asyncCountResult = ThreadUtils .awaitResult(asyncCount, 15 .seconds)
@@ -178,7 +185,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
178185 val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
179186 assert(! blocksToManagers.filter(_._2 > 1 ).isEmpty,
180187 s " We should have a block that has been on multiple BMs in rdds: \n ${rddUpdates} from: \n " +
181- s " ${blocksUpdated}\n but instead we got: \n ${blocksToManagers}" )
188+ s " ${blocksUpdated}\n but instead we got: \n ${blocksToManagers}\n $msg " )
182189 }
183190 // If we're migrating shuffles we look for any shuffle block updates
184191 // as there is no block update on the initial shuffle block write.
@@ -191,11 +198,13 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
191198 val blockId = update.blockUpdatedInfo.blockId
192199 blockId.isInstanceOf [ShuffleIndexBlockId ]
193200 }.size
194- assert(numDataLocs === 1 , s " Expect shuffle data block updates in ${blocksUpdated}" )
195- assert(numIndexLocs === 1 , s " Expect shuffle index block updates in ${blocksUpdated}" )
201+ assert(numDataLocs === 1 , s " Expect shuffle data block updates in ${blocksUpdated}\n $msg " )
202+ assert(numIndexLocs === 1 , s " Expect shuffle index block updates in ${blocksUpdated}\n $msg " )
196203 }
197204 }
198205
206+ logInfo(s " Blocks updated is $blocksUpdated" )
207+
199208 // Since the RDD is cached or shuffled so further usage of same RDD should use the
200209 // cached data. Original RDD partitions should not be recomputed i.e. accum
201210 // should have same value like before
0 commit comments