From 6c60d1462c34f01610ada50c989832775b6fd117 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 13 Jun 2018 12:50:00 -0700 Subject: [PATCH 01/12] SPARK-24552: Use task ID instead of attempt number for v2 writes. --- .../datasources/v2/WriteToDataSourceV2.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala index ea4bda327f36f..84adc0f199bc2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala @@ -29,10 +29,8 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamExecution} -import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions} +import org.apache.spark.sql.execution.streaming.MicroBatchExecution import org.apache.spark.sql.sources.v2.writer._ -import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils @@ -110,7 +108,7 @@ object DataWritingSparkTask extends Logging { useCommitCoordinator: Boolean): WriterCommitMessage = { val stageId = context.stageId() val partId = context.partitionId() - val attemptId = context.attemptNumber() + val attemptId = context.taskAttemptId().toInt val epochId = Option(context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)).getOrElse("0") val dataWriter = writeTask.createDataWriter(partId, attemptId, epochId.toLong) @@ -124,10 +122,12 @@ object DataWritingSparkTask extends Logging { val coordinator = SparkEnv.get.outputCommitCoordinator val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId) if (commitAuthorized) { - logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.") + logInfo( + s"Writer for stage $stageId, task $partId (TID $attemptId) is authorized to commit.") dataWriter.commit() } else { - val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit" + val message = + s"Stage $stageId, task $partId (TID $attemptId): driver did not authorize commit" logInfo(message) // throwing CommitDeniedException will trigger the catch block for abort throw new CommitDeniedException(message, stageId, partId, attemptId) @@ -138,15 +138,15 @@ object DataWritingSparkTask extends Logging { dataWriter.commit() } - logInfo(s"Writer for stage $stageId, task $partId.$attemptId committed.") + logInfo(s"Writer for stage $stageId, task $partId (TID $attemptId) committed.") msg })(catchBlock = { // If there is an error, abort this writer - logError(s"Writer for stage $stageId, task $partId.$attemptId is aborting.") + logError(s"Writer for stage $stageId, task $partId (TID $attemptId) is aborting.") dataWriter.abort() - logError(s"Writer for stage $stageId, task $partId.$attemptId aborted.") + logError(s"Writer for stage $stageId, task $partId (TID $attemptId) aborted.") }) } } From 2e6552460eed3013e649b06b16a1d14b1e542e2d Mon Sep 17 00:00:00 2001 From: Marcelo 
Vanzin Date: Thu, 21 Jun 2018 10:21:00 -0700 Subject: [PATCH 02/12] Rename attemptId -> taskId for clarity. --- .../datasources/v2/WriteToDataSourceV2.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala index 84adc0f199bc2..e510809e65e76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala @@ -108,9 +108,9 @@ object DataWritingSparkTask extends Logging { useCommitCoordinator: Boolean): WriterCommitMessage = { val stageId = context.stageId() val partId = context.partitionId() - val attemptId = context.taskAttemptId().toInt + val taskId = context.taskAttemptId().toInt val epochId = Option(context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)).getOrElse("0") - val dataWriter = writeTask.createDataWriter(partId, attemptId, epochId.toLong) + val dataWriter = writeTask.createDataWriter(partId, taskId, epochId.toLong) // write the data and commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = { @@ -120,17 +120,18 @@ object DataWritingSparkTask extends Logging { val msg = if (useCommitCoordinator) { val coordinator = SparkEnv.get.outputCommitCoordinator - val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId) + val commitAuthorized = coordinator.canCommit(context.stageId(), partId, + context.attemptNumber()) if (commitAuthorized) { logInfo( - s"Writer for stage $stageId, task $partId (TID $attemptId) is authorized to commit.") + s"Writer for stage $stageId, task $partId (TID $taskId) is authorized to commit.") dataWriter.commit() } else { val message = - s"Stage $stageId, task $partId (TID $attemptId): driver did not authorize commit" + s"Stage $stageId, task $partId (TID $taskId): driver did not authorize commit" logInfo(message) // throwing CommitDeniedException will trigger the catch block for abort - throw new CommitDeniedException(message, stageId, partId, attemptId) + throw new CommitDeniedException(message, stageId, partId, taskId) } } else { @@ -138,15 +139,15 @@ object DataWritingSparkTask extends Logging { dataWriter.commit() } - logInfo(s"Writer for stage $stageId, task $partId (TID $attemptId) committed.") + logInfo(s"Writer for stage $stageId, task $partId (TID $taskId) committed.") msg })(catchBlock = { // If there is an error, abort this writer - logError(s"Writer for stage $stageId, task $partId (TID $attemptId) is aborting.") + logError(s"Writer for stage $stageId, task $partId (TID $taskId) is aborting.") dataWriter.abort() - logError(s"Writer for stage $stageId, task $partId (TID $attemptId) aborted.") + logError(s"Writer for stage $stageId, task $partId (TID $taskId) aborted.") }) } } From 3561723341c3062ba7d8682ea272c549b4bdc245 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 21 Jun 2018 10:28:12 -0700 Subject: [PATCH 03/12] Use task ID instead of attempt for the Hadoop API too. 
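Note on the identifiers involved (illustration only, not part of the patch series): the change hinges on three different values that TaskContext exposes, and a minimal Scala sketch labelling them follows; the helper name is invented for the example.

    import org.apache.spark.TaskContext

    // Illustrative only: the three TaskContext fields these patches distinguish.
    def describeTask(context: TaskContext): String = {
      val partId = context.partitionId()       // index of the input partition being written
      val taskId = context.taskAttemptId()     // the "TID": unique across the whole application
      val attemptId = context.attemptNumber()  // per-partition retry counter: 0, 1, 2, ...
      s"part $partId (TID $taskId, attempt $attemptId)"
    }

The application-unique TID is what keeps generated output names from colliding across speculative attempts, while the per-partition attempt number is what OutputCommitCoordinator.canCommit keys on, which is why the later patches end up passing different values to the writer factory and to the coordinator.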
--- .../org/apache/spark/internal/io/SparkHadoopWriter.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala index abf39213fa0d2..007b3876db550 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -82,7 +82,7 @@ object SparkHadoopWriter extends Logging { jobTrackerId = jobTrackerId, commitJobId = commitJobId, sparkPartitionId = context.partitionId, - sparkAttemptNumber = context.attemptNumber, + sparkTaskId = context.taskAttemptId, committer = committer, iterator = iter) }) @@ -104,12 +104,12 @@ object SparkHadoopWriter extends Logging { jobTrackerId: String, commitJobId: Int, sparkPartitionId: Int, - sparkAttemptNumber: Int, + sparkTaskId: Long, committer: FileCommitProtocol, iterator: Iterator[(K, V)]): TaskCommitMessage = { // Set up a task. val taskContext = config.createTaskAttemptContext( - jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber) + jobTrackerId, commitJobId, sparkPartitionId, sparkTaskId.toInt) committer.setupTask(taskContext) val (outputMetrics, callback) = initHadoopOutputMetrics(context) From fdcd39c852e9a2d70da95c37da04190910e7b2f0 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 21 Jun 2018 11:51:48 -0700 Subject: [PATCH 04/12] Log message update. --- .../sql/execution/datasources/v2/WriteToDataSourceV2.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala index eb5aaacadbb56..050cbc554ffc7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala @@ -140,15 +140,15 @@ object DataWritingSparkTask extends Logging { dataWriter.commit() } - logInfo(s"Writer for stage $stageId, task $partId (TID $taskId) committed.") + logInfo(s"Writer for stage $stageId, part $partId (TID $taskId) committed.") msg })(catchBlock = { // If there is an error, abort this writer - logError(s"Writer for stage $stageId, task $partId (TID $taskId) is aborting.") + logError(s"Writer for stage $stageId, part $partId (TID $taskId) is aborting.") dataWriter.abort() - logError(s"Writer for stage $stageId, task $partId (TID $taskId) aborted.") + logError(s"Writer for stage $stageId, part $partId (TID $taskId) aborted.") }) } } From 7233a5fd7b154e2a1400c5fac11d0356a22f5f98 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 21 Jun 2018 11:57:02 -0700 Subject: [PATCH 05/12] Javadoc updates. 
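For context on the Hadoop-side change in PATCH 03, a rough, hedged reconstruction rather than code from the patch: the integer passed to createTaskAttemptContext ends up as the attempt component of a Hadoop TaskAttemptID, so it must not collide between concurrent attempts of the same partition. The task type and numbers below are placeholders invented for illustration.

    import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}

    // Sketch of the ID the task attempt context carries; the trailing int is the
    // value Spark supplies (attempt number, or here a truncated task ID).
    val jobId = new JobID("20180621102800", 0)      // stands in for jobTrackerId and commitJobId
    val hadoopAttemptId = new TaskAttemptID(
      new TaskID(jobId, TaskType.MAP, 3),           // 3 stands in for the Spark partition ID
      7)                                            // 7 stands in for the int under discussion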
--- .../spark/sql/sources/v2/writer/DataSourceWriter.java | 8 ++++---- .../spark/sql/sources/v2/writer/DataWriter.java | 4 ++-- .../sql/sources/v2/writer/DataWriterFactory.java | 11 ++++------- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java index 0030a9f05dba7..7eedc85a5d6f3 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java @@ -64,8 +64,8 @@ public interface DataSourceWriter { DataWriterFactory createWriterFactory(); /** - * Returns whether Spark should use the commit coordinator to ensure that at most one attempt for - * each task commits. + * Returns whether Spark should use the commit coordinator to ensure that at most one task for + * each partition commits. * * @return true if commit coordinator should be used, false otherwise. */ @@ -90,9 +90,9 @@ default void onDataWriterCommit(WriterCommitMessage message) {} * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it. * * Note that speculative execution may cause multiple tasks to run for a partition. By default, - * Spark uses the commit coordinator to allow at most one attempt to commit. Implementations can + * Spark uses the commit coordinator to allow at most one task to commit. Implementations can * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple - * attempts may have committed successfully and one successful commit message per task will be + * tasks may have committed successfully and one successful commit message per task will be * passed to this commit method. The remaining commit messages are ignored by Spark. */ void commit(WriterCommitMessage[] messages); diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java index 39bf458298862..eb5f1263b7a6a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java @@ -40,13 +40,13 @@ * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an * exception will be sent to the driver side, and Spark may retry this writing task a few times. * In each retry, {@link DataWriterFactory#createDataWriter(int, int, long)} will receive a - * different `attemptNumber`. Spark will call {@link DataSourceWriter#abort(WriterCommitMessage[])} + * different `taskId`. Spark will call {@link DataSourceWriter#abort(WriterCommitMessage[])} * when the configured number of retries is exhausted. * * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task * takes too long to finish. Different from retried tasks, which are launched one by one after the * previous one fails, speculative tasks are running simultaneously. It's possible that one input - * RDD partition has multiple data writers with different `attemptNumber` running at the same time, + * RDD partition has multiple data writers with different `taskId` running at the same time, * and data sources should guarantee that these data writers don't conflict and can work together. 
* Implementations can coordinate with driver during {@link #commit()} to make sure only one of * these data writers can commit successfully. Or implementations can allow all of them to commit diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java index 7527bcc0c4027..0679452e4d6c6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java @@ -42,15 +42,12 @@ public interface DataWriterFactory extends Serializable { * Usually Spark processes many RDD partitions at the same time, * implementations should use the partition id to distinguish writers for * different partitions. - * @param attemptNumber Spark may launch multiple tasks with the same task id. For example, a task - * failed, Spark launches a new task wth the same task id but different - * attempt number. Or a task is too slow, Spark launches new tasks wth the - * same task id but different attempt number, which means there are multiple - * tasks with the same task id running at the same time. Implementations can - * use this attempt number to distinguish writers of different task attempts. + * @param taskId A unique identifier for a task that is performing the write of the partition + * data. Spark may run multiple tasks for the same partition (due to speculation + * or task failures, for example). * @param epochId A monotonically increasing id for streaming queries that are split in to * discrete periods of execution. For non-streaming queries, * this ID will always be 0. */ - DataWriter createDataWriter(int partitionId, int attemptNumber, long epochId); + DataWriter createDataWriter(int partitionId, int taskId, long epochId); } From c884f4f27199b3c91f56ba0042b42d09bc243883 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 21 Jun 2018 15:18:00 -0700 Subject: [PATCH 06/12] Change attemptNumber:Int to taskId:Long in createDataWriter API. 
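To make the new taskId parameter concrete, here is a minimal, hedged sketch written against the updated createDataWriter signature; the commit-message class and the counting writer are invented for the example and are not taken from the patch.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

    // Invented for illustration: reports which task wrote how many rows.
    case class ExampleCommitMessage(partitionId: Int, taskId: Long, rows: Long)
      extends WriterCommitMessage

    class ExampleWriterFactory extends DataWriterFactory[Row] {
      override def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[Row] =
        new DataWriter[Row] {
          private var count = 0L
          override def write(record: Row): Unit = count += 1   // buffer or stream the row here
          override def commit(): WriterCommitMessage =
            ExampleCommitMessage(partitionId, taskId, count)   // taskId identifies this attempt
          override def abort(): Unit = ()                      // nothing durable to clean up
        }
    }

Because the task ID is unique per attempt, folding it into any generated file name (as the SimpleCSVDataWriterFactory change later in this patch does with $jobId-$partitionId-$taskId) keeps speculative attempts of the same partition from ever writing to the same path.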
--- .../sql/kafka010/KafkaStreamWriter.scala | 2 +- .../sql/sources/v2/writer/DataWriter.java | 4 ++-- .../sources/v2/writer/DataWriterFactory.java | 2 +- .../datasources/v2/WriteToDataSourceV2.scala | 22 +++++++++---------- .../continuous/ContinuousWriteRDD.scala | 2 +- .../sources/ForeachWriterProvider.scala | 2 +- .../sources/PackedRowWriterFactory.scala | 2 +- .../streaming/sources/memoryV2.scala | 2 +- .../sources/v2/SimpleWritableDataSource.scala | 8 +++---- 9 files changed, 23 insertions(+), 23 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala index ae5b5c52d514e..32923dc9f5a6b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala @@ -67,7 +67,7 @@ case class KafkaStreamWriterFactory( override def createDataWriter( partitionId: Int, - attemptNumber: Int, + taskId: Long, epochId: Long): DataWriter[InternalRow] = { new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes) } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java index eb5f1263b7a6a..1626c0013e4e7 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java @@ -22,7 +22,7 @@ import org.apache.spark.annotation.InterfaceStability; /** - * A data writer returned by {@link DataWriterFactory#createDataWriter(int, int, long)} and is + * A data writer returned by {@link DataWriterFactory#createDataWriter(int, long, long)} and is * responsible for writing data for an input RDD partition. * * One Spark task has one exclusive data writer, so there is no thread-safe concern. @@ -39,7 +39,7 @@ * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an * exception will be sent to the driver side, and Spark may retry this writing task a few times. - * In each retry, {@link DataWriterFactory#createDataWriter(int, int, long)} will receive a + * In each retry, {@link DataWriterFactory#createDataWriter(int, long, long)} will receive a * different `taskId`. Spark will call {@link DataSourceWriter#abort(WriterCommitMessage[])} * when the configured number of retries is exhausted. * diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java index 0679452e4d6c6..0932ff8f8f8a7 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java @@ -49,5 +49,5 @@ public interface DataWriterFactory extends Serializable { * discrete periods of execution. For non-streaming queries, * this ID will always be 0. 
*/ - DataWriter createDataWriter(int partitionId, int taskId, long epochId); + DataWriter createDataWriter(int partitionId, long taskId, long epochId); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala index 050cbc554ffc7..7d829421a0e0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala @@ -109,7 +109,8 @@ object DataWritingSparkTask extends Logging { val stageId = context.stageId() val stageAttempt = context.stageAttemptNumber() val partId = context.partitionId() - val taskId = context.taskAttemptId().toInt + val taskId = context.taskAttemptId() + val attemptId = context.attemptNumber() val epochId = Option(context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)).getOrElse("0") val dataWriter = writeTask.createDataWriter(partId, taskId, epochId.toLong) @@ -121,18 +122,17 @@ object DataWritingSparkTask extends Logging { val msg = if (useCommitCoordinator) { val coordinator = SparkEnv.get.outputCommitCoordinator - val commitAuthorized = coordinator.canCommit(stageId, stageAttempt, partId, - context.attemptNumber()) + val commitAuthorized = coordinator.canCommit(stageId, stageAttempt, partId, attemptId) if (commitAuthorized) { logInfo(s"Writer for stage $stageId.$stageAttempt, " + - s"task $partId.$taskId is authorized to commit.") + s"task $partId.$attemptId is authorized to commit.") dataWriter.commit() } else { val message = s"Stage $stageId.$stageAttempt, " + - s"task $partId.$taskId: driver did not authorize commit" + s"task $partId.$attemptId: driver did not authorize commit" logInfo(message) // throwing CommitDeniedException will trigger the catch block for abort - throw new CommitDeniedException(message, stageId, partId, taskId) + throw new CommitDeniedException(message, stageId, partId, attemptId) } } else { @@ -140,15 +140,15 @@ object DataWritingSparkTask extends Logging { dataWriter.commit() } - logInfo(s"Writer for stage $stageId, part $partId (TID $taskId) committed.") + logInfo(s"Writer for stage $stageId, task $partId.$attemptId committed.") msg })(catchBlock = { // If there is an error, abort this writer - logError(s"Writer for stage $stageId, part $partId (TID $taskId) is aborting.") + logError(s"Writer for stage $stageId, task $partId.$attemptId is aborting.") dataWriter.abort() - logError(s"Writer for stage $stageId, part $partId (TID $taskId) aborted.") + logError(s"Writer for stage $stageId, task $partId.$attemptId aborted.") }) } } @@ -159,10 +159,10 @@ class InternalRowDataWriterFactory( override def createDataWriter( partitionId: Int, - attemptNumber: Int, + taskId: Long, epochId: Long): DataWriter[InternalRow] = { new InternalRowDataWriter( - rowWriterFactory.createDataWriter(partitionId, attemptNumber, epochId), + rowWriterFactory.createDataWriter(partitionId, taskId, epochId), RowEncoder.apply(schema).resolveAndBind()) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala index ef5f0da1e7cc2..76f3f5baa8d56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala @@ -56,7 +56,7 @@ class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactor val dataIterator = prev.compute(split, context) dataWriter = writeTask.createDataWriter( context.partitionId(), - context.attemptNumber(), + context.taskAttemptId(), EpochTracker.getCurrentEpoch.get) while (dataIterator.hasNext) { dataWriter.write(dataIterator.next()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterProvider.scala index f677f25f116a2..bc9b6d93ce7d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterProvider.scala @@ -88,7 +88,7 @@ case class ForeachWriterFactory[T]( extends DataWriterFactory[InternalRow] { override def createDataWriter( partitionId: Int, - attemptNumber: Int, + taskId: Long, epochId: Long): ForeachDataWriter[T] = { new ForeachDataWriter(writer, rowConverter, partitionId, epochId) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala index e07355aa37dba..b501d90c81f06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, Dat case object PackedRowWriterFactory extends DataWriterFactory[Row] { override def createDataWriter( partitionId: Int, - attemptNumber: Int, + taskId: Long, epochId: Long): DataWriter[Row] = { new PackedRowDataWriter() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala index 47b482007822d..29f8cca476722 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala @@ -180,7 +180,7 @@ class MemoryStreamWriter( case class MemoryWriterFactory(outputMode: OutputMode) extends DataWriterFactory[Row] { override def createDataWriter( partitionId: Int, - attemptNumber: Int, + taskId: Long, epochId: Long): DataWriter[Row] = { new MemoryDataWriter(partitionId, outputMode) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala index 694bb3b95b0f0..1334cf71ae988 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala @@ -209,10 +209,10 @@ class SimpleCSVDataWriterFactory(path: String, jobId: String, conf: Serializable override def createDataWriter( partitionId: Int, - attemptNumber: Int, + taskId: Long, epochId: Long): DataWriter[Row] = { val jobPath = new Path(new Path(path, "_temporary"), jobId) - val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber") + val filePath = new 
Path(jobPath, s"$jobId-$partitionId-$taskId") val fs = filePath.getFileSystem(conf.value) new SimpleCSVDataWriter(fs, filePath) } @@ -245,10 +245,10 @@ class InternalRowCSVDataWriterFactory(path: String, jobId: String, conf: Seriali override def createDataWriter( partitionId: Int, - attemptNumber: Int, + taskId: Long, epochId: Long): DataWriter[InternalRow] = { val jobPath = new Path(new Path(path, "_temporary"), jobId) - val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber") + val filePath = new Path(jobPath, s"$jobId-$partitionId-$taskId") val fs = filePath.getFileSystem(conf.value) new InternalRowCSVDataWriter(fs, filePath) } From 5efaae74bf340fed4223b5209bed63475cc35516 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 21 Jun 2018 16:01:21 -0700 Subject: [PATCH 07/12] Keep task IDs positive in the Hadoop task context. --- .../spark/internal/io/SparkHadoopWriter.scala | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala index 007b3876db550..5266eec86012e 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -76,13 +76,29 @@ object SparkHadoopWriter extends Logging { // Try to write all RDD partitions as a Hadoop OutputFormat. try { val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => { + // Generate a positive integer task ID that is unique for the current stage. This makes a + // few assumptions: + // - the task ID is always positive + // - stages cannot have more than Int.MaxValue + // - the sum of task counts of all active stages doesn't exceed Int.MaxValue + // + // The first two are currently the case in Spark, while the last one is very unlikely to + // occur. If it does, two tasks IDs on a single stage could have a clashing integer value, + // which could lead to code that generates clashing file names for different tasks. Still, + // if the commit coordinator is enabled, only one task would be allowed to commit. + var taskId = context.taskAttemptId + while (taskId > Int.MaxValue) { + taskId -= Int.MaxValue + } + val stageTaskId = taskId.toInt + executeTask( context = context, config = config, jobTrackerId = jobTrackerId, commitJobId = commitJobId, sparkPartitionId = context.partitionId, - sparkTaskId = context.taskAttemptId, + sparkTaskId = stageTaskId, committer = committer, iterator = iter) }) From 227d513ade176fd56f7e6d75a16deb6c654982db Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 21 Jun 2018 16:10:42 -0700 Subject: [PATCH 08/12] Remove now unnecessary cast. --- .../org/apache/spark/internal/io/SparkHadoopWriter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala index 5266eec86012e..92f38fe5d57dc 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -120,12 +120,12 @@ object SparkHadoopWriter extends Logging { jobTrackerId: String, commitJobId: Int, sparkPartitionId: Int, - sparkTaskId: Long, + sparkTaskId: Int, committer: FileCommitProtocol, iterator: Iterator[(K, V)]): TaskCommitMessage = { // Set up a task. 
val taskContext = config.createTaskAttemptContext( - jobTrackerId, commitJobId, sparkPartitionId, sparkTaskId.toInt) + jobTrackerId, commitJobId, sparkPartitionId, sparkTaskId) committer.setupTask(taskContext) val (outputMetrics, callback) = initHadoopOutputMetrics(context) From a16d9f907b3ce0078da72b7e7bcc56e187cbc8f9 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 21 Jun 2018 18:04:48 -0700 Subject: [PATCH 09/12] Different approach to generate unique task ID. --- .../spark/internal/io/SparkHadoopWriter.scala | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala index 92f38fe5d57dc..0667572fde25a 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -76,21 +76,9 @@ object SparkHadoopWriter extends Logging { // Try to write all RDD partitions as a Hadoop OutputFormat. try { val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => { - // Generate a positive integer task ID that is unique for the current stage. This makes a - // few assumptions: - // - the task ID is always positive - // - stages cannot have more than Int.MaxValue - // - the sum of task counts of all active stages doesn't exceed Int.MaxValue - // - // The first two are currently the case in Spark, while the last one is very unlikely to - // occur. If it does, two tasks IDs on a single stage could have a clashing integer value, - // which could lead to code that generates clashing file names for different tasks. Still, - // if the commit coordinator is enabled, only one task would be allowed to commit. - var taskId = context.taskAttemptId - while (taskId > Int.MaxValue) { - taskId -= Int.MaxValue - } - val stageTaskId = taskId.toInt + // SPARK-24552: Generate a unique "task ID" based on the stage and task atempt numbers. + // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently. + val taskId = (context.stageAttemptNumber << 16) | context.attemptNumber executeTask( context = context, @@ -98,7 +86,7 @@ object SparkHadoopWriter extends Logging { jobTrackerId = jobTrackerId, commitJobId = commitJobId, sparkPartitionId = context.partitionId, - sparkTaskId = stageTaskId, + sparkTaskId = taskId, committer = committer, iterator = iter) }) From 503852fb1aa309da52084f4e2b11f999ede4029b Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 22 Jun 2018 09:26:44 -0700 Subject: [PATCH 10/12] Revert to previous parameter names. --- .../apache/spark/internal/io/SparkHadoopWriter.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala index 0667572fde25a..c22f90ed2f8e1 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -76,9 +76,9 @@ object SparkHadoopWriter extends Logging { // Try to write all RDD partitions as a Hadoop OutputFormat. try { val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => { - // SPARK-24552: Generate a unique "task ID" based on the stage and task atempt numbers. 
+ // SPARK-24552: Generate a unique "attempt ID" based on the stage and task atempt numbers. // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently. - val taskId = (context.stageAttemptNumber << 16) | context.attemptNumber + val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber executeTask( context = context, @@ -86,7 +86,7 @@ object SparkHadoopWriter extends Logging { jobTrackerId = jobTrackerId, commitJobId = commitJobId, sparkPartitionId = context.partitionId, - sparkTaskId = taskId, + sparkAttemptNumber = attemptId, committer = committer, iterator = iter) }) @@ -108,12 +108,12 @@ object SparkHadoopWriter extends Logging { jobTrackerId: String, commitJobId: Int, sparkPartitionId: Int, - sparkTaskId: Int, + sparkAttemptNumber: Int, committer: FileCommitProtocol, iterator: Iterator[(K, V)]): TaskCommitMessage = { // Set up a task. val taskContext = config.createTaskAttemptContext( - jobTrackerId, commitJobId, sparkPartitionId, sparkTaskId) + jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber) committer.setupTask(taskContext) val (outputMetrics, callback) = initHadoopOutputMetrics(context) From 47131c5507a253bc0e342d6cfa66c50ea5b09c72 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 22 Jun 2018 12:50:11 -0700 Subject: [PATCH 11/12] Reword some log messages. --- .../datasources/v2/WriteToDataSourceV2.scala | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala index 7d829421a0e0c..b1148c0f62f7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala @@ -124,12 +124,12 @@ object DataWritingSparkTask extends Logging { val coordinator = SparkEnv.get.outputCommitCoordinator val commitAuthorized = coordinator.canCommit(stageId, stageAttempt, partId, attemptId) if (commitAuthorized) { - logInfo(s"Writer for stage $stageId.$stageAttempt, " + - s"task $partId.$attemptId is authorized to commit.") + logInfo(s"Commit authorized for partition $partId (task $taskId, attempt $attemptId" + + s"stage $stageId.$stageAttempt)") dataWriter.commit() } else { - val message = s"Stage $stageId.$stageAttempt, " + - s"task $partId.$attemptId: driver did not authorize commit" + val message = s"Commit denied for partition $partId (task $taskId, attempt $attemptId" + + s"stage $stageId.$stageAttempt)" logInfo(message) // throwing CommitDeniedException will trigger the catch block for abort throw new CommitDeniedException(message, stageId, partId, attemptId) @@ -140,15 +140,18 @@ object DataWritingSparkTask extends Logging { dataWriter.commit() } - logInfo(s"Writer for stage $stageId, task $partId.$attemptId committed.") + logInfo(s"Committed partition $partId (task $taskId, attempt $attemptId" + + s"stage $stageId.$stageAttempt)") msg })(catchBlock = { // If there is an error, abort this writer - logError(s"Writer for stage $stageId, task $partId.$attemptId is aborting.") + logError(s"Aborting commit for partition $partId (task $taskId, attempt $attemptId" + + s"stage $stageId.$stageAttempt)") dataWriter.abort() - logError(s"Writer for stage $stageId, task $partId.$attemptId aborted.") + logError(s"Aborted commit for partition $partId (task $taskId, attempt 
$attemptId" + + s"stage $stageId.$stageAttempt)") }) } } From e89293781c1feaed0bb7683d0059fe9663188833 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 22 Jun 2018 16:36:37 -0700 Subject: [PATCH 12/12] Typo. --- .../scala/org/apache/spark/internal/io/SparkHadoopWriter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala index c22f90ed2f8e1..9ebd0aa301592 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -76,7 +76,7 @@ object SparkHadoopWriter extends Logging { // Try to write all RDD partitions as a Hadoop OutputFormat. try { val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => { - // SPARK-24552: Generate a unique "attempt ID" based on the stage and task atempt numbers. + // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers. // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently. val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber