diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 2f701edf728e4..5be821da32c88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -190,12 +190,14 @@ object FileFormatWriter extends Logging {
           global = false,
           child = plan).execute()
       }
+      val jobIdInstant = new Date().getTime
       val ret = new Array[WriteTaskResult](rdd.partitions.length)
       sparkSession.sparkContext.runJob(
         rdd,
         (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
           executeTask(
             description = description,
+            jobIdInstant = jobIdInstant,
             sparkStageId = taskContext.stageId(),
             sparkPartitionId = taskContext.partitionId(),
             sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
@@ -228,13 +230,14 @@ object FileFormatWriter extends Logging {
   /** Writes data out in a single Spark task. */
   private def executeTask(
       description: WriteJobDescription,
+      jobIdInstant: Long,
       sparkStageId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[InternalRow]): WriteTaskResult = {
 
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
+    val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId)
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
     val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
 
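
For context, a minimal standalone sketch of the idea behind this change (not the actual Spark API: `JobIdSketch`, `formatJobId`, and the simulated task loop below are hypothetical stand-ins, and the real `SparkHadoopWriterUtils.createJobID` returns a Hadoop `JobID` rather than a string). The point the diff makes is that deriving the job ID from `new Date` inside each task can yield different IDs for tasks of the same write job, whereas capturing one `jobIdInstant` on the driver and passing it to `executeTask` makes every task agree:

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

object JobIdSketch {
  // Hypothetical stand-in for SparkHadoopWriterUtils.createJobID:
  // build a job ID string from a timestamp and a stage id.
  private def formatJobId(time: Date, stageId: Int): String = {
    val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    s"job_${formatter.format(time)}_$stageId"
  }

  def main(args: Array[String]): Unit = {
    val stageId = 42

    // Before the patch: each task computes `new Date` itself, so tasks that
    // start in different seconds can end up with different job IDs.
    val perTaskIds = (0 until 3).map { _ =>
      Thread.sleep(1100) // simulate tasks starting at different times
      formatJobId(new Date(), stageId)
    }

    // After the patch: the timestamp is captured once (on the driver) and
    // handed to every task, so all tasks share the same job ID.
    val jobIdInstant = new Date().getTime
    val sharedIds = (0 until 3).map(_ => formatJobId(new Date(jobIdInstant), stageId))

    println(s"per-task ids: ${perTaskIds.mkString(", ")}") // may differ
    println(s"shared ids  : ${sharedIds.mkString(", ")}")  // always identical
  }
}
```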