Skip to content

Commit 08bba60

Browse files
committed
fix corner case for aborting indeterminate stage
1 parent a5df5ff commit 08bba60

File tree

2 files changed

+32
-24
lines changed

2 files changed

+32
-24
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1571,13 +1571,13 @@ private[spark] class DAGScheduler(
15711571
// guaranteed to be determinate, so the input data of the reducers will not change
15721572
// even if the map tasks are re-tried.
15731573
if (mapStage.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
1574-
// It's a little tricky to find all the succeeding stages of `failedStage`, because
1574+
// It's a little tricky to find all the succeeding stages of `mapStage`, because
15751575
// each stage only knows its parents, not its children. Here we traverse the stages from
15761576
// the leaf nodes (the result stages of active jobs), and rollback all the stages
1577-
// in the stage chains that connect to the `failedStage`. To speed up the stage
1577+
// in the stage chains that connect to the `mapStage`. To speed up the stage
15781578
// traversing, we collect the stages to rollback first. If a stage needs to
15791579
// rollback, all its succeeding stages need to rollback too.
1580-
val stagesToRollback = HashSet(failedStage)
1580+
val stagesToRollback = HashSet[Stage](mapStage)
15811581

15821582
def collectStagesToRollback(stageChain: List[Stage]): Unit = {
15831583
if (stagesToRollback.contains(stageChain.head)) {

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2741,27 +2741,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
27412741
FetchFailed(makeBlockManagerId("hostC"), shuffleId2, 0, 0, "ignored"),
27422742
null))
27432743

2744-
val failedStages = scheduler.failedStages.toSeq
2745-
assert(failedStages.length == 2)
2746-
// Shuffle blocks of "hostC" is lost, so first task of the `shuffleMapRdd2` needs to retry.
2747-
assert(failedStages.collect {
2748-
case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
2749-
}.head.findMissingPartitions() == Seq(0))
2750-
// The result stage is still waiting for its 2 tasks to complete
2751-
assert(failedStages.collect {
2752-
case stage: ResultStage => stage
2753-
}.head.findMissingPartitions() == Seq(0, 1))
2754-
2755-
scheduler.resubmitFailedStages()
2756-
2757-
// The first task of the `shuffleMapRdd2` failed with fetch failure
2758-
runEvent(makeCompletionEvent(
2759-
taskSets(3).tasks(0),
2760-
FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0, 0, "ignored"),
2761-
null))
2762-
2763-
// The job should fail because Spark can't rollback the shuffle map stage.
2764-
assert(failure != null && failure.getMessage.contains("Spark cannot rollback"))
2744+
assert(failure != null && failure.getMessage
2745+
.contains("Spark cannot rollback the ShuffleMapStage 1"))
27652746
}
27662747

27672748
private def assertResultStageFailToRollback(mapRdd: MyRDD): Unit = {
@@ -2872,6 +2853,33 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
28722853
assert(latch.await(10, TimeUnit.SECONDS))
28732854
}
28742855

2856+
test("SPARK-28699: abort stage for parent stage is indeterminate stage") {
2857+
val shuffleMapRdd = new MyRDD(sc, 2, Nil, indeterminate = true)
2858+
2859+
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
2860+
val shuffleId = shuffleDep.shuffleId
2861+
val finalRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
2862+
2863+
submit(finalRdd, Array(0, 1))
2864+
2865+
// Finish the first shuffle map stage.
2866+
complete(taskSets(0), Seq(
2867+
(Success, makeMapStatus("hostA", 2)),
2868+
(Success, makeMapStatus("hostB", 2))))
2869+
assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq.empty))
2870+
2871+
runEvent(makeCompletionEvent(
2872+
taskSets(1).tasks(0),
2873+
FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
2874+
null))
2875+
2876+
// Shuffle blocks of "hostA" are lost, so the first task of the `shuffleMapRdd` needs to retry.
2877+
// The result stage is still waiting for its 2 tasks to complete.
2878+
// Because shuffleMapRdd is indeterminate, this job will be aborted.
2879+
assert(failure != null && failure.getMessage
2880+
.contains("Spark cannot rollback the ShuffleMapStage 0"))
2881+
}
2882+
28752883
test("Completions in zombie tasksets update status of non-zombie taskset") {
28762884
val parts = 4
28772885
val shuffleMapRdd = new MyRDD(sc, parts, Nil)

0 commit comments

Comments
 (0)