@@ -175,31 +175,66 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000

-  val submittedStageInfos = new HashSet[StageInfo]
-  val successfulStages = new HashSet[Int]
-  val failedStages = new ArrayBuffer[Int]
-  val stageByOrderOfExecution = new ArrayBuffer[Int]
-  val endedTasks = new HashSet[Long]
-  val sparkListener = new SparkListener() {
+  /**
+   * A listener that records event information for verification in unit tests. Each
+   * getter-style method on this class drains the listener bus before returning, and returns
+   * an immutable copy of the recorded value, so a race with in-flight events cannot produce
+   * odd results.
+   */
+  class EventInfoRecordingListener extends SparkListener {
+    private val _submittedStageInfos = new HashSet[StageInfo]
+    private val _successfulStages = new HashSet[Int]
+    private val _failedStages = new ArrayBuffer[Int]
+    private val _stageByOrderOfExecution = new ArrayBuffer[Int]
+    private val _endedTasks = new HashSet[Long]
+
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
-      submittedStageInfos += stageSubmitted.stageInfo
+      _submittedStageInfos += stageSubmitted.stageInfo
     }

     override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
       val stageInfo = stageCompleted.stageInfo
-      stageByOrderOfExecution += stageInfo.stageId
+      _stageByOrderOfExecution += stageInfo.stageId
       if (stageInfo.failureReason.isEmpty) {
-        successfulStages += stageInfo.stageId
+        _successfulStages += stageInfo.stageId
       } else {
-        failedStages += stageInfo.stageId
+        _failedStages += stageInfo.stageId
       }
     }

     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-      endedTasks += taskEnd.taskInfo.taskId
+      _endedTasks += taskEnd.taskInfo.taskId
+    }
+
+    def submittedStageInfos: Set[StageInfo] = {
+      waitForListeners()
+      _submittedStageInfos.toSet
+    }
+
+    def successfulStages: Set[Int] = {
+      waitForListeners()
+      _successfulStages.toSet
+    }
+
+    def failedStages: List[Int] = {
+      waitForListeners()
+      _failedStages.toList
+    }
+
+    def stageByOrderOfExecution: List[Int] = {
+      waitForListeners()
+      _stageByOrderOfExecution.toList
+    }
+
+    def endedTasks: Set[Long] = {
+      waitForListeners()
+      _endedTasks.toSet
     }
+
+    private def waitForListeners(): Unit = sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
   }

+  var sparkListener: EventInfoRecordingListener = null
+
   var mapOutputTracker: MapOutputTrackerMaster = null
   var broadcastManager: BroadcastManager = null
   var securityMgr: SecurityManager = null
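The heart of the new EventInfoRecordingListener is a drain-then-snapshot getter: block until the asynchronous listener bus has delivered every pending event, then return an immutable copy, so a test can never observe a half-updated collection. Below is a minimal, self-contained sketch of the same pattern in plain Scala; SimpleBus, Recorder, and Demo are hypothetical stand-ins written for this note, not Spark's actual LiveListenerBus API, though waitUntilEmpty plays the same role in both.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Toy asynchronous event bus; a stand-in for Spark's LiveListenerBus.
class SimpleBus {
  private val queue = new LinkedBlockingQueue[Int]()
  private val pending = new AtomicInteger(0)  // posted but not yet handled
  @volatile private var handler: Int => Unit = _ => ()

  private val worker = new Thread(() =>
    while (true) {
      handler(queue.take())
      pending.decrementAndGet()
    })
  worker.setDaemon(true)
  worker.start()

  def register(h: Int => Unit): Unit = handler = h

  def post(event: Int): Unit = {
    pending.incrementAndGet()
    queue.put(event)
  }

  // Blocks until every posted event has been fully handled.
  def waitUntilEmpty(timeoutMillis: Long): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMillis
    while (pending.get() != 0) {
      require(System.currentTimeMillis() < deadline, "timed out draining events")
      Thread.sleep(1)
    }
  }
}

// Drain-then-snapshot recorder, mirroring EventInfoRecordingListener's getters.
class Recorder(bus: SimpleBus) {
  private val _seen = mutable.ArrayBuffer.empty[Int]
  bus.register(e => _seen.synchronized { _seen += e })

  def seen: List[Int] = {
    bus.waitUntilEmpty(10000)            // 1) drain: no event still in flight
    _seen.synchronized { _seen.toList }  // 2) snapshot: immutable, safe to assert on
  }
}

object Demo extends App {
  val bus = new SimpleBus
  val rec = new Recorder(bus)
  (1 to 5).foreach(bus.post)
  // No explicit waitUntilEmpty at the call site; the getter performs it.
  assert(rec.seen == List(1, 2, 3, 4, 5))
}
```

Every call-site edit in the rest of the diff falls out of this design: the explicit sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) calls disappear because the getters perform the wait, and the assertions read from immutable snapshots instead of live mutable collections.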
@@ -248,10 +283,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits

   private def init(testConf: SparkConf): Unit = {
     sc = new SparkContext("local[2]", "DAGSchedulerSuite", testConf)
-    submittedStageInfos.clear()
-    successfulStages.clear()
-    failedStages.clear()
-    endedTasks.clear()
+    sparkListener = new EventInfoRecordingListener
     failure = null
     sc.addSparkListener(sparkListener)
     taskSets.clear()
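A side benefit of the init() change above: the old code reset the shared collections field by field and forgot one (stageByOrderOfExecution, which the SPARK-3353 test in the next hunk had to clear by hand), whereas allocating a fresh listener per test resets everything by construction. A small hypothetical sketch of the two reset styles, using a stand-in RecordedState class invented for this note:

```scala
import scala.collection.mutable

// Hypothetical stand-in for the listener's recording collections.
class RecordedState {
  val failedStages = mutable.ArrayBuffer.empty[Int]
  val endedTasks = mutable.HashSet.empty[Long]
}

object ResetStyles extends App {
  // Old style: one shared instance, reset field by field. Every new
  // collection needs its own clear() call, and a forgotten call silently
  // leaks state into the next test.
  val shared = new RecordedState
  shared.failedStages += 7
  shared.failedStages.clear()
  shared.endedTasks.clear()
  assert(shared.failedStages.isEmpty)

  // New style: allocate a fresh instance per test, as init() now does with
  // EventInfoRecordingListener. Every field starts empty by construction,
  // so there is no per-field bookkeeping to forget.
  var listener = new RecordedState
  listener = new RecordedState
  assert(listener.failedStages.isEmpty && listener.endedTasks.isEmpty)
}
```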
@@ -374,9 +406,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
   }

   test("[SPARK-3353] parent stage should have lower stage id") {
-    stageByOrderOfExecution.clear()
     sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    val stageByOrderOfExecution = sparkListener.stageByOrderOfExecution
     assert(stageByOrderOfExecution.length === 2)
     assert(stageByOrderOfExecution(0) < stageByOrderOfExecution(1))
   }
@@ -619,19 +650,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     submit(unserializableRdd, Array(0))
     assert(failure.getMessage.startsWith(
       "Job aborted due to stage failure: Task not serializable:"))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.contains(0))
-    assert(failedStages.size === 1)
+    assert(sparkListener.failedStages === Seq(0))
     assertDataStructuresEmpty()
   }

   test("trivial job failure") {
     submit(new MyRDD(sc, 1, Nil), Array(0))
     failed(taskSets(0), "some failure")
     assert(failure.getMessage === "Job aborted due to stage failure: some failure")
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.contains(0))
-    assert(failedStages.size === 1)
+    assert(sparkListener.failedStages === Seq(0))
     assertDataStructuresEmpty()
   }

@@ -640,9 +667,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     val jobId = submit(rdd, Array(0))
     cancel(jobId)
     assert(failure.getMessage === s"Job $jobId cancelled ")
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.contains(0))
-    assert(failedStages.size === 1)
+    assert(sparkListener.failedStages === Seq(0))
     assertDataStructuresEmpty()
   }

@@ -700,9 +725,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty()

-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.isEmpty)
-    assert(successfulStages.contains(0))
+    assert(sparkListener.failedStages.isEmpty)
+    assert(sparkListener.successfulStages.contains(0))
   }

   test("run trivial shuffle") {
@@ -1115,17 +1139,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
       taskSets(1).tasks(0),
       FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
       null))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.contains(1))
+    assert(sparkListener.failedStages.contains(1))

     // The second ResultTask fails, with a fetch failure for the output from the second mapper.
     runEvent(makeCompletionEvent(
       taskSets(1).tasks(0),
       FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
       null))
     // The SparkListener should not receive redundant failure events.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.size == 1)
+    assert(sparkListener.failedStages.size === 1)
   }

   test("Retry all the tasks on a resubmitted attempt of a barrier stage caused by FetchFailure") {
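A small change rides along in the hunk above: the assertion flips from == to ScalaTest's ===, matching the suite's prevailing style. Unlike Predef.assert with plain ==, === records both operand values for the failure report. A minimal sketch assuming a recent ScalaTest; the class and test names here are hypothetical:

```scala
import org.scalatest.exceptions.TestFailedException
import org.scalatest.funsuite.AnyFunSuite

class TripleEqualsDemo extends AnyFunSuite {
  test("=== reports both operands when an assertion fails") {
    val failedStages = Seq(0, 1)
    val e = intercept[TestFailedException] {
      assert(failedStages.size === 1)   // fails: size is 2
    }
    // === captures the operand values, so the report reads "2 did not equal 1"
    // instead of the bare "assertion failed" that Predef.assert(x == y) gives.
    assert(e.getMessage.contains("2 did not equal 1"))
  }
}
```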
@@ -1172,7 +1194,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
       taskSets(0).tasks(1),
       TaskKilled("test"),
       null))
-    assert(failedStages === Seq(0))
+    assert(sparkListener.failedStages === Seq(0))
     assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq(0, 1)))

     scheduler.resubmitFailedStages()
@@ -1226,11 +1248,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits

     val mapStageId = 0
     def countSubmittedMapStageAttempts(): Int = {
-      submittedStageInfos.count(_.stageId == mapStageId)
+      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
     }

     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)

     complete(taskSets(0), Seq(
@@ -1247,12 +1268,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
       taskSets(1).tasks(0),
       FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
       null))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.contains(1))
+    assert(sparkListener.failedStages.contains(1))

     // Trigger resubmission of the failed map stage.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

     // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
     assert(countSubmittedMapStageAttempts() === 2)
@@ -1269,7 +1288,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     // shouldn't affect anything -- our calling it just makes *SURE* it gets called between the
     // desired event and our check.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 2)

   }
@@ -1287,14 +1305,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     submit(reduceRdd, Array(0, 1))

     def countSubmittedReduceStageAttempts(): Int = {
-      submittedStageInfos.count(_.stageId == 1)
+      sparkListener.submittedStageInfos.count(_.stageId == 1)
     }
     def countSubmittedMapStageAttempts(): Int = {
-      submittedStageInfos.count(_.stageId == 0)
+      sparkListener.submittedStageInfos.count(_.stageId == 0)
     }

     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)

     // Complete the map stage.
@@ -1303,7 +1320,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
       (Success, makeMapStatus("hostB", 2))))

     // The reduce stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedReduceStageAttempts() === 1)

     // The first result task fails, with a fetch failure for the output from the first mapper.
@@ -1318,7 +1334,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits

     // Because the map stage finished, another attempt for the reduce stage should have been
     // submitted, resulting in 2 total attempts for each of the map and reduce stages.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 2)
     assert(countSubmittedReduceStageAttempts() === 2)

@@ -1348,10 +1363,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(1), Success, 42,
       Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     // verify stage exists
     assert(scheduler.stageIdToStage.contains(0))
-    assert(endedTasks.size == 2)
+    assert(sparkListener.endedTasks.size === 2)

     // finish other 2 tasks
     runEvent(makeCompletionEvent(
@@ -1360,8 +1374,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), Success, 42,
       Seq.empty, Array.empty, createFakeTaskInfoWithId(3)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(endedTasks.size == 4)
+    assert(sparkListener.endedTasks.size === 4)

     // verify the stage is done
     assert(!scheduler.stageIdToStage.contains(0))
@@ -1371,15 +1384,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), Success, 42,
       Seq.empty, Array.empty, createFakeTaskInfoWithId(5)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(endedTasks.size == 5)
+    assert(sparkListener.endedTasks.size === 5)

     // make sure non-successful tasks also send out an event
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), UnknownReason, 42,
       Seq.empty, Array.empty, createFakeTaskInfoWithId(6)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(endedTasks.size == 6)
+    assert(sparkListener.endedTasks.size === 6)
   }

   test("ignore late map task completions") {
@@ -1452,8 +1463,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits

     // Listener bus should get told about the map stage failing, but not the reduce stage
     // (since the reduce stage hasn't been started yet).
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.toSet === Set(0))
+    assert(sparkListener.failedStages.toSet === Set(0))

     assertDataStructuresEmpty()
   }
@@ -1696,9 +1706,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     assert(cancelledStages.toSet === Set(0, 2))

     // Make sure the listeners got told about both failed stages.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(successfulStages.isEmpty)
-    assert(failedStages.toSet === Set(0, 2))
+    assert(sparkListener.successfulStages.isEmpty)
+    assert(sparkListener.failedStages.toSet === Set(0, 2))

     assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
     assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
@@ -2672,19 +2681,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits

     val mapStageId = 0
     def countSubmittedMapStageAttempts(): Int = {
-      submittedStageInfos.count(_.stageId == mapStageId)
+      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
     }

     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)

     // The first map task fails with TaskKilled.
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(0),
       TaskKilled("test"),
       null))
-    assert(failedStages === Seq(0))
+    assert(sparkListener.failedStages === Seq(0))

     // The second map task fails with TaskKilled.
     runEvent(makeCompletionEvent(
@@ -2694,7 +2702,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits

     // Trigger resubmission of the failed map stage.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

     // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
     assert(countSubmittedMapStageAttempts() === 2)
@@ -2708,23 +2715,21 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits

     val mapStageId = 0
     def countSubmittedMapStageAttempts(): Int = {
-      submittedStageInfos.count(_.stageId == mapStageId)
+      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
     }

     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)

     // The first map task fails with TaskKilled.
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(0),
       TaskKilled("test"),
       null))
-    assert(failedStages === Seq(0))
+    assert(sparkListener.failedStages === Seq(0))

     // Trigger resubmission of the failed map stage.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

     // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
     assert(countSubmittedMapStageAttempts() === 2)
@@ -2737,7 +2742,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits

     // The second map task failure doesn't trigger stage retry.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 2)
   }
