@@ -26,11 +26,11 @@ import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 import org.apache.spark.util.CallSite
-import org.apache.spark.executor.TaskMetrics
 
 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -473,6 +473,282 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
+
+  // Helper function to validate state when creating tests for task failures
+  private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
+    assert(stageAttempt.stageId === stageId)
+    assert(stageAttempt.stageAttemptId == attempt)
+  }
+
+
+  // Helper functions to extract commonly used code in Fetch Failure test cases
+  private def setupStageAbortTest(sc: SparkContext) {
+    sc.listenerBus.addListener(new EndListener())
+    ended = false
+    jobResult = null
+  }
+
+  // Create a new Listener to confirm that the listenerBus sees the JobEnd message
+  // when we abort the stage. This message will also be consumed by the EventLoggingListener
+  // so this will propagate up to the user.
+  var ended = false
+  var jobResult: JobResult = null
+
+  class EndListener extends SparkListener {
+    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+      jobResult = jobEnd.jobResult
+      ended = true
+    }
+  }
+
+  /**
+   * Common code to get the next stage attempt, confirm it's the one we expect, and complete it
+   * successfully.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   * @param numShufflePartitions - The number of partitions in the next stage
+   */
+  private def completeShuffleMapStageSuccessfully(
+      stageId: Int,
+      attemptIdx: Int,
+      numShufflePartitions: Int): Unit = {
+    val stageAttempt = taskSets.last
+    checkStageId(stageId, attemptIdx, stageAttempt)
+    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map {
+      case (task, idx) =>
+        (Success, makeMapStatus("host" + ('A' + idx).toChar, numShufflePartitions))
+    }.toSeq)
+  }
+
+  /**
+   * Common code to get the next stage attempt, confirm it's the one we expect, and complete it
+   * with a FetchFailed result for every task.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   * @param shuffleDep - The shuffle dependency of the stage with a fetch failure
+   */
+  private def completeNextStageWithFetchFailure(
+      stageId: Int,
+      attemptIdx: Int,
+      shuffleDep: ShuffleDependency[_, _, _]): Unit = {
+    val stageAttempt = taskSets.last
+    checkStageId(stageId, attemptIdx, stageAttempt)
+    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, idx, "ignored"), null)
+    }.toSeq)
+  }
+
+  /**
+   * Common code to get the next result stage attempt, confirm it's the one we expect, and
+   * complete it with a success where we return 42.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   */
+  private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
+    val stageAttempt = taskSets.last
+    checkStageId(stageId, attemptIdx, stageAttempt)
+    assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
+    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
+  }
+
+  /**
+   * In this test, we simulate a job where many tasks in the same stage fail. We want to show
+   * that many fetch failures inside a single stage attempt do not trigger an abort
+   * on their own, but only when there are enough failing stage attempts.
+   */
+  test("Single stage fetch failure should not abort the stage.") {
+    setupStageAbortTest(sc)
+
+    val parts = 8
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep))
+    submit(reduceRdd, (0 until parts).toArray)
+
+    completeShuffleMapStageSuccessfully(0, 0, numShufflePartitions = parts)
+
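+    // All eight reduce tasks fail with FetchFailed in this single stage attempt. Per the test
+    // description above, many task failures within one attempt should count as only one failed
+    // attempt of stage 1, so no abort is expected here.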
+    completeNextStageWithFetchFailure(1, 0, shuffleDep)
+
+    // Resubmit and confirm that now all is well
+    scheduler.resubmitFailedStages()
+
+    assert(scheduler.runningStages.nonEmpty)
+    assert(!ended)
+
+    // Complete stage 0 and then stage 1 with a "42"
+    completeShuffleMapStageSuccessfully(0, 1, numShufflePartitions = parts)
+    completeNextResultStageWithSuccess(1, 1)
+
+    // Confirm the job finished successfully
+    sc.listenerBus.waitUntilEmpty(1000)
+    assert(ended === true)
+    assert(results === (0 until parts).map { idx => idx -> 42 }.toMap)
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * In this test we simulate a job failure where the first stage completes successfully and
+   * the second stage fails due to a fetch failure. Multiple successive fetch failures of a stage
+   * trigger an overall job abort to avoid endless retries.
+   */
+  test("Multiple consecutive stage fetch failures should lead to job being aborted.") {
+    setupStageAbortTest(sc)
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
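+    // Each loop iteration below is one consecutive failed attempt of stage 1; after
+    // Stage.MAX_CONSECUTIVE_FETCH_FAILURES such attempts the scheduler should abort the job.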
+    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
+      // Complete all the tasks for the current attempt of stage 0 successfully
+      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+      // Now we should have a new taskSet, for a new attempt of stage 1.
+      // Fail all these tasks with FetchFailure
+      completeNextStageWithFetchFailure(1, attempt, shuffleDep)
+
+      // this will trigger a resubmission of stage 0, since we've lost some of its
+      // map output, for the next iteration through the loop
+      scheduler.resubmitFailedStages()
+
+      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
+        assert(scheduler.runningStages.nonEmpty)
+        assert(!ended)
+      } else {
+        // Stage should have been aborted and removed from running stages
+        assertDataStructuresEmpty()
+        sc.listenerBus.waitUntilEmpty(1000)
+        assert(ended)
+        jobResult match {
+          case JobFailed(reason) =>
+            assert(reason.getMessage.contains("ResultStage 1 () has failed the maximum"))
+          case other => fail(s"expected JobFailed, not $other")
+        }
+      }
+    }
+  }
+
+  /**
+   * In this test, we create a job with two consecutive shuffles, and simulate 2 failures for
+   * each shuffle fetch. In total, the job has four failures overall, but never four failures
+   * for one particular stage, so it should not be aborted.
+   */
+  test("Failures in different stages should not trigger an overall abort") {
+    setupStageAbortTest(sc)
+
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
+    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
+    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+    submit(finalRdd, Array(0))
+
+    // In the first two iterations, stage 0 succeeds and stage 1 fails. In the next two
+    // iterations, stages 0 and 1 succeed and stage 2 fails.
+    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
+      // Complete all the tasks for the current attempt of stage 0 successfully
+      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2) {
+        // Now we should have a new taskSet, for a new attempt of stage 1.
+        // Fail all these tasks with FetchFailure
+        completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
+      } else {
+        completeShuffleMapStageSuccessfully(1, attempt, numShufflePartitions = 1)
+
+        // Fail stage 2
+        completeNextStageWithFetchFailure(2, attempt - Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2,
+          shuffleDepTwo)
+      }
+
+      // this will trigger a resubmission of stage 0, since we've lost some of its
+      // map output, for the next iteration through the loop
+      scheduler.resubmitFailedStages()
+    }
+
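+    // At this point stages 1 and 2 have each failed only MAX_CONSECUTIVE_FETCH_FAILURES / 2
+    // times in a row, so the job should not have been aborted; finish every stage successfully.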
+    completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
+    completeShuffleMapStageSuccessfully(1, 4, numShufflePartitions = 1)
+
+    // Succeed stage 2 with a "42"
+    completeNextResultStageWithSuccess(2, Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2)
+
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * In this test we demonstrate that only consecutive failures trigger a stage abort. A stage
+   * may fail multiple times, succeed, then fail a few more times (because it's run again by
+   * downstream dependencies). The total number of failed attempts for one stage will go over
+   * the limit, but that doesn't matter, since there are successes in the middle.
+   */
+  test("Non-consecutive stage failures don't trigger abort") {
+    setupStageAbortTest(sc)
+
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
+    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
+    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+    submit(finalRdd, Array(0))
+
+    // First, execute stages 0 and 1, failing stage 1 up to MAX-1 times.
+    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
+      // Complete each task in stage 0 successfully
+      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+      // Now we should have a new taskSet, for a new attempt of stage 1.
+      // Fail these tasks with FetchFailure
+      completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
+
+      scheduler.resubmitFailedStages()
+
+      // Confirm we have not yet aborted
+      assert(scheduler.runningStages.nonEmpty)
+      assert(!ended)
+    }
+
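+    // Stage 1 has now failed MAX_CONSECUTIVE_FETCH_FAILURES - 1 times in a row; the successful
+    // attempt below is what should keep the later failure from aborting the job.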
+    // Rerun stage 0 and 1 to step through the task set
+    completeShuffleMapStageSuccessfully(0, 3, numShufflePartitions = 2)
+    completeShuffleMapStageSuccessfully(1, 3, numShufflePartitions = 1)
+
+    // Fail stage 2 so that stage 1 is resubmitted when we call scheduler.resubmitFailedStages()
+    completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
+
+    scheduler.resubmitFailedStages()
+
+    // Rerun stage 0 to step through the task set
+    completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
+
+    // Now again, fail stage 1 (its MAX_CONSECUTIVE_FETCH_FAILURES-th failure overall) but
+    // confirm that this doesn't trigger an abort, since we succeeded in between.
+    completeNextStageWithFetchFailure(1, 4, shuffleDepOne)
+
+    scheduler.resubmitFailedStages()
+
+    // Confirm we have not yet aborted
+    assert(scheduler.runningStages.nonEmpty)
+    assert(!ended)
+
+    // Next, succeed all and confirm output
+    // Rerun stage 0 + 1
+    completeShuffleMapStageSuccessfully(0, 5, numShufflePartitions = 2)
+    completeShuffleMapStageSuccessfully(1, 5, numShufflePartitions = 1)
+
+    // Succeed stage 2 and verify results
+    completeNextResultStageWithSuccess(2, 1)
+
+    assertDataStructuresEmpty()
+    sc.listenerBus.waitUntilEmpty(1000)
+    assert(ended === true)
+    assert(results === Map(0 -> 42))
+  }
+
   test("trivial shuffle with multiple fetch failures") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
@@ -810,15 +1086,15 @@ class DAGSchedulerSuite
     submit(finalRdd, Array(0))
     cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
     cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
-    // complete stage 2
+    // complete stage 0
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostA", 2)),
       (Success, makeMapStatus("hostB", 2))))
     // complete stage 1
     complete(taskSets(1), Seq(
       (Success, makeMapStatus("hostA", 1)),
       (Success, makeMapStatus("hostB", 1))))
-    // pretend stage 0 failed because hostA went down
+    // pretend stage 2 failed because hostA went down
    complete(taskSets(2), Seq(
      (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
    // TODO assert this: