@@ -37,21 +37,34 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
   val numExecs = 3
   val numParts = 3
 
+  test(s"verify that an already running task which is going to cache data succeeds " +
+    s"on a decommissioned executor after task start") {
+    runDecomTest(true, false, true, 1)
+  }
+
+  test(s"verify that an already running task which is going to cache data succeeds " +
+    s"on a decommissioned executor after iterator start") {
+    runDecomTest(true, false, true, 2)
+  }
+
   test(s"verify that an already running task which is going to cache data succeeds " +
     s"on a decommissioned executor") {
-    runDecomTest(true, false, true)
+    runDecomTest(true, false, true, 0)
   }
 
   test(s"verify that shuffle blocks are migrated") {
-    runDecomTest(false, true, false)
+    runDecomTest(false, true, false, 0)
   }
 
   test(s"verify that both migrations can work at the same time.") {
-    runDecomTest(true, true, false)
+    runDecomTest(true, true, false, 0)
   }
 
-  private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean) = {
-
+  private def runDecomTest(
+      persist: Boolean,
+      shuffle: Boolean,
+      migrateDuring: Boolean,
+      afterStart: Int) = {
     val master = s"local-cluster[${numExecs}, 1, 1024]"
     val conf = new SparkConf().setAppName("test").setMaster(master)
       .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
@@ -70,16 +83,15 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       timeout = 60000) // 60s
 
     val input = sc.parallelize(1 to numParts, numParts)
-    val accum = sc.longAccumulator("mapperRunAccumulator")
-    input.count()
+    val accum = sc.collectionAccumulator[String]("mapperRunAccumulator")
 
     // Create a new RDD where we have sleep in each partition, we are also increasing
     // the value of accumulator in each partition
     val baseRdd = input.mapPartitions { x =>
+      accum.add(SparkEnv.get.executorId)
       if (migrateDuring) {
         Thread.sleep(1000)
       }
-      accum.add(1)
       x.map(y => (y, y))
     }
     val testRdd = shuffle match {
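Aside, not part of the patch: the hunk above replaces the numeric `longAccumulator` with a `collectionAccumulator[String]` that records the executor ID of each mapper as soon as it starts iterating, giving the driver a concrete executor it can later decommission. A minimal, self-contained sketch of that accumulator behaviour (all names here are illustrative, not from the suite):

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

object CollectionAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("accum-sketch"))
    // Collects one entry per partition that ran, instead of bumping a single counter.
    val ranOn = sc.collectionAccumulator[String]("ranOn")
    sc.parallelize(1 to 3, 3).foreach(_ => ranOn.add(SparkEnv.get.executorId))
    // Driver side the value is a java.util.List[String], hence the size() comparison
    // the patch uses in place of the old numeric equality.
    assert(ranOn.value.size() == 3)
    sc.stop()
  }
}
```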
@@ -90,16 +102,25 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Listen for the job & block updates
     val executorRemovedSem = new Semaphore(0)
     val taskEndEvents = new ConcurrentLinkedQueue[SparkListenerTaskEnd]()
+    val taskStartEvents = new ConcurrentLinkedQueue[SparkListenerTaskStart]()
     val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
 
-    def getExecutorIdOfAnySuccessfulTask(): Option[String] =
-      taskEndEvents.asScala.filter(_.taskInfo.successful).map(_.taskInfo.executorId).headOption
+    def getCandidateExecutorToDecom: Option[String] = afterStart match {
+      case 1 => taskStartEvents.asScala.map(_.taskInfo.executorId).headOption
+      case 2 => accum.value.asScala.headOption
+      case _ =>
+        taskEndEvents.asScala.filter(_.taskInfo.successful).map(_.taskInfo.executorId).headOption
+    }
 
     sc.addSparkListener(new SparkListener {
       override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
         executorRemovedSem.release()
       }
 
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        taskStartEvents.add(taskStart)
+      }
+
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
         taskEndEvents.add(taskEnd)
       }
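Aside, not part of the patch: `getCandidateExecutorToDecom` above chooses the decommission target per mode. `afterStart == 1` takes any executor that has merely started a task, `afterStart == 2` takes one whose mapper has begun iterating (it has already reported itself via the accumulator), and any other value keeps the original behaviour of waiting for a fully successful task. A plain-Scala sketch of that selection over hypothetical, already-collected executor-ID sequences:

```scala
// Hypothetical stand-ins for the three event sources the suite consults.
object CandidateSelectionSketch {
  def pick(
      afterStart: Int,
      startedOn: Seq[String],    // executor IDs seen in onTaskStart events
      iteratingOn: Seq[String],  // executor IDs recorded by the accumulator
      succeededOn: Seq[String]): Option[String] = afterStart match {
    case 1 => startedOn.headOption    // decommission as soon as a task is launched
    case 2 => iteratingOn.headOption  // decommission once a mapper is iterating
    case _ => succeededOn.headOption  // original behaviour: wait for a successful task
  }

  def main(args: Array[String]): Unit = {
    assert(pick(1, Seq("0"), Nil, Nil).contains("0"))
    assert(pick(0, Seq("0"), Seq("1"), Nil).isEmpty) // no task has succeeded yet
  }
}
```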
@@ -109,7 +130,6 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       }
     })
 
-
     // Cache the RDD lazily
     if (persist) {
       testRdd.persist()
@@ -123,8 +143,13 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       // Wait for one of the tasks to succeed and finish writing its blocks.
       // This way we know that this executor had real data to migrate when it is subsequently
       // decommissioned below.
-      eventually(timeout(3.seconds), interval(10.milliseconds)) {
-        assert(getExecutorIdOfAnySuccessfulTask().isDefined)
+      val intervalMs = if (afterStart == 0) {
+        10.milliseconds
+      } else {
+        1.milliseconds
+      }
+      eventually(timeout(6.seconds), interval(intervalMs)) {
+        assert(getCandidateExecutorToDecom.isDefined)
       }
     } else {
       ThreadUtils.awaitResult(asyncCount, 15.seconds)
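Aside, not part of the patch: the hunk above widens the `eventually` timeout to 6 seconds and, for the two new modes, tightens polling to 1 ms so the decommission fires as soon as possible after the task or iterator has started. A standalone ScalaTest sketch of that polling pattern, with an illustrative flag standing in for the suite's condition:

```scala
import java.util.concurrent.atomic.AtomicBoolean
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallySketch {
  def main(args: Array[String]): Unit = {
    val taskStarted = new AtomicBoolean(false)
    new Thread(() => { Thread.sleep(100); taskStarted.set(true) }).start()
    // Re-evaluates the block every 1 ms until it passes, failing after 6 seconds.
    eventually(timeout(6.seconds), interval(1.milliseconds)) {
      assert(taskStarted.get())
    }
  }
}
```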
@@ -133,18 +158,15 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Decommission one of the executors.
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
 
-    // When `migrateDuring=true`, we have already waited for a successful task.
-    // When `migrateDuring=false`, then we know the job is finished, so there
-    // must be a successful executor. Thus the '.get' should succeed in either case.
-    val execToDecommission = getExecutorIdOfAnySuccessfulTask().get
+    val execToDecommission = getCandidateExecutorToDecom.get
     logInfo(s"Decommissioning executor ${execToDecommission}")
     sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))
 
     // Wait for job to finish.
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
     assert(asyncCountResult === numParts)
     // All tasks finished, so accum should have been increased numParts times.
-    assert(accum.value === numParts)
+    assert(accum.value.size() === numParts)
 
     sc.listenerBus.waitUntilEmpty()
     if (shuffle) {
@@ -192,7 +214,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       // cached data. Original RDD partitions should not be recomputed i.e. accum
       // should have same value like before
       assert(testRdd.count() === numParts)
-      assert(accum.value === numParts)
+      assert(accum.value.size() === numParts)
 
       val storageStatus = sc.env.blockManager.master.getStorageStatus
       val execIdToBlocksMapping = storageStatus.map(
@@ -215,7 +237,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // cached data. Original RDD partitions should not be recomputed i.e. accum
     // should have same value like before
     assert(testRdd.count() === numParts)
-    assert(accum.value === numParts)
+    assert(accum.value.size() === numParts)
 
   }
 }