Skip to content

Commit 41df43f

Browse files
rdblue authored and Marcelo Vanzin committed
[SPARK-26873][SQL] Use a consistent timestamp to build Hadoop Job IDs.
## What changes were proposed in this pull request?

Backport SPARK-26873 (#23777) to branch-2.3.

## How was this patch tested?

Existing tests to cover regressions.

Closes #23832 from rdblue/SPARK-26873-branch-2.3.

Authored-by: Ryan Blue <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent 214b6b2 commit 41df43f

File tree

1 file changed

+4
-1
lines changed

1 file changed

+4
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,12 +190,14 @@ object FileFormatWriter extends Logging {
190190
global = false,
191191
child = plan).execute()
192192
}
193+
val jobIdInstant = new Date().getTime
193194
val ret = new Array[WriteTaskResult](rdd.partitions.length)
194195
sparkSession.sparkContext.runJob(
195196
rdd,
196197
(taskContext: TaskContext, iter: Iterator[InternalRow]) => {
197198
executeTask(
198199
description = description,
200+
jobIdInstant = jobIdInstant,
199201
sparkStageId = taskContext.stageId(),
200202
sparkPartitionId = taskContext.partitionId(),
201203
sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
@@ -228,13 +230,14 @@ object FileFormatWriter extends Logging {
228230
/** Writes data out in a single Spark task. */
229231
private def executeTask(
230232
description: WriteJobDescription,
233+
jobIdInstant: Long,
231234
sparkStageId: Int,
232235
sparkPartitionId: Int,
233236
sparkAttemptNumber: Int,
234237
committer: FileCommitProtocol,
235238
iterator: Iterator[InternalRow]): WriteTaskResult = {
236239

237-
val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
240+
val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId)
238241
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
239242
val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
240243

0 commit comments

Comments
 (0)