From ed5005558d10c19e997be04166639fa9fe45c264 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Wed, 11 Sep 2019 10:24:57 -0700
Subject: [PATCH] [SPARK-26989][CORE][TEST] DAGSchedulerSuite: ensure listeners are fully processed before checking recorded values

This patch ensures that the values recorded by the test listener are only accessed after the listener has fully processed all events. To enforce this, the patch adds a new class which hides these values and exposes them only through getter methods that guarantee the condition above (a minimal standalone sketch of this guard pattern follows the diff below). Without this guard, two threads run concurrently - 1) the listener processing thread and 2) the test's main thread - and a race condition can occur. That is also why we see something very odd: an error message in which the condition appears to be met, even though the test failed:

```
- Barrier task failures from the same stage attempt don't trigger multiple stage retries *** FAILED ***
  ArrayBuffer(0) did not equal List(0) (DAGSchedulerSuite.scala:2656)
```

In other words, the verification failed, but the condition became true just before the error message was constructed. The guard is placed correctly in many spots but missing in others; this patch enforces that it cannot be missed.

The UT fails intermittently, and this patch addresses the flakiness. It introduces no user-facing change.

Tested with the modified UTs. I also made the flaky tests fail artificially by applying a 50 ms sleep in each onXXX method.

![Screen Shot 2019-09-07 at 7 44 15 AM](https://user-images.githubusercontent.com/1317309/64465178-1747ad00-d146-11e9-92f6-f4ed4a1f4b08.png)

I found 3 test methods failing (marked as X in the screenshot). The ones marked with ! can be ignored: they failed while waiting for the listener within a different timeout (1000 ms rather than the 10000 ms used for this listener), and they do not check these recorded values, so they were only affected as a side effect. After applying the same change as in this patch, all tests marked as X passed.

Closes #25706 from HeartSaVioR/SPARK-26989.

Authored-by: Jungtaek Lim (HeartSaVioR)
Signed-off-by: Marcelo Vanzin
---
 .../spark/scheduler/DAGSchedulerSuite.scala | 109 +++++++++---------
 1 file changed, 56 insertions(+), 53 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index fb68f1b02b989..038f036c602c3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -162,32 +162,67 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi /** Length of time to wait while draining listener events. */ val WAIT_TIMEOUT_MILLIS = 10000 - val sparkListener = new SparkListener() { - val submittedStageInfos = new HashSet[StageInfo] - val successfulStages = new HashSet[Int] - val failedStages = new ArrayBuffer[Int] - val stageByOrderOfExecution = new ArrayBuffer[Int] - val endedTasks = new HashSet[Long] + + /** + * Listener which records some information to verify in UTs. The getter-style methods in this + * class only return a value after making sure there are no pending events to process, and the + * returned value is immutable, to prevent odd results caused by a race condition.
+ */ + class EventInfoRecordingListener extends SparkListener { + private val _submittedStageInfos = new HashSet[StageInfo] + private val _successfulStages = new HashSet[Int] + private val _failedStages = new ArrayBuffer[Int] + private val _stageByOrderOfExecution = new ArrayBuffer[Int] + private val _endedTasks = new HashSet[Long] override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { - submittedStageInfos += stageSubmitted.stageInfo + _submittedStageInfos += stageSubmitted.stageInfo } override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { val stageInfo = stageCompleted.stageInfo - stageByOrderOfExecution += stageInfo.stageId + _stageByOrderOfExecution += stageInfo.stageId if (stageInfo.failureReason.isEmpty) { - successfulStages += stageInfo.stageId + _successfulStages += stageInfo.stageId } else { - failedStages += stageInfo.stageId + _failedStages += stageInfo.stageId } } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - endedTasks += taskEnd.taskInfo.taskId + _endedTasks += taskEnd.taskInfo.taskId + } + + def submittedStageInfos: Set[StageInfo] = { + waitForListeners() + _submittedStageInfos.toSet + } + + def successfulStages: Set[Int] = { + waitForListeners() + _successfulStages.toSet + } + + def failedStages: List[Int] = { + waitForListeners() + _failedStages.toList + } + + def stageByOrderOfExecution: List[Int] = { + waitForListeners() + _stageByOrderOfExecution.toList + } + + def endedTasks: Set[Long] = { + waitForListeners() + _endedTasks.toSet } + + private def waitForListeners(): Unit = sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) } + var sparkListener: EventInfoRecordingListener = null + var mapOutputTracker: MapOutputTrackerMaster = null var broadcastManager: BroadcastManager = null var securityMgr: SecurityManager = null @@ -236,10 +271,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi private def init(testConf: SparkConf): Unit = { sc = new SparkContext("local[2]", "DAGSchedulerSuite", testConf) - sparkListener.submittedStageInfos.clear() - sparkListener.successfulStages.clear() - sparkListener.failedStages.clear() - sparkListener.endedTasks.clear() + sparkListener = new EventInfoRecordingListener failure = null sc.addSparkListener(sparkListener) taskSets.clear() @@ -361,11 +393,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } test("[SPARK-3353] parent stage should have lower stage id") { - sparkListener.stageByOrderOfExecution.clear() sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count() - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(sparkListener.stageByOrderOfExecution.length === 2) - assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1)) + val stageByOrderOfExecution = sparkListener.stageByOrderOfExecution + assert(stageByOrderOfExecution.length === 2) + assert(stageByOrderOfExecution(0) < stageByOrderOfExecution(1)) } /** @@ -606,9 +637,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi submit(unserializableRdd, Array(0)) assert(failure.getMessage.startsWith( "Job aborted due to stage failure: Task not serializable:")) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(sparkListener.failedStages.contains(0)) - assert(sparkListener.failedStages.size === 1) + assert(sparkListener.failedStages === Seq(0)) assertDataStructuresEmpty() } @@ -616,9 +645,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi submit(new MyRDD(sc, 1, Nil), Array(0)) failed(taskSets(0), "some failure") assert(failure.getMessage === "Job aborted due to stage failure: some failure") - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(sparkListener.failedStages.contains(0)) - assert(sparkListener.failedStages.size === 1) + assert(sparkListener.failedStages === Seq(0)) assertDataStructuresEmpty() } @@ -627,9 +654,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi val jobId = submit(rdd, Array(0)) cancel(jobId) assert(failure.getMessage === s"Job $jobId cancelled ") - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(sparkListener.failedStages.contains(0)) - assert(sparkListener.failedStages.size === 1) + assert(sparkListener.failedStages === Seq(0)) assertDataStructuresEmpty() } @@ -683,7 +708,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(results === Map(0 -> 42)) assertDataStructuresEmpty() - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.failedStages.isEmpty) assert(sparkListener.successfulStages.contains(0)) } @@ -1068,7 +1092,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi taskSets(1).tasks(0), FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.failedStages.contains(1)) // The second ResultTask fails, with a fetch failure for the output from the second mapper. @@ -1077,8 +1100,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"), null)) // The SparkListener should not receive redundant failure events. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(sparkListener.failedStages.size == 1) + assert(sparkListener.failedStages.size === 1) } test("Retry all the tasks on a resubmitted attempt of a barrier stage caused by FetchFailure") { @@ -1183,7 +1205,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } // The map stage should have been submitted. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 1) complete(taskSets(0), Seq( @@ -1200,12 +1221,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi taskSets(1).tasks(0), FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.failedStages.contains(1)) // Trigger resubmission of the failed map stage. runEvent(ResubmitFailedStages) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) // Another attempt for the map stage should have been submitted, resulting in 2 total attempts. assert(countSubmittedMapStageAttempts() === 2) @@ -1222,7 +1241,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // shouldn't effect anything -- our calling it just makes *SURE* it gets called between the // desired event and our check. runEvent(ResubmitFailedStages) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 2) } @@ -1247,7 +1265,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } // The map stage should have been submitted. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 1) // Complete the map stage. 
@@ -1256,7 +1273,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi (Success, makeMapStatus("hostB", 2)))) // The reduce stage should have been submitted. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedReduceStageAttempts() === 1) // The first result task fails, with a fetch failure for the output from the first mapper. @@ -1271,7 +1287,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // Because the map stage finished, another attempt for the reduce stage should have been // submitted, resulting in 2 total attempts for each the map and the reduce stage. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 2) assert(countSubmittedReduceStageAttempts() === 2) @@ -1301,11 +1316,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi runEvent(makeCompletionEvent( taskSets(0).tasks(1), Success, 42, Seq.empty, createFakeTaskInfoWithId(1))) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) // verify stage exists assert(scheduler.stageIdToStage.contains(0)) - assert(sparkListener.endedTasks.size == 2) - + assert(sparkListener.endedTasks.size === 2) // finish other 2 tasks runEvent(makeCompletionEvent( taskSets(0).tasks(2), Success, 42, @@ -1313,8 +1326,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi runEvent(makeCompletionEvent( taskSets(0).tasks(3), Success, 42, Seq.empty, createFakeTaskInfoWithId(3))) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(sparkListener.endedTasks.size == 4) + assert(sparkListener.endedTasks.size === 4) // verify the stage is done assert(!scheduler.stageIdToStage.contains(0)) @@ -1324,14 +1336,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi runEvent(makeCompletionEvent( taskSets(0).tasks(3), Success, 42, Seq.empty, createFakeTaskInfoWithId(5))) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.endedTasks.size == 5) // make sure non successful tasks also send out event runEvent(makeCompletionEvent( taskSets(0).tasks(3), UnknownReason, 42, Seq.empty, createFakeTaskInfoWithId(6))) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.endedTasks.size == 6) } @@ -1405,7 +1415,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // Listener bus should get told about the map stage failing, but not the reduce stage // (since the reduce stage hasn't been started yet). - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.failedStages.toSet === Set(0)) assertDataStructuresEmpty() @@ -1649,7 +1658,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(cancelledStages.toSet === Set(0, 2)) // Make sure the listeners got told about both failed stages. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.successfulStages.isEmpty) assert(sparkListener.failedStages.toSet === Set(0, 2)) @@ -2607,7 +2615,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } // The map stage should have been submitted. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 1) // The first map task fails with TaskKilled. @@ -2625,7 +2632,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // Trigger resubmission of the failed map stage. 
runEvent(ResubmitFailedStages) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) // Another attempt for the map stage should have been submitted, resulting in 2 total attempts. assert(countSubmittedMapStageAttempts() === 2) @@ -2644,7 +2650,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } // The map stage should have been submitted. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 1) // The first map task fails with TaskKilled. @@ -2656,7 +2661,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // Trigger resubmission of the failed map stage. runEvent(ResubmitFailedStages) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) // Another attempt for the map stage should have been submitted, resulting in 2 total attempts. assert(countSubmittedMapStageAttempts() === 2) @@ -2669,7 +2673,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // The second map task failure doesn't trigger stage retry. runEvent(ResubmitFailedStages) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 2) }
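For context, the following is a minimal standalone sketch of the guard pattern the patch introduces; it is illustrative and not part of the patch itself. The class name GuardedStageListener and its single recorded field are hypothetical stand-ins for the suite's EventInfoRecordingListener, sc.listenerBus.waitUntilEmpty(timeout) is the same call the suite already uses, and the sketch assumes it is compiled inside the org.apache.spark.scheduler package so that the private[spark] listenerBus is accessible.

```scala
package org.apache.spark.scheduler

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkContext

// Sketch only: the recorded values stay private and are mutated solely by the
// listener bus thread; tests read them through a getter that first drains the
// bus and then returns an immutable snapshot.
class GuardedStageListener(sc: SparkContext, timeoutMillis: Long) extends SparkListener {

  private val _failedStages = new ArrayBuffer[Int]

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    if (info.failureReason.isDefined) {
      _failedStages += info.stageId
    }
  }

  // Wait until every posted event has been handled, then hand back an immutable
  // copy, so that neither the assertion nor its error message can observe a
  // buffer that is still being mutated.
  def failedStages: List[Int] = {
    sc.listenerBus.waitUntilEmpty(timeoutMillis)
    _failedStages.toList
  }
}
```

A test would register the listener with sc.addSparkListener(listener), run the job under test, and then assert directly on listener.failedStages; the waiting and the defensive copy are no longer something each individual test can forget.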