From 09e5d158e5dda6af7d83e9714dad6a64c21adf17 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Fri, 15 Jun 2018 16:21:58 -0700
Subject: [PATCH 01/12] [SPARK-24552][core] Correctly identify tasks in output commit coordinator.

When an output stage is retried, it's possible that tasks from the previous attempt are still running. In that case, there would be a new task for the same partition in the new attempt, and the coordinator would allow both tasks to commit their output since it did not keep track of stage attempts.

The change adds more information to the stage state tracked by the coordinator, so that only one task is allowed to commit the output in the above case.

This also removes some code added in SPARK-18113 that allowed for duplicate commit requests; with the RPC code used in Spark 2, that situation cannot happen, so there is no need to handle it.
---
 .../spark/mapred/SparkHadoopMapRedUtil.scala | 8 +-
 .../apache/spark/scheduler/DAGScheduler.scala | 1 +
 .../scheduler/OutputCommitCoordinator.scala | 112 +++++++++---------
 .../OutputCommitCoordinatorSuite.scala | 70 ++++++-----
 4 files changed, 101 insertions(+), 90 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 764735dc4eae7..db8aff94ea1e1 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -69,9 +69,9 @@ object SparkHadoopMapRedUtil extends Logging {
 if (shouldCoordinateWithDriver) {
 val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
- val taskAttemptNumber = TaskContext.get().attemptNumber()
- val stageId = TaskContext.get().stageId()
- val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
+ val ctx = TaskContext.get()
+ val canCommit = outputCommitCoordinator.canCommit(ctx.stageId(), ctx.stageAttemptNumber(),
+ splitId, ctx.attemptNumber())
 if (canCommit) {
 performCommit()
@@ -81,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
 logInfo(message)
 // We need to abort the task so that the driver can reschedule new attempts, if necessary
 committer.abortTask(mrTaskContext)
- throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber)
+ throw new CommitDeniedException(message, ctx.stageId(), splitId, ctx.attemptNumber())
 }
 } else {
 // Speculation is disabled or a user has chosen to manually bypass the commit coordination
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 041eade82d3ca..a12fa877b6a2c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1171,6 +1171,7 @@ class DAGScheduler(
 outputCommitCoordinator.taskCompleted(
 stageId,
+ task.stageAttemptId,
 task.partitionId,
 event.taskInfo.attemptNumber, // this is a task attempt number
 event.reason)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 83d87b548a430..6984b6b3751eb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -27,7 +27,11 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils}
 private sealed trait
OutputCommitCoordinationMessage extends Serializable private case object StopCoordinator extends OutputCommitCoordinationMessage -private case class AskPermissionToCommitOutput(stage: Int, partition: Int, attemptNumber: Int) +private case class AskPermissionToCommitOutput( + stage: Int, + stageAttempt: Int, + partition: Int, + attemptNumber: Int) /** * Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins" @@ -45,13 +49,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) // Initialized by SparkEnv var coordinatorRef: Option[RpcEndpointRef] = None - private type StageId = Int - private type PartitionId = Int - private type TaskAttemptNumber = Int - private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1 + // Class used to identify a committer. The task ID for a committer is implicitly defined by + // the partition being processed, but the coordinator need to keep track of both the stage + // attempt and the task attempt, because in some situations the same task may be running + // concurrently in two different attempts of the same stage. + private case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int) + private case class StageState(numPartitions: Int) { - val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER) - val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]() + val authorizedCommitters = Array.fill[TaskIdentifier](numPartitions)(null) + val failures = mutable.Map[Int, mutable.Set[TaskIdentifier]]() } /** @@ -64,7 +70,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) * * Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance. */ - private val stageStates = mutable.Map[StageId, StageState]() + private val stageStates = mutable.Map[Int, StageState]() /** * Returns whether the OutputCommitCoordinator's internal data structures are all empty. @@ -87,10 +93,11 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) * @return true if this task is authorized to commit, false otherwise */ def canCommit( - stage: StageId, - partition: PartitionId, - attemptNumber: TaskAttemptNumber): Boolean = { - val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber) + stage: Int, + stageAttempt: Int, + partition: Int, + attemptNumber: Int): Boolean = { + val msg = AskPermissionToCommitOutput(stage, stageAttempt, partition, attemptNumber) coordinatorRef match { case Some(endpointRef) => ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg), @@ -109,20 +116,21 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e. * the maximum possible value of `context.partitionId`). 
*/ - private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit = synchronized { + private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized { stageStates(stage) = new StageState(maxPartitionId + 1) } // Called by DAGScheduler - private[scheduler] def stageEnd(stage: StageId): Unit = synchronized { + private[scheduler] def stageEnd(stage: Int): Unit = synchronized { stageStates.remove(stage) } // Called by DAGScheduler private[scheduler] def taskCompleted( - stage: StageId, - partition: PartitionId, - attemptNumber: TaskAttemptNumber, + stage: Int, + stageAttempt: Int, + partition: Int, + attemptNumber: Int, reason: TaskEndReason): Unit = synchronized { val stageState = stageStates.getOrElse(stage, { logDebug(s"Ignoring task completion for completed stage") @@ -132,15 +140,16 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) case Success => // The task output has been committed successfully case denied: TaskCommitDenied => - logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " + - s"attempt: $attemptNumber") + logInfo(s"Task was denied committing, stage: $stage / $stageAttempt, " + + s"partition: $partition, attempt: $attemptNumber") case otherReason => // Mark the attempt as failed to blacklist from future commit protocol - stageState.failures.getOrElseUpdate(partition, mutable.Set()) += attemptNumber - if (stageState.authorizedCommitters(partition) == attemptNumber) { + val taskId = TaskIdentifier(stageAttempt, attemptNumber) + stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId + if (stageState.authorizedCommitters(partition) == taskId) { logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " + s"partition=$partition) failed; clearing lock") - stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER + stageState.authorizedCommitters(partition) = null } } } @@ -155,47 +164,41 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) // Marked private[scheduler] instead of private so this can be mocked in tests private[scheduler] def handleAskPermissionToCommit( - stage: StageId, - partition: PartitionId, - attemptNumber: TaskAttemptNumber): Boolean = synchronized { + stage: Int, + stageAttempt: Int, + partition: Int, + attemptNumber: Int): Boolean = synchronized { stageStates.get(stage) match { - case Some(state) if attemptFailed(state, partition, attemptNumber) => - logInfo(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage," + - s" partition=$partition as task attempt $attemptNumber has already failed.") + case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) => + logInfo(s"Commit denied for stage=$stage/$attemptNumber, partition=$partition: " + + s"task attempt $attemptNumber already marked as failed.") false case Some(state) => - state.authorizedCommitters(partition) match { - case NO_AUTHORIZED_COMMITTER => - logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " + - s"partition=$partition") - state.authorizedCommitters(partition) = attemptNumber - true - case existingCommitter => - // Coordinator should be idempotent when receiving AskPermissionToCommit. - if (existingCommitter == attemptNumber) { - logWarning(s"Authorizing duplicate request to commit for " + - s"attemptNumber=$attemptNumber to commit for stage=$stage," + - s" partition=$partition; existingCommitter = $existingCommitter." 
+ - s" This can indicate dropped network traffic.") - true - } else { - logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " + - s"partition=$partition; existingCommitter = $existingCommitter") - false - } + val existing = state.authorizedCommitters(partition) + if (existing == null) { + logDebug(s"Commit allowed for stage=$stage/$attemptNumber, partition=$partition: " + + s"task attempt $attemptNumber") + state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber) + true + } else { + logDebug(s"Commit denied for stage=$stage/$attemptNumber, partition=$partition: " + + s"already committed by $existing") + false } case None => - logDebug(s"Stage $stage has completed, so not allowing" + - s" attempt number $attemptNumber of partition $partition to commit") + logDebug(s"Commit denied for stage=$stage/$attemptNumber, partition=$partition: " + + "stage already marked as completed.") false } } private def attemptFailed( stageState: StageState, - partition: PartitionId, - attempt: TaskAttemptNumber): Boolean = synchronized { - stageState.failures.get(partition).exists(_.contains(attempt)) + stageAttempt: Int, + partition: Int, + attempt: Int): Boolean = synchronized { + val failInfo = TaskIdentifier(stageAttempt, attempt) + stageState.failures.get(partition).exists(_.contains(failInfo)) } } @@ -215,9 +218,10 @@ private[spark] object OutputCommitCoordinator { } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case AskPermissionToCommitOutput(stage, partition, attemptNumber) => + case AskPermissionToCommitOutput(stage, stageAttempt, partition, attemptNumber) => context.reply( - outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, attemptNumber)) + outputCommitCoordinator.handleAskPermissionToCommit(stage, stageAttempt, partition, + attemptNumber)) } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 03b1903902491..51ceed5cef092 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -153,7 +153,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { test("Job should not complete if all commits are denied") { // Create a mock OutputCommitCoordinator that denies all attempts to commit doReturn(false).when(outputCommitCoordinator).handleAskPermissionToCommit( - Matchers.any(), Matchers.any(), Matchers.any()) + Matchers.any(), Matchers.any(), Matchers.any(), Matchers.any()) val rdd: RDD[Int] = sc.parallelize(Seq(1), 1) def resultHandler(x: Int, y: Unit): Unit = {} val futureAction: SimpleFutureAction[Unit] = sc.submitJob[Int, Unit, Unit](rdd, @@ -169,45 +169,61 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") { val stage: Int = 1 + val stageAttempt: Int = 1 val partition: Int = 2 val authorizedCommitter: Int = 3 val nonAuthorizedCommitter: Int = 100 outputCommitCoordinator.stageStart(stage, maxPartitionId = 2) - assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter)) - assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter)) + assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, authorizedCommitter)) + 
assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition, + nonAuthorizedCommitter)) // The non-authorized committer fails - outputCommitCoordinator.taskCompleted( - stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled("test")) + outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition, + attemptNumber = nonAuthorizedCommitter, reason = TaskKilled("test")) // New tasks should still not be able to commit because the authorized committer has not failed - assert( - !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1)) + assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition, + nonAuthorizedCommitter + 1)) // The authorized committer now fails, clearing the lock - outputCommitCoordinator.taskCompleted( - stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled("test")) + outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition, + attemptNumber = authorizedCommitter, reason = TaskKilled("test")) // A new task should now be allowed to become the authorized committer - assert( - outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2)) + assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, + nonAuthorizedCommitter + 2)) // There can only be one authorized committer - assert( - !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3)) - } - - test("Duplicate calls to canCommit from the authorized committer gets idempotent responses.") { - val rdd = sc.parallelize(Seq(1), 1) - sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _, - 0 until rdd.partitions.size) + assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition, + nonAuthorizedCommitter + 3)) } test("SPARK-19631: Do not allow failed attempts to be authorized for committing") { val stage: Int = 1 + val stageAttempt: Int = 1 val partition: Int = 1 val failedAttempt: Int = 0 outputCommitCoordinator.stageStart(stage, maxPartitionId = 1) - outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt, + outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition, + attemptNumber = failedAttempt, reason = ExecutorLostFailure("0", exitCausedByApp = true, None)) - assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt)) - assert(outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 1)) + assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition, failedAttempt)) + assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, failedAttempt + 1)) + } + + test("SPARK-24552: Differentiate tasks from different stage attempts") { + var stage = 1 + val taskAttempt = 1 + val partition = 1 + + outputCommitCoordinator.stageStart(stage, maxPartitionId = 1) + assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt)) + assert(!outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt)) + + // Fail the task in the first attempt, the task in the second attempt should succeed. 
+ stage += 1 + outputCommitCoordinator.stageStart(stage, maxPartitionId = 1) + outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt, + ExecutorLostFailure("0", exitCausedByApp = true, None)) + assert(!outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt)) + assert(outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt)) } } @@ -243,16 +259,6 @@ private case class OutputCommitFunctions(tempDirPath: String) { if (ctx.attemptNumber == 0) failingOutputCommitter else successfulOutputCommitter) } - // Receiver should be idempotent for AskPermissionToCommitOutput - def callCanCommitMultipleTimes(iter: Iterator[Int]): Unit = { - val ctx = TaskContext.get() - val canCommit1 = SparkEnv.get.outputCommitCoordinator - .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber()) - val canCommit2 = SparkEnv.get.outputCommitCoordinator - .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber()) - assert(canCommit1 && canCommit2) - } - private def runCommitWithProvidedCommitter( ctx: TaskContext, iter: Iterator[Int], From d471b74d4562ca8aa8e68a7ff90d881a67de5e59 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 15 Jun 2018 16:55:41 -0700 Subject: [PATCH 02/12] Fix sql compilation. --- .../execution/datasources/v2/WriteToDataSourceV2.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala index ea4bda327f36f..11ed7131e7e3d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala @@ -109,6 +109,7 @@ object DataWritingSparkTask extends Logging { iter: Iterator[InternalRow], useCommitCoordinator: Boolean): WriterCommitMessage = { val stageId = context.stageId() + val stageAttempt = context.stageAttemptNumber() val partId = context.partitionId() val attemptId = context.attemptNumber() val epochId = Option(context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)).getOrElse("0") @@ -122,12 +123,14 @@ object DataWritingSparkTask extends Logging { val msg = if (useCommitCoordinator) { val coordinator = SparkEnv.get.outputCommitCoordinator - val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId) + val commitAuthorized = coordinator.canCommit(stageId, stageAttempt, partId, attemptId) if (commitAuthorized) { - logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.") + logInfo(s"Writer for stage $stageId / $stageAttempt, " + + s"task $partId.$attemptId is authorized to commit.") dataWriter.commit() } else { - val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit" + val message = s"Stage $stageId / $stageAttempt, " + + s"task $partId.$attemptId: driver did not authorize commit" logInfo(message) // throwing CommitDeniedException will trigger the catch block for abort throw new CommitDeniedException(message, stageId, partId, attemptId) From 1cde305d92992dba8e18230fd869a898d57870be Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 15 Jun 2018 17:04:59 -0700 Subject: [PATCH 03/12] Cleanup. 
--- .../apache/spark/scheduler/OutputCommitCoordinator.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 6984b6b3751eb..d7d55ad15edc8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -50,7 +50,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) var coordinatorRef: Option[RpcEndpointRef] = None // Class used to identify a committer. The task ID for a committer is implicitly defined by - // the partition being processed, but the coordinator need to keep track of both the stage + // the partition being processed, but the coordinator needs to keep track of both the stage // attempt and the task attempt, because in some situations the same task may be running // concurrently in two different attempts of the same stage. private case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int) @@ -139,10 +139,10 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) reason match { case Success => // The task output has been committed successfully - case denied: TaskCommitDenied => + case _: TaskCommitDenied => logInfo(s"Task was denied committing, stage: $stage / $stageAttempt, " + s"partition: $partition, attempt: $attemptNumber") - case otherReason => + case _ => // Mark the attempt as failed to blacklist from future commit protocol val taskId = TaskIdentifier(stageAttempt, attemptNumber) stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId @@ -176,7 +176,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) case Some(state) => val existing = state.authorizedCommitters(partition) if (existing == null) { - logDebug(s"Commit allowed for stage=$stage/$attemptNumber, partition=$partition: " + + logDebug(s"Commit allowed for stage=$stage/$attemptNumber, partition=$partition, " + s"task attempt $attemptNumber") state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber) true From 5437d4abfea2ce740de41d515a186430b35c23d3 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 18 Jun 2018 11:00:54 -0700 Subject: [PATCH 04/12] Add stage ID to CommitDeniedException. Rename the field to match what it actually is; except for the JSON-serialized version, which for backwards compatibility still uses "job" instead of "stage". 
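
A minimal json4s sketch of the backwards-compatible read described above (illustrative only: TaskCommitDeniedJsonSketch and intField are made-up names, not the JsonProtocol code changed below). Event logs written before this change carry only "Job ID", so the new attempt field falls back to -1:

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    object TaskCommitDeniedJsonSketch {
      implicit val formats: Formats = DefaultFormats

      // Read an Int field, defaulting to -1 when it is absent (e.g. logs from older versions).
      private def intField(json: JValue, field: String): Int =
        (json \ field).toOption.map(_.extract[Int]).getOrElse(-1)

      def main(args: Array[String]): Unit = {
        // An event-log fragment written before this change: no "Job Attempt Number" key.
        val oldLog = parse("""{"Job ID": 2, "Partition ID": 3, "Attempt Number": 4}""")
        val parsed = (intField(oldLog, "Job ID"), intField(oldLog, "Job Attempt Number"),
          intField(oldLog, "Partition ID"), intField(oldLog, "Attempt Number"))
        assert(parsed == (2, -1, 3, 4))
      }
    }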
--- .../org/apache/spark/TaskEndReason.scala | 5 +++-- .../executor/CommitDeniedException.scala | 8 ++++++-- .../spark/mapred/SparkHadoopMapRedUtil.scala | 3 ++- .../org/apache/spark/util/JsonProtocol.scala | 6 ++++-- .../spark/scheduler/TaskSetManagerSuite.scala | 2 +- .../spark/status/AppStatusListenerSuite.scala | 2 +- .../apache/spark/util/JsonProtocolSuite.scala | 19 ++++++++++++++----- .../datasources/v2/WriteToDataSourceV2.scala | 2 +- 8 files changed, 32 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 33901bc8380e9..f86b7c5f03898 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -229,11 +229,12 @@ case class TaskKilled( */ @DeveloperApi case class TaskCommitDenied( - jobID: Int, + stageID: Int, + stageAttempt: Int, partitionID: Int, attemptNumber: Int) extends TaskFailedReason { override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" + - s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber" + s" for stage: $stageID / $stageAttempt, partition: $partitionID, attemptNumber: $attemptNumber" /** * If a task failed because its attempt to commit was denied, do not count this failure * towards failing the stage. This is intended to prevent spurious stage failures in cases diff --git a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala index 3e0d52cb4ccb9..99b0415bfca94 100644 --- a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala +++ b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala @@ -24,10 +24,14 @@ import org.apache.spark.TaskCommitDenied */ private[spark] class CommitDeniedException( msg: String, - jobID: Int, + stageID: Int, + stageAttempt: Int, splitID: Int, attemptNumber: Int) extends Exception(msg) { - def toTaskCommitDeniedReason: TaskCommitDenied = TaskCommitDenied(jobID, splitID, attemptNumber) + def toTaskCommitDeniedReason: TaskCommitDenied = { + TaskCommitDenied(stageID, stageAttempt, splitID, attemptNumber) + } + } diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala index db8aff94ea1e1..309727f0ddf10 100644 --- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala +++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala @@ -81,7 +81,8 @@ object SparkHadoopMapRedUtil extends Logging { logInfo(message) // We need to abort the task so that the driver can reschedule new attempts, if necessary committer.abortTask(mrTaskContext) - throw new CommitDeniedException(message, ctx.stageId(), splitId, ctx.attemptNumber()) + throw new CommitDeniedException(message, ctx.stageId(), ctx.stageAttemptNumber(), + splitId, ctx.attemptNumber()) } } else { // Speculation is disabled or a user has chosen to manually bypass the commit coordination diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 50c6461373dee..2213b69ba1700 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -399,7 +399,8 @@ private[spark] object JsonProtocol { ("Full Stack Trace" -> 
exceptionFailure.fullStackTrace) ~ ("Accumulator Updates" -> accumUpdates) case taskCommitDenied: TaskCommitDenied => - ("Job ID" -> taskCommitDenied.jobID) ~ + ("Job ID" -> taskCommitDenied.stageID) ~ + ("Job Attempt Number" -> taskCommitDenied.stageAttempt) ~ ("Partition ID" -> taskCommitDenied.partitionID) ~ ("Attempt Number" -> taskCommitDenied.attemptNumber) case ExecutorLostFailure(executorId, exitCausedByApp, reason) => @@ -928,9 +929,10 @@ private[spark] object JsonProtocol { // de/serialization logic was not added until 1.5.1. To provide backward compatibility // for reading those logs, we need to provide default values for all the fields. val jobId = jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1) + val jobAttemptNo = jsonOption(json \ "Job Attempt Number").map(_.extract[Int]).getOrElse(-1) val partitionId = jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1) val attemptNo = jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1) - TaskCommitDenied(jobId, partitionId, attemptNo) + TaskCommitDenied(jobId, jobAttemptNo, partitionId, attemptNo) case `executorLostFailure` => val exitCausedByApp = jsonOption(json \ "Exit Caused By App").map(_.extract[Boolean]) val executorId = jsonOption(json \ "Executor ID").map(_.extract[String]) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index ca6a7e5db3b17..204a2bfc96e06 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1250,7 +1250,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg tsmSpy.handleFailedTask(taskDescs(1).taskId, TaskState.FAILED, ExecutorLostFailure(taskDescs(1).executorId, exitCausedByApp = false, reason = None)) tsmSpy.handleFailedTask(taskDescs(2).taskId, TaskState.FAILED, - TaskCommitDenied(0, 2, 0)) + TaskCommitDenied(0, 1, 2, 0)) tsmSpy.handleFailedTask(taskDescs(3).taskId, TaskState.KILLED, TaskKilled("test")) // Make sure that the blacklist ignored all of the task failures above, since they aren't diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 1cd71955ad4d9..e17bd14c200ca 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -357,7 +357,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { // Start a new attempt and finish it with TaskCommitDenied, make sure it's handled like a kill. 
time += 1 val denied = newAttempt(killed, nextTaskId()) - val denyReason = TaskCommitDenied(1, 1, 1) + val denyReason = TaskCommitDenied(1, 1, 1, 1) listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber, denied)) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 74b72d940eeef..5b367183032f4 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -169,7 +169,7 @@ class JsonProtocolSuite extends SparkFunSuite { testTaskEndReason(exceptionFailure) testTaskEndReason(TaskResultLost) testTaskEndReason(TaskKilled("test")) - testTaskEndReason(TaskCommitDenied(2, 3, 4)) + testTaskEndReason(TaskCommitDenied(2, 1, 3, 4)) testTaskEndReason(ExecutorLostFailure("100", true, Some("Induced failure"))) testTaskEndReason(UnknownReason) @@ -369,13 +369,21 @@ class JsonProtocolSuite extends SparkFunSuite { // `TaskCommitDenied` was added in 1.3.0 but JSON de/serialization logic was added in 1.5.1 test("TaskCommitDenied backward compatibility") { - val denied = TaskCommitDenied(1, 2, 3) + val denied = TaskCommitDenied(1, 1, 2, 3) val oldDenied = JsonProtocol.taskEndReasonToJson(denied) .removeField({ _._1 == "Job ID" }) + .removeField({ _._1 == "Job Attempt Number" }) .removeField({ _._1 == "Partition ID" }) .removeField({ _._1 == "Attempt Number" }) - val expectedDenied = TaskCommitDenied(-1, -1, -1) + val expectedDenied = TaskCommitDenied(-1, -1, -1, -1) assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied)) + + // Job (actually, stage) attempt number was added in SPARK-24552. + val noJobAttempt = JsonProtocol.taskEndReasonToJson(denied) + .removeField({ _._1 == "Job Attempt Number" }) + val expectedDenied2 = TaskCommitDenied(denied.stageID, -1, denied.partitionID, + denied.attemptNumber) + assertEquals(expectedDenied2, JsonProtocol.taskEndReasonFromJson(noJobAttempt)) } test("AccumulableInfo backward compatibility") { @@ -682,9 +690,10 @@ private[spark] object JsonProtocolSuite extends Assertions { case (TaskResultLost, TaskResultLost) => case (r1: TaskKilled, r2: TaskKilled) => assert(r1.reason == r2.reason) - case (TaskCommitDenied(jobId1, partitionId1, attemptNumber1), - TaskCommitDenied(jobId2, partitionId2, attemptNumber2)) => + case (TaskCommitDenied(jobId1, jobAttempt1, partitionId1, attemptNumber1), + TaskCommitDenied(jobId2, jobAttempt2, partitionId2, attemptNumber2)) => assert(jobId1 === jobId2) + assert(jobAttempt1 === jobAttempt2) assert(partitionId1 === partitionId2) assert(attemptNumber1 === attemptNumber2) case (ExecutorLostFailure(execId1, exit1CausedByApp, reason1), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala index 11ed7131e7e3d..2a4d8a3b818b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala @@ -133,7 +133,7 @@ object DataWritingSparkTask extends Logging { s"task $partId.$attemptId: driver did not authorize commit" logInfo(message) // throwing CommitDeniedException will trigger the catch block for abort - throw new CommitDeniedException(message, stageId, partId, attemptId) + throw new CommitDeniedException(message, 
stageId, stageAttempt, partId, attemptId) } } else { From a2f4c1bde8b6d293c20983bd019f15c3c4f2703d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 18 Jun 2018 11:13:17 -0700 Subject: [PATCH 05/12] Fix log message (task attempt -> stage attempt). --- .../apache/spark/scheduler/OutputCommitCoordinator.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index d7d55ad15edc8..a32dfb2fffa39 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -170,23 +170,23 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) attemptNumber: Int): Boolean = synchronized { stageStates.get(stage) match { case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) => - logInfo(s"Commit denied for stage=$stage/$attemptNumber, partition=$partition: " + + logInfo(s"Commit denied for stage=$stage/$stageAttempt, partition=$partition: " + s"task attempt $attemptNumber already marked as failed.") false case Some(state) => val existing = state.authorizedCommitters(partition) if (existing == null) { - logDebug(s"Commit allowed for stage=$stage/$attemptNumber, partition=$partition, " + + logDebug(s"Commit allowed for stage=$stage/$stageAttempt, partition=$partition, " + s"task attempt $attemptNumber") state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber) true } else { - logDebug(s"Commit denied for stage=$stage/$attemptNumber, partition=$partition: " + + logDebug(s"Commit denied for stage=$stage/$stageAttempt, partition=$partition: " + s"already committed by $existing") false } case None => - logDebug(s"Commit denied for stage=$stage/$attemptNumber, partition=$partition: " + + logDebug(s"Commit denied for stage=$stage/$stageAttempt, partition=$partition: " + "stage already marked as completed.") false } From 37ff30754596ac19a30b71f39921012ee382151b Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 19 Jun 2018 09:33:02 -0700 Subject: [PATCH 06/12] Revert "Add stage ID to CommitDeniedException." This reverts commit 5437d4abfea2ce740de41d515a186430b35c23d3. 
--- .../org/apache/spark/TaskEndReason.scala | 5 ++--- .../executor/CommitDeniedException.scala | 8 ++------ .../spark/mapred/SparkHadoopMapRedUtil.scala | 3 +-- .../org/apache/spark/util/JsonProtocol.scala | 6 ++---- .../spark/scheduler/TaskSetManagerSuite.scala | 2 +- .../spark/status/AppStatusListenerSuite.scala | 2 +- .../apache/spark/util/JsonProtocolSuite.scala | 19 +++++-------------- .../datasources/v2/WriteToDataSourceV2.scala | 2 +- 8 files changed, 15 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index f86b7c5f03898..33901bc8380e9 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -229,12 +229,11 @@ case class TaskKilled( */ @DeveloperApi case class TaskCommitDenied( - stageID: Int, - stageAttempt: Int, + jobID: Int, partitionID: Int, attemptNumber: Int) extends TaskFailedReason { override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" + - s" for stage: $stageID / $stageAttempt, partition: $partitionID, attemptNumber: $attemptNumber" + s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber" /** * If a task failed because its attempt to commit was denied, do not count this failure * towards failing the stage. This is intended to prevent spurious stage failures in cases diff --git a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala index 99b0415bfca94..3e0d52cb4ccb9 100644 --- a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala +++ b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala @@ -24,14 +24,10 @@ import org.apache.spark.TaskCommitDenied */ private[spark] class CommitDeniedException( msg: String, - stageID: Int, - stageAttempt: Int, + jobID: Int, splitID: Int, attemptNumber: Int) extends Exception(msg) { - def toTaskCommitDeniedReason: TaskCommitDenied = { - TaskCommitDenied(stageID, stageAttempt, splitID, attemptNumber) - } - + def toTaskCommitDeniedReason: TaskCommitDenied = TaskCommitDenied(jobID, splitID, attemptNumber) } diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala index 309727f0ddf10..db8aff94ea1e1 100644 --- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala +++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala @@ -81,8 +81,7 @@ object SparkHadoopMapRedUtil extends Logging { logInfo(message) // We need to abort the task so that the driver can reschedule new attempts, if necessary committer.abortTask(mrTaskContext) - throw new CommitDeniedException(message, ctx.stageId(), ctx.stageAttemptNumber(), - splitId, ctx.attemptNumber()) + throw new CommitDeniedException(message, ctx.stageId(), splitId, ctx.attemptNumber()) } } else { // Speculation is disabled or a user has chosen to manually bypass the commit coordination diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 2213b69ba1700..50c6461373dee 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -399,8 +399,7 @@ private[spark] object JsonProtocol { ("Full Stack Trace" -> 
exceptionFailure.fullStackTrace) ~ ("Accumulator Updates" -> accumUpdates) case taskCommitDenied: TaskCommitDenied => - ("Job ID" -> taskCommitDenied.stageID) ~ - ("Job Attempt Number" -> taskCommitDenied.stageAttempt) ~ + ("Job ID" -> taskCommitDenied.jobID) ~ ("Partition ID" -> taskCommitDenied.partitionID) ~ ("Attempt Number" -> taskCommitDenied.attemptNumber) case ExecutorLostFailure(executorId, exitCausedByApp, reason) => @@ -929,10 +928,9 @@ private[spark] object JsonProtocol { // de/serialization logic was not added until 1.5.1. To provide backward compatibility // for reading those logs, we need to provide default values for all the fields. val jobId = jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1) - val jobAttemptNo = jsonOption(json \ "Job Attempt Number").map(_.extract[Int]).getOrElse(-1) val partitionId = jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1) val attemptNo = jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1) - TaskCommitDenied(jobId, jobAttemptNo, partitionId, attemptNo) + TaskCommitDenied(jobId, partitionId, attemptNo) case `executorLostFailure` => val exitCausedByApp = jsonOption(json \ "Exit Caused By App").map(_.extract[Boolean]) val executorId = jsonOption(json \ "Executor ID").map(_.extract[String]) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 204a2bfc96e06..ca6a7e5db3b17 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1250,7 +1250,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg tsmSpy.handleFailedTask(taskDescs(1).taskId, TaskState.FAILED, ExecutorLostFailure(taskDescs(1).executorId, exitCausedByApp = false, reason = None)) tsmSpy.handleFailedTask(taskDescs(2).taskId, TaskState.FAILED, - TaskCommitDenied(0, 1, 2, 0)) + TaskCommitDenied(0, 2, 0)) tsmSpy.handleFailedTask(taskDescs(3).taskId, TaskState.KILLED, TaskKilled("test")) // Make sure that the blacklist ignored all of the task failures above, since they aren't diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index e17bd14c200ca..1cd71955ad4d9 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -357,7 +357,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { // Start a new attempt and finish it with TaskCommitDenied, make sure it's handled like a kill. 
time += 1 val denied = newAttempt(killed, nextTaskId()) - val denyReason = TaskCommitDenied(1, 1, 1, 1) + val denyReason = TaskCommitDenied(1, 1, 1) listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber, denied)) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 5b367183032f4..74b72d940eeef 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -169,7 +169,7 @@ class JsonProtocolSuite extends SparkFunSuite { testTaskEndReason(exceptionFailure) testTaskEndReason(TaskResultLost) testTaskEndReason(TaskKilled("test")) - testTaskEndReason(TaskCommitDenied(2, 1, 3, 4)) + testTaskEndReason(TaskCommitDenied(2, 3, 4)) testTaskEndReason(ExecutorLostFailure("100", true, Some("Induced failure"))) testTaskEndReason(UnknownReason) @@ -369,21 +369,13 @@ class JsonProtocolSuite extends SparkFunSuite { // `TaskCommitDenied` was added in 1.3.0 but JSON de/serialization logic was added in 1.5.1 test("TaskCommitDenied backward compatibility") { - val denied = TaskCommitDenied(1, 1, 2, 3) + val denied = TaskCommitDenied(1, 2, 3) val oldDenied = JsonProtocol.taskEndReasonToJson(denied) .removeField({ _._1 == "Job ID" }) - .removeField({ _._1 == "Job Attempt Number" }) .removeField({ _._1 == "Partition ID" }) .removeField({ _._1 == "Attempt Number" }) - val expectedDenied = TaskCommitDenied(-1, -1, -1, -1) + val expectedDenied = TaskCommitDenied(-1, -1, -1) assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied)) - - // Job (actually, stage) attempt number was added in SPARK-24552. - val noJobAttempt = JsonProtocol.taskEndReasonToJson(denied) - .removeField({ _._1 == "Job Attempt Number" }) - val expectedDenied2 = TaskCommitDenied(denied.stageID, -1, denied.partitionID, - denied.attemptNumber) - assertEquals(expectedDenied2, JsonProtocol.taskEndReasonFromJson(noJobAttempt)) } test("AccumulableInfo backward compatibility") { @@ -690,10 +682,9 @@ private[spark] object JsonProtocolSuite extends Assertions { case (TaskResultLost, TaskResultLost) => case (r1: TaskKilled, r2: TaskKilled) => assert(r1.reason == r2.reason) - case (TaskCommitDenied(jobId1, jobAttempt1, partitionId1, attemptNumber1), - TaskCommitDenied(jobId2, jobAttempt2, partitionId2, attemptNumber2)) => + case (TaskCommitDenied(jobId1, partitionId1, attemptNumber1), + TaskCommitDenied(jobId2, partitionId2, attemptNumber2)) => assert(jobId1 === jobId2) - assert(jobAttempt1 === jobAttempt2) assert(partitionId1 === partitionId2) assert(attemptNumber1 === attemptNumber2) case (ExecutorLostFailure(execId1, exit1CausedByApp, reason1), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala index 2a4d8a3b818b6..11ed7131e7e3d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala @@ -133,7 +133,7 @@ object DataWritingSparkTask extends Logging { s"task $partId.$attemptId: driver did not authorize commit" logInfo(message) // throwing CommitDeniedException will trigger the catch block for abort - throw new CommitDeniedException(message, stageId, stageAttempt, partId, attemptId) + throw new 
CommitDeniedException(message, stageId, partId, attemptId) } } else { From 066dca560e39860bef3b9bd246b3fb141f9a8c1f Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 19 Jun 2018 11:21:26 -0700 Subject: [PATCH 07/12] Reuse the stage state when it's immediately re-attempted. This avoids an issue where commit information is discarded across stage attempt boundaries, which could lead to speculative tasks that haven't yet been properly killed generating duplicate output. --- .../apache/spark/scheduler/DAGScheduler.scala | 22 +++++++---- .../scheduler/OutputCommitCoordinator.scala | 12 +++++- .../OutputCommitCoordinatorSuite.scala | 37 ++++++++++++++++++- 3 files changed, 59 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a12fa877b6a2c..f74425d73b392 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1331,23 +1331,24 @@ class DAGScheduler( s" ${task.stageAttemptId} and there is a more recent attempt for that stage " + s"(attempt ${failedStage.latestInfo.attemptNumber}) running") } else { + failedStage.fetchFailedAttemptIds.add(task.stageAttemptId) + val shouldAbortStage = + failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts || + disallowStageRetryForTest + // It is likely that we receive multiple FetchFailed for a single stage (because we have // multiple tasks running concurrently on different executors). In that case, it is // possible the fetch failure has already been handled by the scheduler. if (runningStages.contains(failedStage)) { logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + s"due to a fetch failure from $mapStage (${mapStage.name})") - markStageAsFinished(failedStage, Some(failureMessage)) + markStageAsFinished(failedStage, errorMessage = Some(failureMessage), + willRetry = !shouldAbortStage) } else { logDebug(s"Received fetch failure from $task, but its from $failedStage which is no " + s"longer running") } - failedStage.fetchFailedAttemptIds.add(task.stageAttemptId) - val shouldAbortStage = - failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts || - disallowStageRetryForTest - if (shouldAbortStage) { val abortMessage = if (disallowStageRetryForTest) { "Fetch failure will not retry stage due to testing config" @@ -1546,7 +1547,10 @@ class DAGScheduler( /** * Marks a stage as finished and removes it from the list of running stages. 
*/ - private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = { + private def markStageAsFinished( + stage: Stage, + errorMessage: Option[String] = None, + willRetry: Boolean = false): Unit = { val serviceTime = stage.latestInfo.submissionTime match { case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0) case _ => "Unknown" @@ -1565,7 +1569,9 @@ class DAGScheduler( logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to ${errorMessage.get}") } - outputCommitCoordinator.stageEnd(stage.id) + if (!willRetry) { + outputCommitCoordinator.stageEnd(stage.id) + } listenerBus.post(SparkListenerStageCompleted(stage.latestInfo)) runningStages -= stage } diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index a32dfb2fffa39..b14d430e6e8ad 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -110,14 +110,22 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) } /** - * Called by the DAGScheduler when a stage starts. + * Called by the DAGScheduler when a stage starts. Initializes the stage's state if it hasn't + * yet been initialized. * * @param stage the stage id. * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e. * the maximum possible value of `context.partitionId`). */ private[scheduler] def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized { - stageStates(stage) = new StageState(maxPartitionId + 1) + stageStates.get(stage) match { + case Some(state) => + require(state.authorizedCommitters.length == maxPartitionId + 1) + logInfo(s"Reusing state from previous attempt of stage $stage.") + + case _ => + stageStates(stage) = new StageState(maxPartitionId + 1) + } } // Called by DAGScheduler diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 51ceed5cef092..fd450d34e0806 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -21,12 +21,13 @@ import java.io.File import java.util.Date import java.util.concurrent.TimeoutException +import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.language.postfixOps import org.apache.hadoop.mapred._ import org.apache.hadoop.mapreduce.TaskType -import org.mockito.Matchers +import org.mockito.{ArgumentCaptor, Matchers} import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer @@ -35,6 +36,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark._ import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapRedCommitProtocol, SparkHadoopWriterUtils} import org.apache.spark.rdd.{FakeOutputCommitter, RDD} +import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.util.{ThreadUtils, Utils} /** @@ -208,7 +210,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition, failedAttempt + 1)) } - test("SPARK-24552: Differentiate tasks from different stage attempts") { + test("SPARK-24589: Differentiate tasks from different 
stage attempts") { var stage = 1 val taskAttempt = 1 val partition = 1 @@ -225,6 +227,37 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { assert(!outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt)) assert(outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt)) } + + test("SPARK-24589: Make sure stage state is cleaned up") { + // Normal application without stage failures. + sc.parallelize(1 to 100, 100) + .map { i => (i % 10, i) } + .reduceByKey(_ + _) + .collect() + + assert(sc.dagScheduler.outputCommitCoordinator.isEmpty) + + // Force failures in a few tasks so that a stage is retried. Collect the ID of the failing + // stage so that we can check the state of the output committer. + val retriedStage = sc.parallelize(1 to 100, 10) + .map { i => (i % 10, i) } + .reduceByKey { case (_, _) => + val ctx = TaskContext.get() + if (ctx.stageAttemptNumber() == 0) { + throw new FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 1, 1, 1, + new Exception("Failure for test.")) + } else { + ctx.stageId() + } + } + .collect() + .map { case (k, v) => v } + .toSet + + assert(retriedStage.size === 1) + assert(sc.dagScheduler.outputCommitCoordinator.isEmpty) + verify(sc.env.outputCommitCoordinator).stageEnd(Matchers.eq(retriedStage.head)) + } } /** From adb0d18fa1bb43488245b7d6b7ee02d4997d6215 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 19 Jun 2018 11:23:11 -0700 Subject: [PATCH 08/12] Prettier logs. --- .../spark/scheduler/OutputCommitCoordinator.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index b14d430e6e8ad..b382d623806e2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -148,7 +148,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) case Success => // The task output has been committed successfully case _: TaskCommitDenied => - logInfo(s"Task was denied committing, stage: $stage / $stageAttempt, " + + logInfo(s"Task was denied committing, stage: $stage.$stageAttempt, " + s"partition: $partition, attempt: $attemptNumber") case _ => // Mark the attempt as failed to blacklist from future commit protocol @@ -178,23 +178,23 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) attemptNumber: Int): Boolean = synchronized { stageStates.get(stage) match { case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) => - logInfo(s"Commit denied for stage=$stage/$stageAttempt, partition=$partition: " + + logInfo(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " + s"task attempt $attemptNumber already marked as failed.") false case Some(state) => val existing = state.authorizedCommitters(partition) if (existing == null) { - logDebug(s"Commit allowed for stage=$stage/$stageAttempt, partition=$partition, " + + logDebug(s"Commit allowed for stage=$stage.$stageAttempt, partition=$partition, " + s"task attempt $attemptNumber") state.authorizedCommitters(partition) = TaskIdentifier(stageAttempt, attemptNumber) true } else { - logDebug(s"Commit denied for stage=$stage/$stageAttempt, partition=$partition: " + + logDebug(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " + s"already committed by 
$existing") false } case None => - logDebug(s"Commit denied for stage=$stage/$stageAttempt, partition=$partition: " + + logDebug(s"Commit denied for stage=$stage.$stageAttempt, partition=$partition: " + "stage already marked as completed.") false } From 535fbd4f093dcfd75d0d41791d0d1b5da1cf36d8 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 19 Jun 2018 14:10:40 -0700 Subject: [PATCH 09/12] Remove unneeded import. --- .../apache/spark/scheduler/OutputCommitCoordinatorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index fd450d34e0806..1c742d3575b49 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -27,7 +27,7 @@ import scala.language.postfixOps import org.apache.hadoop.mapred._ import org.apache.hadoop.mapreduce.TaskType -import org.mockito.{ArgumentCaptor, Matchers} +import org.mockito.Matchers import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer From 0e91f1b49ee4720c9b89a2090080efd7c89ccaf4 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 19 Jun 2018 14:11:43 -0700 Subject: [PATCH 10/12] Another unused import. --- .../apache/spark/scheduler/OutputCommitCoordinatorSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 1c742d3575b49..67c15f5e4eaa9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -21,7 +21,6 @@ import java.io.File import java.util.Date import java.util.concurrent.TimeoutException -import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.language.postfixOps From 5ece2f12a820d6438146758f0e944f3b1c70d489 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 19 Jun 2018 15:38:42 -0700 Subject: [PATCH 11/12] Add another check in new test. --- .../apache/spark/scheduler/OutputCommitCoordinatorSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 67c15f5e4eaa9..320456161ac5e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -255,6 +255,8 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { assert(retriedStage.size === 1) assert(sc.dagScheduler.outputCommitCoordinator.isEmpty) + verify(sc.env.outputCommitCoordinator, times(2)) + .stageStart(Matchers.eq(retriedStage.head), Matchers.any()) verify(sc.env.outputCommitCoordinator).stageEnd(Matchers.eq(retriedStage.head)) } } From 264c533737410786faae24df8cb5b27218f804cd Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 19 Jun 2018 17:27:37 -0700 Subject: [PATCH 12/12] Add one more test just in case. 
--- .../scheduler/OutputCommitCoordinatorSuite.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 320456161ac5e..158c9eb75f2b6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -225,6 +225,18 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { ExecutorLostFailure("0", exitCausedByApp = true, None)) assert(!outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt)) assert(outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt)) + + // Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit, + // then fail the 1st attempt and make sure the 4th one can commit again. + stage += 1 + outputCommitCoordinator.stageStart(stage, maxPartitionId = 1) + assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt)) + outputCommitCoordinator.taskCompleted(stage, 2, partition, taskAttempt, + ExecutorLostFailure("0", exitCausedByApp = true, None)) + assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt)) + outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt, + ExecutorLostFailure("0", exitCausedByApp = true, None)) + assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt)) } test("SPARK-24589: Make sure stage state is cleaned up") {
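
Taken together, the series leaves the coordinator tracking the authorized committer for each partition by both stage attempt and task attempt, and reusing a stage's state when the stage is immediately re-attempted. A minimal, self-contained Scala sketch of that bookkeeping (simplified and illustrative; the names mirror the patched classes, but this is not the actual OutputCommitCoordinator code and it omits the RPC endpoint and DAGScheduler wiring):

    import scala.collection.mutable

    object CommitCoordinatorSketch {
      // A committer is identified by both the stage attempt and the task attempt, so two
      // attempts of the same stage can never both be authorized for the same partition.
      case class TaskIdentifier(stageAttempt: Int, taskAttempt: Int)

      private class StageState(numPartitions: Int) {
        val authorizedCommitters = new Array[TaskIdentifier](numPartitions)
        val failures = mutable.Map[Int, mutable.Set[TaskIdentifier]]()
      }

      private val stageStates = mutable.Map[Int, StageState]()

      // A re-attempted stage reuses the existing state instead of discarding it.
      def stageStart(stage: Int, maxPartitionId: Int): Unit = synchronized {
        stageStates.getOrElseUpdate(stage, new StageState(maxPartitionId + 1))
      }

      def taskFailed(stage: Int, stageAttempt: Int, partition: Int, attempt: Int): Unit =
        synchronized {
          val state = stageStates(stage)
          val id = TaskIdentifier(stageAttempt, attempt)
          state.failures.getOrElseUpdate(partition, mutable.Set()) += id
          // Only the authorized committer's failure clears the lock.
          if (state.authorizedCommitters(partition) == id) {
            state.authorizedCommitters(partition) = null
          }
        }

      def canCommit(stage: Int, stageAttempt: Int, partition: Int, attempt: Int): Boolean =
        synchronized {
          val state = stageStates(stage)
          val id = TaskIdentifier(stageAttempt, attempt)
          if (state.failures.get(partition).exists(_.contains(id))) {
            false // a failed attempt can never become the committer
          } else if (state.authorizedCommitters(partition) == null) {
            state.authorizedCommitters(partition) = id
            true // first committer wins
          } else {
            false // another (stageAttempt, taskAttempt) already holds the lock
          }
        }

      def main(args: Array[String]): Unit = {
        stageStart(stage = 0, maxPartitionId = 0)
        assert(canCommit(stage = 0, stageAttempt = 0, partition = 0, attempt = 0))
        // The same task running in a newer attempt of the same stage is denied.
        assert(!canCommit(stage = 0, stageAttempt = 1, partition = 0, attempt = 0))
        // Once the authorized committer fails, the next asker may take over.
        taskFailed(stage = 0, stageAttempt = 0, partition = 0, attempt = 0)
        assert(canCommit(stage = 0, stageAttempt = 1, partition = 0, attempt = 0))
      }
    }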