
Commit cfba7bb

Simplify output committer path.
1 parent d405a3c commit cfba7bb

File tree: 1 file changed (+27 −61 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala

Lines changed: 27 additions & 61 deletions
@@ -152,9 +152,8 @@ object WriteOutput extends Logging {
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
     val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)

-    // Set up the attempt context required to use in the output committers.
+    // Set up the attempt context required to use in the output committer.
     val taskAttemptContext: TaskAttemptContext = {
-
       // Set up the configuration object
       val hadoopConf = description.serializableHadoopConf.value
       hadoopConf.set("mapred.job.id", jobId.toString)
@@ -170,11 +169,18 @@ object WriteOutput extends Logging {
       description.outputFormatClass, taskAttemptContext, description.path, description.isAppend)
     committer.setupTask(taskAttemptContext)

+    // Figure out where we need to write data to for staging.
+    // For FileOutputCommitter it has its own staging path called "work path".
+    val stagingPath = committer match {
+      case f: FileOutputCommitter => f.getWorkPath.toString
+      case _ => description.path
+    }
+
     val writeTask =
       if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
-        new SingleDirectoryWriteTask(description, taskAttemptContext, committer)
+        new SingleDirectoryWriteTask(description, taskAttemptContext, stagingPath)
       } else {
-        new DynamicPartitionWriteTask(description, taskAttemptContext, committer)
+        new DynamicPartitionWriteTask(description, taskAttemptContext, stagingPath)
       }

     try {
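The match introduced above is the crux of the change: instead of threading the OutputCommitter into each write task, the staging directory is resolved once per task and passed down as a plain path string, so the write tasks no longer depend on the committer at all. A minimal standalone sketch of the same resolution logic, assuming Hadoop's FileOutputCommitter API (the object and method names here are illustrative and not part of this patch):

  import org.apache.hadoop.mapreduce.OutputCommitter
  import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

  object StagingPathExample {
    // Illustrative helper (not in this patch): pick the directory a task should
    // stage its output in, given the committer that will later promote it to the
    // final location on task/job commit.
    def resolveStagingPath(committer: OutputCommitter, outputPath: String): String =
      committer match {
        // FileOutputCommitter stages task output under its own temporary "work path".
        case f: FileOutputCommitter => f.getWorkPath.toString
        // Any other committer is assumed to write straight to the final output path.
        case _ => outputPath
      }
  }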
@@ -210,15 +216,14 @@ object WriteOutput extends Logging {
   private class SingleDirectoryWriteTask(
       description: WriteJobDescription,
       taskAttemptContext: TaskAttemptContext,
-      committer: OutputCommitter) extends ExecuteWriteTask {
+      stagingPath: String) extends ExecuteWriteTask {

     private[this] var outputWriter: OutputWriter = {
-      outputWriter = createOutputWriter(
-        description.outputWriterFactory,
-        getWorkPath(committer, description.path),
-        description.nonPartitionColumns,
-        taskAttemptContext,
-        committer)
+      val outputWriter = description.outputWriterFactory.newInstance(
+        path = stagingPath,
+        bucketId = None,
+        dataSchema = description.nonPartitionColumns.toStructType,
+        context = taskAttemptContext)
       outputWriter.initConverter(dataSchema = description.nonPartitionColumns.toStructType)
       outputWriter
     }
@@ -246,7 +251,7 @@ object WriteOutput extends Logging {
   private class DynamicPartitionWriteTask(
       description: WriteJobDescription,
       taskAttemptContext: TaskAttemptContext,
-      committer: OutputCommitter) extends ExecuteWriteTask {
+      stagingPath: String) extends ExecuteWriteTask {

     private var currentWriter: OutputWriter = _

@@ -291,22 +296,20 @@ object WriteOutput extends Logging {
     private def newOutputWriter(
         key: InternalRow,
         getPartitionString: UnsafeProjection): OutputWriter = {
-      val basePath = getWorkPath(committer, description.path.toString)
       val path =
         if (description.partitionColumns.nonEmpty) {
           val partitionPath = getPartitionString(key).getString(0)
-          new Path(basePath, partitionPath).toString
+          new Path(stagingPath, partitionPath).toString
         } else {
-          basePath
+          stagingPath
        }
       val bucketId = getBucketIdFromKey(key)
-      val newWriter = createOutputWriter(
-        description.outputWriterFactory,
-        path.toString,
-        description.nonPartitionColumns,
-        taskAttemptContext,
-        committer,
-        bucketId)
+
+      val newWriter = description.outputWriterFactory.newInstance(
+        path = path,
+        bucketId = bucketId,
+        dataSchema = description.nonPartitionColumns.toStructType,
+        context = taskAttemptContext)
       newWriter.initConverter(description.nonPartitionColumns.toStructType)
       newWriter
     }
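For a dynamically partitioned write, the writer path in the hunk above is simply the staging directory with the Hive-style partition subpath appended via Hadoop's Path. A small worked example in Scala REPL style; the staging directory and partition values below are made up for illustration only:

  import org.apache.hadoop.fs.Path

  // Hypothetical inputs, for illustration only.
  val stagingPath = "hdfs://nn/warehouse/events/_temporary/0"   // e.g. a committer work path
  val partitionPath = "year=2016/month=10"                      // result of getPartitionString(key)

  // Same composition as newOutputWriter above: each partition key maps to a
  // subdirectory of the task's staging directory.
  val writerPath = new Path(stagingPath, partitionPath).toString
  // writerPath == "hdfs://nn/warehouse/events/_temporary/0/year=2016/month=10"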
@@ -324,8 +327,8 @@ object WriteOutput extends Logging {
       })

     // Returns the data columns to be written given an input row
-    val getOutputRow =
-      UnsafeProjection.create(description.nonPartitionColumns, description.allColumns)
+    val getOutputRow = UnsafeProjection.create(
+      description.nonPartitionColumns, description.allColumns)

     // Returns the partition path given a partition key.
     val getPartitionString =
@@ -387,43 +390,6 @@ object WriteOutput extends Logging {
     }
   }

-  def getWorkPath(committer: OutputCommitter, defaultPath: String): String = {
-    committer match {
-      // FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
-      case f: FileOutputCommitter => f.getWorkPath.toString
-      case _ => defaultPath
-    }
-  }
-
-  def createOutputWriter(
-      outputWriterFactory: OutputWriterFactory,
-      path: String,
-      nonPartitionColumns: Seq[Attribute],
-      taskAttemptContext: TaskAttemptContext,
-      committer: OutputCommitter,
-      bucketId: Option[Int] = None): OutputWriter = {
-    try {
-      outputWriterFactory.newInstance(
-        path,
-        bucketId,
-        nonPartitionColumns.toStructType,
-        taskAttemptContext)
-    } catch {
-      case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
-        if (committer.getClass.getName.contains("Direct")) {
-          // SPARK-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
-          // attempts, the task will fail because the output file is created from a prior attempt.
-          // This often means the most visible error to the user is misleading. Augment the error
-          // to tell the user to look for the actual error.
-          throw new SparkException("The output file already exists but this could be due to a " +
-            "failure from an earlier attempt. Look through the earlier logs or stage page for " +
-            "the first error.\n File exists error: " + e, e)
-        } else {
-          throw e
-        }
-    }
-  }
-
   def setupDriverCommitter(job: Job, path: String, isAppend: Boolean): OutputCommitter = {
     // Setup IDs
     val jobId = SparkHadoopWriter.createJobID(new Date, 0)

0 commit comments