@@ -594,11 +594,17 @@ class DAGSchedulerSuite
594594 * @param stageId - The current stageId
595595 * @param attemptIdx - The current attempt count
596596 */
597- private def completeNextResultStageWithSuccess (stageId : Int , attemptIdx : Int ): Unit = {
597+ private def completeNextResultStageWithSuccess (
598+ stageId : Int ,
599+ attemptIdx : Int ,
600+ partitionToResult : Int => Int = _ => 42 ): Unit = { // maps a task's index in the task set to the result reported for it; defaults to 42 for every task
598601 val stageAttempt = taskSets.last // the last entry of taskSets is treated as the attempt under test
599602 checkStageId(stageId, attemptIdx, stageAttempt)
600603 assert(scheduler.stageIdToStage(stageId).isInstanceOf [ResultStage ]) // this helper is only meant for result stages
601- complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success , 42 )).toSeq)
604+ val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) => // `task` itself is unused; only its index feeds partitionToResult
605+ (Success , partitionToResult(idx))
606+ }
607+ complete(stageAttempt, taskResults.toSeq) // one (Success, result) pair per task in the task set
602608 }
603609
604610 /**
@@ -1054,6 +1060,47 @@ class DAGSchedulerSuite
10541060 assertDataStructuresEmpty()
10551061 }
10561062
1063+ /**
1064+ * Run two jobs, with a shared dependency. We simulate a fetch failure in the second job, which
1065+ * requires regenerating some outputs of the shared dependency. One key aspect of this test is
1066+ * that the second job actually uses a different stage for the shared dependency (a "skipped"
1067+ * stage).
1068+ */
1069+ test(" shuffle fetch failure in a reused shuffle dependency" ) {
1070+ // Run the first job successfully, which creates one shuffle dependency
1071+
1072+ val shuffleMapRdd = new MyRDD (sc, 2 , Nil )
1073+ val shuffleDep = new ShuffleDependency (shuffleMapRdd, null )
1074+ val reduceRdd = new MyRDD (sc, 2 , List (shuffleDep))
1075+ submit(reduceRdd, Array (0 , 1 ))
1076+
1077+ completeShuffleMapStageSuccessfully(0 , 0 , 2 )
1078+ completeNextResultStageWithSuccess(1 , 0 ) // stage 1, attempt 0; no partitionToResult given, so every task reports 42
1079+ assert(results === Map (0 -> 42 , 1 -> 42 ))
1080+ assertDataStructuresEmpty()
1081+
1082+ // Submit another job with the shared dependency, and have it hit a fetch failure
1083+ val reduce2 = new MyRDD (sc, 2 , List (shuffleDep))
1084+ submit(reduce2, Array (0 , 1 ))
1085+ // Note that the stage numbering here is only because the shared dependency produces a new,
1086+ // skipped stage. If instead it reused the existing stage, then this would be stage 2.
1087+ completeNextStageWithFetchFailure(3 , 0 , shuffleDep)
1088+ scheduler.resubmitFailedStages()
1089+
1090+ // The scheduler now creates a new task set to regenerate the missing map output, but this time
1091+ // using a different stage, the "skipped" one
1092+
1093+ // SPARK-9809 -- this stage is submitted without a task for each partition (because some of
1094+ // the shuffle map output is still available from stage 0); make sure we've still got internal
1095+ // accumulators set up
1096+ assert(scheduler.stageIdToStage(2 ).internalAccumulators.nonEmpty)
1097+ completeShuffleMapStageSuccessfully(2 , 0 , 2 )
1098+ completeNextResultStageWithSuccess(3 , 1 , idx => idx + 1234 ) // stage 3, attempt 1; the task at index idx reports idx + 1234
1099+ assert(results === Map (0 -> 1234 , 1 -> 1235 ))
1100+
1101+ assertDataStructuresEmpty()
1102+ }
1103+
10571104 /**
10581105 * This test runs a three stage job, with a fetch failure in stage 1, but during the retry, we
10591106 * have completions from both the first & second attempt of stage 1. So all the map output is
0 commit comments