diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index fb68f1b02b989..038f036c602c3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -162,32 +162,67 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
-  val sparkListener = new SparkListener() {
-    val submittedStageInfos = new HashSet[StageInfo]
-    val successfulStages = new HashSet[Int]
-    val failedStages = new ArrayBuffer[Int]
-    val stageByOrderOfExecution = new ArrayBuffer[Int]
-    val endedTasks = new HashSet[Long]
+
+  /**
+   * A listener that records event information to verify in unit tests. The getter methods in
+   * this class wait until the listener bus is empty before returning, and return immutable
+   * copies, so assertions never observe a value mid-update due to a race with event delivery.
+   */
+  class EventInfoRecordingListener extends SparkListener {
+    private val _submittedStageInfos = new HashSet[StageInfo]
+    private val _successfulStages = new HashSet[Int]
+    private val _failedStages = new ArrayBuffer[Int]
+    private val _stageByOrderOfExecution = new ArrayBuffer[Int]
+    private val _endedTasks = new HashSet[Long]
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
-      submittedStageInfos += stageSubmitted.stageInfo
+      _submittedStageInfos += stageSubmitted.stageInfo
     }
 
     override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
       val stageInfo = stageCompleted.stageInfo
-      stageByOrderOfExecution += stageInfo.stageId
+      _stageByOrderOfExecution += stageInfo.stageId
       if (stageInfo.failureReason.isEmpty) {
-        successfulStages += stageInfo.stageId
+        _successfulStages += stageInfo.stageId
       } else {
-        failedStages += stageInfo.stageId
+        _failedStages += stageInfo.stageId
       }
     }
 
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-      endedTasks += taskEnd.taskInfo.taskId
+      _endedTasks += taskEnd.taskInfo.taskId
+    }
+
+    def submittedStageInfos: Set[StageInfo] = {
+      waitForListeners()
+      _submittedStageInfos.toSet
+    }
+
+    def successfulStages: Set[Int] = {
+      waitForListeners()
+      _successfulStages.toSet
+    }
+
+    def failedStages: List[Int] = {
+      waitForListeners()
+      _failedStages.toList
+    }
+
+    def stageByOrderOfExecution: List[Int] = {
+      waitForListeners()
+      _stageByOrderOfExecution.toList
+    }
+
+    def endedTasks: Set[Long] = {
+      waitForListeners()
+      _endedTasks.toSet
     }
+
+    private def waitForListeners(): Unit = sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
   }
 
+  var sparkListener: EventInfoRecordingListener = null
+
   var mapOutputTracker: MapOutputTrackerMaster = null
   var broadcastManager: BroadcastManager = null
   var securityMgr: SecurityManager = null
@@ -236,10 +271,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
   private def init(testConf: SparkConf): Unit = {
     sc = new SparkContext("local[2]", "DAGSchedulerSuite", testConf)
-    sparkListener.submittedStageInfos.clear()
-    sparkListener.successfulStages.clear()
-    sparkListener.failedStages.clear()
-    sparkListener.endedTasks.clear()
+    sparkListener = new EventInfoRecordingListener
     failure = null
     sc.addSparkListener(sparkListener)
     taskSets.clear()
@@ -361,11 +393,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   }
 
   test("[SPARK-3353] parent stage should have lower stage id") {
-    sparkListener.stageByOrderOfExecution.clear()
     sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.stageByOrderOfExecution.length === 2)
-    assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
+    val stageByOrderOfExecution = sparkListener.stageByOrderOfExecution
+    assert(stageByOrderOfExecution.length === 2)
+    assert(stageByOrderOfExecution(0) < stageByOrderOfExecution(1))
   }
 
   /**
@@ -606,9 +637,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     submit(unserializableRdd, Array(0))
     assert(failure.getMessage.startsWith(
       "Job aborted due to stage failure: Task not serializable:"))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.contains(0))
-    assert(sparkListener.failedStages.size === 1)
+    assert(sparkListener.failedStages === Seq(0))
     assertDataStructuresEmpty()
   }
 
@@ -616,9 +645,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     submit(new MyRDD(sc, 1, Nil), Array(0))
     failed(taskSets(0), "some failure")
     assert(failure.getMessage === "Job aborted due to stage failure: some failure")
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.contains(0))
-    assert(sparkListener.failedStages.size === 1)
+    assert(sparkListener.failedStages === Seq(0))
     assertDataStructuresEmpty()
   }
 
@@ -627,9 +654,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     val jobId = submit(rdd, Array(0))
     cancel(jobId)
     assert(failure.getMessage === s"Job $jobId cancelled ")
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.contains(0))
-    assert(sparkListener.failedStages.size === 1)
+    assert(sparkListener.failedStages === Seq(0))
     assertDataStructuresEmpty()
   }
 
@@ -683,7 +708,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty()
 
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.failedStages.isEmpty)
     assert(sparkListener.successfulStages.contains(0))
   }
@@ -1068,7 +1092,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       taskSets(1).tasks(0),
       FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
       null))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.failedStages.contains(1))
 
     // The second ResultTask fails, with a fetch failure for the output from the second mapper.
@@ -1077,8 +1100,7 @@
       FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
       null))
     // The SparkListener should not receive redundant failure events.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.size == 1)
+    assert(sparkListener.failedStages.size === 1)
   }
 
   test("Retry all the tasks on a resubmitted attempt of a barrier stage caused by FetchFailure") {
@@ -1183,7 +1205,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
 
     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)
 
     complete(taskSets(0), Seq(
@@ -1200,12 +1221,10 @@
       taskSets(1).tasks(0),
       FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
       null))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.failedStages.contains(1))
 
     // Trigger resubmission of the failed map stage.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 
     // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
     assert(countSubmittedMapStageAttempts() === 2)
@@ -1222,7 +1241,6 @@
     // shouldn't effect anything -- our calling it just makes *SURE* it gets called between the
     // desired event and our check.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 2)
   }
 
@@ -1247,7 +1265,6 @@
     }
 
     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)
 
     // Complete the map stage.
@@ -1256,7 +1273,6 @@
       (Success, makeMapStatus("hostB", 2))))
 
     // The reduce stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedReduceStageAttempts() === 1)
 
     // The first result task fails, with a fetch failure for the output from the first mapper.
@@ -1271,7 +1287,6 @@
 
     // Because the map stage finished, another attempt for the reduce stage should have been
     // submitted, resulting in 2 total attempts for each the map and the reduce stage.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 2)
     assert(countSubmittedReduceStageAttempts() === 2)
 
@@ -1301,11 +1316,9 @@
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(1), Success, 42,
       Seq.empty, createFakeTaskInfoWithId(1)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     // verify stage exists
     assert(scheduler.stageIdToStage.contains(0))
-    assert(sparkListener.endedTasks.size == 2)
-
+    assert(sparkListener.endedTasks.size === 2)
     // finish other 2 tasks
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(2), Success, 42,
@@ -1313,8 +1326,7 @@
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), Success, 42,
       Seq.empty, createFakeTaskInfoWithId(3)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.endedTasks.size == 4)
+    assert(sparkListener.endedTasks.size === 4)
 
     // verify the stage is done
     assert(!scheduler.stageIdToStage.contains(0))
@@ -1324,14 +1336,12 @@
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), Success, 42,
       Seq.empty, createFakeTaskInfoWithId(5)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.endedTasks.size == 5)
 
     // make sure non successful tasks also send out event
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), UnknownReason, 42,
       Seq.empty, createFakeTaskInfoWithId(6)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.endedTasks.size == 6)
   }
 
@@ -1405,7 +1415,6 @@
 
     // Listener bus should get told about the map stage failing, but not the reduce stage
     // (since the reduce stage hasn't been started yet).
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.failedStages.toSet === Set(0))
 
     assertDataStructuresEmpty()
@@ -1649,7 +1658,6 @@
     assert(cancelledStages.toSet === Set(0, 2))
 
     // Make sure the listeners got told about both failed stages.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.successfulStages.isEmpty)
     assert(sparkListener.failedStages.toSet === Set(0, 2))
 
@@ -2607,7 +2615,6 @@
     }
 
     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)
 
     // The first map task fails with TaskKilled.
@@ -2625,7 +2632,6 @@
     // Trigger resubmission of the failed map stage.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 
     // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
     assert(countSubmittedMapStageAttempts() === 2)
 
@@ -2644,7 +2650,6 @@
     }
 
     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)
 
     // The first map task fails with TaskKilled.
@@ -2656,7 +2661,6 @@
     // Trigger resubmission of the failed map stage.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 
     // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
     assert(countSubmittedMapStageAttempts() === 2)
 
@@ -2669,7 +2673,6 @@
 
     // The second map task failure doesn't trigger stage retry.
    runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 2)
   }
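The core of this patch is a drain-then-snapshot pattern: instead of sprinkling `sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)` before every assertion, the suite funnels all reads through getters that first wait for the listener bus to drain and then return immutable copies. The sketch below shows the same pattern in isolation; it is illustrative only (the `StageRecordingListener` name and the example job are not part of the patch), and it must live under an `org.apache.spark` package, as `DAGSchedulerSuite` itself does, because `SparkContext.listenerBus` is `private[spark]`.

```scala
// Hypothetical, minimal sketch of the drain-then-snapshot pattern; not part of the patch.
package org.apache.spark.scheduler

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkConf, SparkContext}

/** Records completed stage ids; the getter synchronizes with the listener bus. */
class StageRecordingListener(sc: SparkContext) extends SparkListener {
  private val _completedStageIds = new ArrayBuffer[Int]

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    _completedStageIds += event.stageInfo.stageId
  }

  /** Wait until every posted event has been delivered, then return an immutable snapshot. */
  def completedStageIds: List[Int] = {
    sc.listenerBus.waitUntilEmpty(10000)
    _completedStageIds.toList
  }
}

object StageRecordingListenerExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("listener-sketch"))
    try {
      val listener = new StageRecordingListener(sc)
      sc.addSparkListener(listener)

      // A shuffle job: one map stage plus one result stage.
      sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _).count()

      // No waitUntilEmpty at the call site: the getter already drained the bus,
      // so the assertion cannot race with event delivery.
      assert(listener.completedStageIds.length == 2)
    } finally {
      sc.stop()
    }
  }
}
```

Centralizing the wait in one private helper is what lets the patch delete the dozen-plus scattered `waitUntilEmpty` calls, and handing out `toList`/`toSet` copies means a late event can never mutate a collection an assertion is still reading, which is exactly the race the new scaladoc warns about.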