From 88679a0631bb3ddd6707c2f2b81f8886bf837fd8 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Fri, 22 Jun 2018 12:58:16 -0700
Subject: [PATCH 1/2] [SPARK-24552][core] Use unique id instead of attempt number for writes [branch-2.2].

This passes a unique attempt id to the Hadoop APIs, because attempt number
is reused when stages are retried. When attempt numbers are reused, sources
that track data by partition id and attempt number may incorrectly clean up
data because the same attempt number can be both committed and aborted.
---
 .../spark/internal/io/SparkHadoopMapReduceWriter.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index 3b0a15848cd3b..930bba9a5c4d2 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -86,12 +86,16 @@ object SparkHadoopMapReduceWriter extends Logging {
     // Try to write all RDD partitions as a Hadoop OutputFormat.
     try {
       val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
+        // SPARK-24552: Generate a unique "attempt ID" based on the stage and task atempt numbers.
+        // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
+        val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber
+
         executeTask(
           context = context,
           jobTrackerId = jobTrackerId,
           commitJobId = commitJobId,
           sparkPartitionId = context.partitionId,
-          sparkAttemptNumber = context.attemptNumber,
+          sparkAttemptNumber = attemptId,
           committer = committer,
           hadoopConf = conf.value,
           outputFormat = format.asInstanceOf[Class[OutputFormat[K, V]]],

From ab2f7014528938464fb5c5fccea09c803ec1324e Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Fri, 22 Jun 2018 16:37:44 -0700
Subject: [PATCH 2/2] Typo.

---
 .../apache/spark/internal/io/SparkHadoopMapReduceWriter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index 930bba9a5c4d2..dd72f94303666 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -86,7 +86,7 @@ object SparkHadoopMapReduceWriter extends Logging {
     // Try to write all RDD partitions as a Hadoop OutputFormat.
     try {
       val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
-        // SPARK-24552: Generate a unique "attempt ID" based on the stage and task atempt numbers.
+        // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
         // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
         val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber
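
Note (not part of the patch series above): a minimal, self-contained Scala sketch of the attempt-id encoding the patch introduces. The object and method names below are illustrative only, and the one assumption, stated in the patch's own comment, is that neither the stage attempt number nor the task attempt number exceeds Short.MaxValue.

    // Illustrative sketch of the SPARK-24552 encoding; all names here are hypothetical.
    object AttemptIdSketch {
      // Pack the stage attempt number into the high 16 bits and the task
      // attempt number into the low 16 bits, so a retried stage never
      // produces the same id for the same task attempt number.
      def uniqueAttemptId(stageAttemptNumber: Int, taskAttemptNumber: Int): Int =
        (stageAttemptNumber << 16) | taskAttemptNumber

      def main(args: Array[String]): Unit = {
        // Task attempt 0 in stage attempt 0 vs. stage attempt 1: distinct ids.
        println(uniqueAttemptId(0, 0)) // 0
        println(uniqueAttemptId(1, 0)) // 65536
      }
    }

Before the patch, both of those cases would have passed the bare task attempt number (0) to the committer, which is how a retried stage could collide with an earlier, aborted attempt.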