@@ -36,35 +36,33 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
3636
3737 val numExecs = 3
3838 val numParts = 3
// Names of the points in a job at which a test triggers the executor decommission.
// Exactly one of them is passed to runDecomTest as `whenToDecom`; passing JobEnded
// makes runDecomTest compute migrateDuring = false.
39+ val TaskStarted = " TASK_STARTED"
40+ val TaskEnded = " TASK_ENDED"
41+ val JobEnded = " JOB_ENDED"
3942
// Runs runDecomTest with persist = true, shuffle = false and whenToDecom = TaskStarted.
// The removed call used the old (persist, shuffle, migrateDuring, afterStart) signature.
4043 test(s " verify that an already running task which is going to cache data succeeds " +
4144 s " on a decommissioned executor after task start " ) {
42- runDecomTest(true , false , true , 1 )
45+ runDecomTest(true , false , TaskStarted )
4346 }
4447
// Runs runDecomTest with persist = true, shuffle = false and whenToDecom = TaskEnded.
// This single test replaces the two removed variants below ("after iterator start",
// afterStart = 2, and the unqualified one, afterStart = 0).
4548 test(s " verify that an already running task which is going to cache data succeeds " +
46- s " on a decommissioned executor after iterator start " ) {
47- runDecomTest(true , false , true , 2 )
48- }
49-
50- test(s " verify that an already running task which is going to cache data succeeds " +
51- s " on a decommissioned executor " ) {
52- runDecomTest(true , false , true , 0 )
49+ s " on a decommissioned executor after one task ends but before job ends " ) {
50+ runDecomTest(true , false , TaskEnded )
5351 }
5452
// Runs runDecomTest with persist = false, shuffle = true and whenToDecom = JobEnded,
// which runDecomTest translates into migrateDuring = false (matching the removed call).
5553 test(s " verify that shuffle blocks are migrated " ) {
56- runDecomTest(false , true , false , 0 )
54+ runDecomTest(false , true , JobEnded )
5755 }
5856
// Runs runDecomTest with persist = true, shuffle = true and whenToDecom = JobEnded
// (so migrateDuring = false). The test title also loses its trailing period.
59- test(s " verify that both migrations can work at the same time. " ) {
60- runDecomTest(true , true , false , 0 )
57+ test(s " verify that both migrations can work at the same time " ) {
58+ runDecomTest(true , true , JobEnded )
6159 }
6260
6361 private def runDecomTest (
6462 persist : Boolean ,
6563 shuffle : Boolean ,
66- migrateDuring : Boolean ,
67- afterStart : Int ) = {
64+ whenToDecom : String ) : Unit = {
65+ val migrateDuring = whenToDecom != JobEnded
6866 val master = s " local-cluster[ ${numExecs}, 1, 1024] "
6967 val conf = new SparkConf ().setAppName(" test" ).setMaster(master)
7068 .set(config.Worker .WORKER_DECOMMISSION_ENABLED , true )
@@ -102,25 +100,19 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
102100 // Listen for the job & block updates
103101 val executorRemovedSem = new Semaphore (0 )
104102 val taskEndEvents = new ConcurrentLinkedQueue [SparkListenerTaskEnd ]()
105- val taskStartEvents = new ConcurrentLinkedQueue [SparkListenerTaskStart ]()
106103 val blocksUpdated = ArrayBuffer .empty[SparkListenerBlockUpdated ]
107104
// Returns the id of the executor to decommission, or None if no candidate exists yet.
// New version: for whenToDecom == TaskStarted the candidate is the first element of
// accum.value (accum is defined outside this hunk; presumably tasks record their
// executor id into it -- TODO confirm); otherwise it is the executor of the first
// successful task among the SparkListenerTaskEnd events collected in taskEndEvents.
// The removed version dispatched on the integer afterStart and, for afterStart == 1,
// read the executor from the task-start events instead.
108- def getCandidateExecutorToDecom : Option [String ] = afterStart match {
109- case 1 => taskStartEvents.asScala.map(_.taskInfo.executorId).headOption
110- case 2 => accum.value.asScala.headOption
111- case _ =>
112- taskEndEvents.asScala.filter(_.taskInfo.successful).map(_.taskInfo.executorId).headOption
105+ def getCandidateExecutorToDecom : Option [String ] = if (whenToDecom == TaskStarted ) {
106+ accum.value.asScala.headOption
107+ } else {
108+ taskEndEvents.asScala.filter(_.taskInfo.successful).map(_.taskInfo.executorId).headOption
113109 }
114110
115111 sc.addSparkListener(new SparkListener {
116112 override def onExecutorRemoved (execRemoved : SparkListenerExecutorRemoved ): Unit = {
117113 executorRemovedSem.release()
118114 }
119115
120- override def onTaskStart (taskStart : SparkListenerTaskStart ): Unit = {
121- taskStartEvents.add(taskStart)
122- }
123-
124116 override def onTaskEnd (taskEnd : SparkListenerTaskEnd ): Unit = {
125117 taskEndEvents.add(taskEnd)
126118 }
@@ -143,10 +135,10 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
143135 // Wait for one of the tasks to succeed and finish writing its blocks.
144136 // This way we know that this executor had real data to migrate when it is subsequently
145137 // decommissioned below.
146- val intervalMs = if (afterStart == 0 ) {
147- 10 .milliseconds
148- } else {
138+ val intervalMs = if (whenToDecom == TaskStarted ) {
149139 1 .milliseconds
140+ } else {
141+ 10 .milliseconds
150142 }
151143 eventually(timeout(6 .seconds), interval(intervalMs)) {
152144 assert(getCandidateExecutorToDecom.isDefined)
@@ -160,7 +152,9 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
160152
161153 val execToDecommission = getCandidateExecutorToDecom.get
162154 logInfo(s " Decommissioning executor ${execToDecommission}" )
163- sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo (" " , false ))
155+ sched.decommissionExecutor(
156+ execToDecommission,
157+ ExecutorDecommissionInfo (" " , isHostDecommissioned = false ))
164158
165159 // Wait for job to finish.
166160 val asyncCountResult = ThreadUtils .awaitResult(asyncCount, 15 .seconds)
@@ -238,6 +232,5 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
238232 // should have the same value as before
239233 assert(testRdd.count() === numParts)
240234 assert(accum.value.size() === numParts)
241-
242235 }
243236}
0 commit comments