1818package org .apache .spark .storage
1919
2020import java .util .concurrent .Semaphore
21+ import java .util .concurrent .atomic .AtomicBoolean
2122
2223import scala .collection .mutable .ArrayBuffer
2324import scala .concurrent .duration ._
24-
2525import org .scalatest .concurrent .Eventually
26-
2726import org .apache .spark ._
2827import org .apache .spark .internal .config
2928import org .apache .spark .scheduler ._
@@ -98,6 +97,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
9897 val taskStartEvents = ArrayBuffer .empty[SparkListenerTaskStart ]
9998 val taskEndEvents = ArrayBuffer .empty[SparkListenerTaskEnd ]
10099 val blocksUpdated = ArrayBuffer .empty[SparkListenerBlockUpdated ]
100+ val sched = sc.schedulerBackend.asInstanceOf [StandaloneSchedulerBackend ]
101+ val decommissionedForPersist = new AtomicBoolean (false )
101102 sc.addSparkListener(new SparkListener {
102103
103104 override def onExecutorRemoved (execRemoved : SparkListenerExecutorRemoved ): Unit = {
@@ -114,6 +115,14 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
114115 }
115116
116117 override def onBlockUpdated (blockUpdated : SparkListenerBlockUpdated ): Unit = {
118+ if (blockUpdated.blockUpdatedInfo.blockId.isRDD && persist &&
119+ decommissionedForPersist.compareAndSet(false , true )) {
120+ val execToDecommission = blockUpdated.blockUpdatedInfo.blockManagerId.executorId
121+ val decomContext = s " Decommissioning executor ${execToDecommission} for persist "
122+ logInfo(decomContext)
123+ sched.decommissionExecutor(execToDecommission,
124+ ExecutorDecommissionInfo (decomContext, false ))
125+ }
117126 // Once broadcasts start landing on the executors we're good to proceed.
118127 // We don't only use task start as it can occur before the work is on the executor.
119128 if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
@@ -123,7 +132,6 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
123132 }
124133 })
125134
126-
127135 // Cache the RDD lazily
128136 if (persist) {
129137 testRdd.persist()
@@ -146,17 +154,16 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
146154 ThreadUtils .awaitResult(asyncCount, 15 .seconds)
147155 }
148156
149- // Decommission one of the executors.
150- val sched = sc.schedulerBackend.asInstanceOf [StandaloneSchedulerBackend ]
151157 val execs = sched.getExecutorIds()
152158 assert(execs.size == numExecs, s " Expected ${numExecs} executors but found ${execs.size}" )
153159
160+ // Decommission one of the executors.
154161 val execToDecommission = execs.head
155- val execsWithTasks = taskStartEvents.map(_.taskInfo.executorId).toSet
156- val decomContext = s " Decommissioning executor ${execToDecommission} of $execs " +
157- s " (with tasks: $execsWithTasks ) "
158- logInfo( decomContext)
159- sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo (decomContext, false ))
162+ if ( ! persist) {
163+ val decomContext = s " Decommissioning executor ${execToDecommission} for shuffle "
164+ logInfo(decomContext)
165+ sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo ( decomContext, false ) )
166+ }
160167
161168 // Wait for job to finish.
162169 val asyncCountResult = ThreadUtils .awaitResult(asyncCount, 15 .seconds)
@@ -188,7 +195,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
188195 val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
189196 assert(! blocksToManagers.filter(_._2 > 1 ).isEmpty,
190197 s " We should have a block that has been on multiple BMs in rdds: \n ${rddUpdates} from: \n " +
191- s " ${blocksUpdated}\n but instead we got: \n ${blocksToManagers}\n $decomContext " )
198+ s " ${blocksUpdated}\n but instead we got: \n ${blocksToManagers}" )
192199 }
193200 // If we're migrating shuffles we look for any shuffle block updates
194201 // as there is no block update on the initial shuffle block write.
@@ -202,9 +209,9 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
202209 blockId.isInstanceOf [ShuffleIndexBlockId ]
203210 }.size
204211 assert(numDataLocs === 1 ,
205- s " Expect shuffle data block updates in ${blocksUpdated}\n $decomContext " )
212+ s " Expect shuffle data block updates in ${blocksUpdated}" )
206213 assert(numIndexLocs === 1 ,
207- s " Expect shuffle index block updates in ${blocksUpdated}\n $decomContext " )
214+ s " Expect shuffle index block updates in ${blocksUpdated}" )
208215 }
209216 }
210217
0 commit comments