From 818379b2f100b41973737ac3931922294a035f5a Mon Sep 17 00:00:00 2001
From: Liupengcheng
Date: Wed, 16 Jan 2019 16:17:40 +0800
Subject: [PATCH 1/2] Do not allow tasks of a fetch-failed stage to commit in
 OutputCommitCoordinator

---
 .../apache/spark/scheduler/DAGScheduler.scala |  6 ++--
 .../scheduler/OutputCommitCoordinator.scala   | 13 +++++---
 .../OutputCommitCoordinatorSuite.scala        | 30 +++++++++++++++----
 3 files changed, 37 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f6ade180ee25..74440c1620b7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1101,10 +1101,12 @@ private[spark] class DAGScheduler(
     // event.
     stage match {
       case s: ShuffleMapStage =>
-        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
+        outputCommitCoordinator.stageStart(
+          stage = s.id, stage.latestInfo.attemptNumber(), maxPartitionId = s.numPartitions - 1)
       case s: ResultStage =>
         outputCommitCoordinator.stageStart(
-          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
+          stage = s.id, stage.latestInfo.attemptNumber(),
+          maxPartitionId = s.rdd.partitions.length - 1)
     }
     val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
       stage match {
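
The scheduler-side change above only forwards the stage attempt number that
DAGScheduler already tracks via stage.latestInfo. As a minimal standalone
sketch of the handshake this enables (MiniCoordinator and its members are
illustrative names, not Spark's actual API): the coordinator records the
latest announced attempt per stage, so any later commit request carrying an
older attempt number is stale by definition.

    import scala.collection.mutable

    // Minimal model: the driver announces each (re)start of a stage; a
    // commit request from an older stage attempt is then known to be stale.
    object MiniCoordinator {
      // latest attempt number the driver has announced, per stage id
      private val latestAttempt = mutable.Map[Int, Int]()

      // called by the scheduler each time a stage (re)starts
      def stageStart(stage: Int, stageAttemptNumber: Int): Unit = synchronized {
        latestAttempt(stage) = stageAttemptNumber
      }

      // true if a newer attempt of this stage has already been announced
      def isOutdated(stage: Int, stageAttempt: Int): Boolean = synchronized {
        latestAttempt.get(stage).exists(stageAttempt < _)
      }
    }
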
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index b382d623806e..a8bb9b4eaaee 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -58,6 +58,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   private case class StageState(numPartitions: Int) {
     val authorizedCommitters = Array.fill[TaskIdentifier](numPartitions)(null)
     val failures = mutable.Map[Int, mutable.Set[TaskIdentifier]]()
+    var latestStageAttempt: Int = -1
   }

   /**
@@ -114,13 +115,16 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
    * yet been initialized.
    *
    * @param stage the stage id.
+   * @param stageAttemptNumber the stage attempt number.
    * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e.
    *                       the maximum possible value of `context.partitionId`).
    */
-  private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
+  private[scheduler] def stageStart(
+      stage: Int, stageAttemptNumber: Int, maxPartitionId: Int): Unit = synchronized {
     stageStates.get(stage) match {
       case Some(state) =>
         require(state.authorizedCommitters.length == maxPartitionId + 1)
+        state.latestStageAttempt = stageAttemptNumber
         logInfo(s"Reusing state from previous attempt of stage $stage.")

       case _ =>
@@ -177,7 +181,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
       partition: Int,
       attemptNumber: Int): Boolean = synchronized {
     stageStates.get(stage) match {
-      case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) =>
+      case Some(state) if attemptInvalidOrFailed(state, stageAttempt, partition, attemptNumber) =>
         logInfo(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " +
           s"task attempt $attemptNumber already marked as failed.")
         false
@@ -200,13 +204,14 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     }
   }

-  private def attemptFailed(
+  private def attemptInvalidOrFailed(
       stageState: StageState,
       stageAttempt: Int,
       partition: Int,
       attempt: Int): Boolean = synchronized {
     val failInfo = TaskIdentifier(stageAttempt, attempt)
-    stageState.failures.get(partition).exists(_.contains(failInfo))
+    stageAttempt < stageState.latestStageAttempt ||
+      stageState.failures.get(partition).exists(_.contains(failInfo))
   }
 }

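
The coordinator-side change keeps one extra integer per stage and now denies a
commit for either of two reasons. A rough sketch of that predicate as a pure
function (TaskIdent and the flattened arguments are simplified stand-ins for
the patch's StageState and TaskIdentifier, not the real signatures):

    final case class TaskIdent(stageAttempt: Int, taskAttempt: Int)

    def attemptInvalidOrFailed(
        latestStageAttempt: Int,
        failures: Map[Int, Set[TaskIdent]],
        stageAttempt: Int,
        partition: Int,
        taskAttempt: Int): Boolean = {
      // deny if the request comes from a superseded stage attempt...
      stageAttempt < latestStageAttempt ||
        // ...or if this exact task attempt was already recorded as failed
        failures.getOrElse(partition, Set.empty[TaskIdent])
          .contains(TaskIdent(stageAttempt, taskAttempt))
    }

Note that the first disjunct is what closes the SPARK-26634 hole: previously
only per-task failures were consulted, so a task of an old, fetch-failed stage
attempt could still win the commit race against the retry.
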
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index a560013dba96..f6d3d2b6a23e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -176,7 +176,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     val partition: Int = 2
     val authorizedCommitter: Int = 3
     val nonAuthorizedCommitter: Int = 100
-    outputCommitCoordinator.stageStart(stage, maxPartitionId = 2)
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 2)
     assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, authorizedCommitter))
     assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
       nonAuthorizedCommitter))
@@ -203,7 +203,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     val stageAttempt: Int = 1
     val partition: Int = 1
     val failedAttempt: Int = 0
-    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
     outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
       attemptNumber = failedAttempt,
       reason = ExecutorLostFailure("0", exitCausedByApp = true, None))
@@ -213,16 +213,17 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {

   test("SPARK-24589: Differentiate tasks from different stage attempts") {
     var stage = 1
+    val stageAttempt: Int = 1
     val taskAttempt = 1
     val partition = 1

-    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
     assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
     assert(!outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))

     // Fail the task in the first attempt, the task in the second attempt should succeed.
     stage += 1
-    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
     outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
     assert(!outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
@@ -231,7 +232,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     // Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
     // then fail the 1st attempt and make sure the 4th one can commit again.
     stage += 1
-    outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
     assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
     outputCommitCoordinator.taskCompleted(stage, 2, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
@@ -270,9 +271,26 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(retriedStage.size === 1)
     assert(sc.dagScheduler.outputCommitCoordinator.isEmpty)
     verify(sc.env.outputCommitCoordinator, times(2))
-      .stageStart(meq(retriedStage.head), any())
+      .stageStart(meq(retriedStage.head), any(), any())
     verify(sc.env.outputCommitCoordinator).stageEnd(meq(retriedStage.head))
   }
+
+  test("SPARK-26634: Do not allow attempts of a failed stage to commit") {
+    val stage: Int = 1
+    var stageAttempt: Int = 1
+    val partition: Int = 1
+    val taskAttempt: Int = 0
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
+    stageAttempt += 1
+    outputCommitCoordinator.stageStart(stage, stageAttempt, maxPartitionId = 1)
+    // attempts of the failed stage are not authorized to commit
+    assert(!outputCommitCoordinator.canCommit(stage, stageAttempt - 1, partition, taskAttempt))
+    outputCommitCoordinator.taskCompleted(stage, stageAttempt - 1, partition,
+      attemptNumber = taskAttempt,
+      reason = Success)
+    // attempts of the current stage are authorized to commit
+    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, taskAttempt))
+  }
 }

 /**
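
Before the follow-up refinement, it may help to trace the scenario the new
SPARK-26634 test encodes. Against the MiniCoordinator sketch above
(illustrative only; the real test drives OutputCommitCoordinator directly):

    // Stage 1 starts, fails (e.g. a fetch failure), and is retried as attempt 2.
    MiniCoordinator.stageStart(stage = 1, stageAttemptNumber = 1)
    MiniCoordinator.stageStart(stage = 1, stageAttemptNumber = 2)
    // A straggler task from attempt 1 must not be allowed to commit...
    assert(MiniCoordinator.isOutdated(stage = 1, stageAttempt = 1))
    // ...while tasks of the latest attempt still can.
    assert(!MiniCoordinator.isOutdated(stage = 1, stageAttempt = 2))
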
From 5967f11f338ff319cbc2ac97d9c12eab92821658 Mon Sep 17 00:00:00 2001
From: Liupengcheng
Date: Thu, 17 Jan 2019 10:21:31 +0800
Subject: [PATCH 2/2] Refine

---
 .../apache/spark/scheduler/OutputCommitCoordinator.scala | 6 +++---
 .../spark/scheduler/OutputCommitCoordinatorSuite.scala   | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index a8bb9b4eaaee..a6538df3972f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -181,9 +181,9 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
       partition: Int,
       attemptNumber: Int): Boolean = synchronized {
     stageStates.get(stage) match {
-      case Some(state) if attemptInvalidOrFailed(state, stageAttempt, partition, attemptNumber) =>
+      case Some(state) if attemptOutdatedOrFailed(state, stageAttempt, partition, attemptNumber) =>
         logInfo(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " +
-          s"task attempt $attemptNumber already marked as failed.")
+          s"task attempt $attemptNumber is from an outdated stage attempt or marked as failed.")
         false
       case Some(state) =>
         val existing = state.authorizedCommitters(partition)
@@ -204,7 +204,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     }
   }

-  private def attemptInvalidOrFailed(
+  private def attemptOutdatedOrFailed(
       stageState: StageState,
       stageAttempt: Int,
       partition: Int,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index f6d3d2b6a23e..a8641b8ab226 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -288,7 +288,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     outputCommitCoordinator.taskCompleted(stage, stageAttempt - 1, partition,
       attemptNumber = taskAttempt,
       reason = Success)
-    // attempts of the current stage are authorized to commit
+    // attempts of the latest retry of the stage are authorized to commit
     assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, taskAttempt))
   }
 }
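
The outdated-attempt check renamed here is only half of the predicate; the
other half is the pre-existing per-partition failure bookkeeping it still
consults. A minimal sketch of that side, using the same simplified TaskIdent
as above (FailureLog is an illustrative name; in the patch this state lives in
StageState.failures):

    import scala.collection.mutable

    final case class TaskIdent(stageAttempt: Int, taskAttempt: Int)

    object FailureLog {
      // failed task attempts remembered per partition, so a task attempt that
      // already failed can never be authorized to commit later
      private val failures = mutable.Map[Int, mutable.Set[TaskIdent]]()

      def recordFailure(partition: Int, stageAttempt: Int, taskAttempt: Int): Unit =
        failures.getOrElseUpdate(partition, mutable.Set.empty) +=
          TaskIdent(stageAttempt, taskAttempt)

      def hasFailed(partition: Int, stageAttempt: Int, taskAttempt: Int): Boolean =
        failures.get(partition).exists(_.contains(TaskIdent(stageAttempt, taskAttempt)))
    }

Combined, a commit is denied when the requesting stage attempt is older than
the latest one announced by the scheduler, or when that exact task attempt was
previously recorded as failed.
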