@@ -244,7 +244,7 @@ private[sql] abstract class BaseWriterContainer(
   @transient private val jobContext: JobContext = job
 
   // The following fields are initialized and used on both driver and executor side.
-  @transient protected var outputCommitter: FileOutputCommitter = _
+  @transient protected var outputCommitter: OutputCommitter = _
   @transient private var jobId: JobID = _
   @transient private var taskId: TaskID = _
   @transient private var taskAttemptId: TaskAttemptID = _
@@ -282,14 +282,18 @@ private[sql] abstract class BaseWriterContainer(
     initWriters()
   }
 
-  private def newOutputCommitter(context: TaskAttemptContext): FileOutputCommitter = {
-    outputFormatClass.newInstance().getOutputCommitter(context) match {
-      case f: FileOutputCommitter => f
-      case f => sys.error(
-        s"FileOutputCommitter or its subclass is expected, but got a ${f.getClass.getName}.")
+  protected def getWorkPath: String = {
+    outputCommitter match {
+      // FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
+      case f: FileOutputCommitter => f.getWorkPath.toString
+      case _ => outputPath
     }
   }
 
+  private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
+    outputFormatClass.newInstance().getOutputCommitter(context)
+  }
+
   private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
     this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
     this.taskId = new TaskID(this.jobId, true, splitId)
@@ -339,7 +343,7 @@ private[sql] class DefaultWriterContainer(

   override protected def initWriters(): Unit = {
     writer = outputWriterClass.newInstance()
-    writer.init(outputCommitter.getWorkPath.toString, dataSchema, taskAttemptContext)
+    writer.init(getWorkPath, dataSchema, taskAttemptContext)
   }
 
   override def outputWriterForRow(row: Row): OutputWriter = writer
@@ -381,7 +385,7 @@ private[sql] class DynamicPartitionWriterContainer(
     }.mkString
 
     outputWriters.getOrElseUpdate(partitionPath, {
-      val path = new Path(outputCommitter.getWorkPath, partitionPath.stripPrefix(Path.SEPARATOR))
+      val path = new Path(getWorkPath, partitionPath.stripPrefix(Path.SEPARATOR))
       val writer = outputWriterClass.newInstance()
       writer.init(path.toString, dataSchema, taskAttemptContext)
       writer
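Note on this hunk: partitionPath is built with a leading Path.SEPARATOR, so the prefix must be stripped before joining, because Hadoop's Path(parent, child) resolves an absolute child on its own and discards the parent's path. A small sketch with made-up values (the work path shown is hypothetical):

    import org.apache.hadoop.fs.Path

    // Made-up inputs: a FileOutputCommitter-style work path and a partition
    // subdirectory derived from the partitioning columns.
    val workPath = "/tmp/out/_temporary/0/_temporary/attempt_local_0001_m_000000_0"
    val partitionPath = "/year=2015/month=4"

    // Stripping the leading separator keeps the child relative, so it
    // resolves under workPath instead of replacing it.
    val path = new Path(workPath, partitionPath.stripPrefix(Path.SEPARATOR))
    // path.toString ends with ".../attempt_local_0001_m_000000_0/year=2015/month=4"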