
Commit 0d3a783

xuanyuanking authored and cloud-fan committed
[SPARK-28699][CORE] Fix a corner case for aborting indeterminate stage
### What changes were proposed in this pull request?

Change the logic for collecting the indeterminate stages: while handling a FetchFailed, we should traverse the stages starting from `mapStage`, not from `failedStage`.

### Why are the changes needed?

In the fetch-failure handling logic, the original code collected indeterminate stages starting from the stage that hit the fetch failure. When the fetch failure happens in the first task of that stage, this logic resubmits the indeterminate stage only partially, which can eventually produce a correctness bug.

### Does this PR introduce any user-facing change?

The corner case now aborts the indeterminate stage as expected.

### How was this patch tested?

New UT in DAGSchedulerSuite. Also ran the integration test below with `local-cluster[5, 2, 5120]` and `spark.sql.execution.sortBeforeRepartition=false`; it aborts the indeterminate stage as expected:

```
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```

Closes #25498 from xuanyuanking/SPARK-28699-followup.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent a787bc2 commit 0d3a783
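To make the fix concrete, here is a minimal, self-contained sketch of the rollback collection, simplified from `DAGScheduler.collectStagesToRollback` (the stand-in `Stage` case class and the `collectRollbacks` helper are illustrative, not the real scheduler types). The key point is the first line of the function body: the rollback set is seeded with the indeterminate `mapStage` rather than with the `failedStage` that observed the FetchFailed, so all dependent stages are collected even when the failure hits the first task of the failed stage.

```scala
import scala.collection.mutable.HashSet

// Stand-in stage graph: each stage knows only its parents, as in the scheduler.
final case class Stage(id: Int, parents: List[Stage] = Nil)

def collectRollbacks(mapStage: Stage, resultStages: Seq[Stage]): HashSet[Stage] = {
  // The fix: seed with the indeterminate `mapStage`, not the `failedStage`
  // that observed the FetchFailed.
  val stagesToRollback = HashSet[Stage](mapStage)

  // Walk each chain from a leaf toward the roots; once the chain reaches a
  // stage already marked for rollback, everything after it in the chain
  // (i.e. all its succeeding stages) must roll back too.
  def collect(stageChain: List[Stage]): Unit = {
    if (stagesToRollback.contains(stageChain.head)) {
      stageChain.drop(1).foreach(stagesToRollback += _)
    } else {
      stageChain.head.parents.foreach(p => collect(p :: stageChain))
    }
  }

  // Traverse from the leaves (the result stages of active jobs), since
  // stages only know their parents, not their children.
  resultStages.foreach(leaf => collect(leaf :: Nil))
  stagesToRollback
}
```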

2 files changed: +35 −24 lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -1571,13 +1571,13 @@ private[spark] class DAGScheduler(
           // guaranteed to be determinate, so the input data of the reducers will not change
           // even if the map tasks are re-tried.
           if (mapStage.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
-            // It's a little tricky to find all the succeeding stages of `failedStage`, because
+            // It's a little tricky to find all the succeeding stages of `mapStage`, because
             // each stage only knows its parents, not its children. Here we traverse the stages from
             // the leaf nodes (the result stages of active jobs), and roll back all the stages
-            // in the stage chains that connect to the `failedStage`. To speed up the stage
+            // in the stage chains that connect to the `mapStage`. To speed up the stage
             // traversal, we collect the stages to roll back first. If a stage needs to
             // roll back, all its succeeding stages need to roll back too.
-            val stagesToRollback = HashSet(failedStage)
+            val stagesToRollback = HashSet[Stage](mapStage)

             def collectStagesToRollback(stageChain: List[Stage]): Unit = {
               if (stagesToRollback.contains(stageChain.head)) {
```
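A small detail in the hunk above: the replacement adds an explicit type parameter, `HashSet[Stage](mapStage)`. This appears necessary because `mapStage` is a `ShuffleMapStage`, so plain `HashSet(mapStage)` would infer the narrower element type and the traversal could not add result stages to the set later. A tiny illustration with stand-in classes (not the real scheduler types):

```scala
import scala.collection.mutable.HashSet

sealed abstract class Stage
final class ShuffleMapStage extends Stage
final class ResultStage extends Stage

val mapStage = new ShuffleMapStage
val inferred = HashSet(mapStage)        // inferred as HashSet[ShuffleMapStage]
val widened = HashSet[Stage](mapStage)  // explicitly HashSet[Stage]

widened += new ResultStage              // compiles: the set can hold any Stage
// inferred += new ResultStage          // would not compile: wrong element type
```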

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 32 additions & 21 deletions
```diff
@@ -2741,27 +2741,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       FetchFailed(makeBlockManagerId("hostC"), shuffleId2, 0, 0, "ignored"),
       null))

-    val failedStages = scheduler.failedStages.toSeq
-    assert(failedStages.length == 2)
-    // Shuffle blocks of "hostC" is lost, so first task of the `shuffleMapRdd2` needs to retry.
-    assert(failedStages.collect {
-      case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
-    }.head.findMissingPartitions() == Seq(0))
-    // The result stage is still waiting for its 2 tasks to complete
-    assert(failedStages.collect {
-      case stage: ResultStage => stage
-    }.head.findMissingPartitions() == Seq(0, 1))
-
-    scheduler.resubmitFailedStages()
-
-    // The first task of the `shuffleMapRdd2` failed with fetch failure
-    runEvent(makeCompletionEvent(
-      taskSets(3).tasks(0),
-      FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0, 0, "ignored"),
-      null))
-
-    // The job should fail because Spark can't rollback the shuffle map stage.
-    assert(failure != null && failure.getMessage.contains("Spark cannot rollback"))
+    // The second shuffle map stage needs to rerun, and the job will abort because the
+    // rerun stage is indeterminate.
+    // TODO: After we support re-generating shuffle files (SPARK-25341), this test will be extended.
+    assert(failure != null && failure.getMessage
+      .contains("Spark cannot rollback the ShuffleMapStage 1"))
   }

   private def assertResultStageFailToRollback(mapRdd: MyRDD): Unit = {
```
```diff
@@ -2872,6 +2856,33 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(latch.await(10, TimeUnit.SECONDS))
   }

+  test("SPARK-28699: abort stage if parent stage is indeterminate stage") {
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil, indeterminate = true)
+
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
+    val shuffleId = shuffleDep.shuffleId
+    val finalRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
+
+    submit(finalRdd, Array(0, 1))
+
+    // Finish the first shuffle map stage.
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq.empty))
+
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
+      null))
+
+    // Shuffle blocks of "hostA" are lost, so the first task of `shuffleMapRdd` needs to retry.
+    // The result stage is still waiting for its 2 tasks to complete.
+    // Because shuffleMapRdd is indeterminate, this job will be aborted.
+    assert(failure != null && failure.getMessage
+      .contains("Spark cannot rollback the ShuffleMapStage 0"))
+  }
+
   test("Completions in zombie tasksets update status of non-zombie taskset") {
     val parts = 4
     val shuffleMapRdd = new MyRDD(sc, parts, Nil)
```
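The new test drives the indeterminacy through MyRDD's `indeterminate = true` flag, which in the suite overrides the RDD's output deterministic level. For readers outside the test suite, here is a hedged sketch of how an RDD can mark itself indeterminate; the `IndeterminateRDD` class and its body are hypothetical, while `getOutputDeterministicLevel` and `DeterministicLevel` are the Spark hooks the scheduler consults (via `rdd.outputDeterministicLevel`, as seen in the DAGScheduler hunk above).

```scala
import scala.util.Random

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.{DeterministicLevel, RDD}

// Hypothetical RDD whose output order differs across reruns of the same task.
class IndeterminateRDD(sc: SparkContext, numParts: Int) extends RDD[Int](sc, Nil) {

  // Emits the same values but in a random order, so a retried task can
  // produce differently ordered output.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Random.shuffle((0 until 100).toList).iterator

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numParts)(i => new Partition { override def index: Int = i })

  // Declares that this RDD's output cannot be reproduced exactly on retry;
  // this is what makes the scheduler treat the stage as indeterminate.
  override protected def getOutputDeterministicLevel: DeterministicLevel.Value =
    DeterministicLevel.INDETERMINATE
}
```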
