@@ -40,7 +40,8 @@ import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration

private[libsvm] class LibSVMOutputWriter(
path: String,
stagingDir: String,
fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext)
extends OutputWriter {
@@ -50,11 +51,7 @@ private[libsvm] class LibSVMOutputWriter(
private val recordWriter: RecordWriter[NullWritable, Text] = {
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val configuration = context.getConfiguration
val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
new Path(stagingDir, fileNamePrefix + extension)
}
}.getRecordWriter(context)
}
@@ -132,12 +129,11 @@ private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with DataSour
dataSchema: StructType): OutputWriterFactory = {
new OutputWriterFactory {
override def newInstance(
path: String,
bucketId: Option[Int],
stagingDir: String,
fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
if (bucketId.isDefined) { sys.error("LibSVM doesn't support bucketing") }
new LibSVMOutputWriter(path, dataSchema, context)
new LibSVMOutputWriter(stagingDir, fileNamePrefix, dataSchema, context)
}
}
}

@@ -34,18 +34,23 @@ abstract class OutputWriterFactory extends Serializable {
* When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side
* to instantiate new [[OutputWriter]]s.
*
* @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that
* this may not point to the final output file. For example, `FileOutputFormat` writes to
* temporary directories and then merge written files back to the final destination. In
* this case, `path` points to a temporary output file under the temporary directory.
@param stagingDir Base path (directory) of the file to which this [[OutputWriter]] is supposed
to write. Note that this may not point to the final output directory. For
example, `FileOutputFormat` writes to temporary directories and then merges the
written files back to the final destination. In this case, `stagingDir` points to
a temporary staging directory rather than the final output directory.
@param fileNamePrefix Prefix of the file name. The returned OutputWriter must make sure this
prefix is used in the actual file name. For example, if the prefix is
"part-1-2-3", then the file name must start with "part-1-2-3" but can
end in an arbitrary extension.
* @param dataSchema Schema of the rows to be written. Partition columns are not included in the
* schema if the relation being written is partitioned.
* @param context The Hadoop MapReduce task context.
* @since 1.4.0
*/
def newInstance(
path: String,
bucketId: Option[Int], // TODO: This doesn't belong here...
stagingDir: String,
fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter

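To make the refactored contract concrete, here is a minimal sketch of a hypothetical text-based data source implementing the new `newInstance(stagingDir, fileNamePrefix, ...)` signature. It is not part of this PR; `EchoOutputWriterFactory`, `EchoOutputWriter`, the `.echo` extension, and the pipe-delimited formatting are invented for illustration, and it mirrors the pattern used by the LibSVM/CSV/JSON writers in this diff (simplified to override the public `write(Row)` instead of `writeInternal`):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

// Hypothetical "echo" text format: all names here are made up for the example.
class EchoOutputWriterFactory extends OutputWriterFactory {
  override def newInstance(
      stagingDir: String,
      fileNamePrefix: String,
      dataSchema: StructType,
      context: TaskAttemptContext): OutputWriter = {
    new EchoOutputWriter(stagingDir, fileNamePrefix, context)
  }
}

class EchoOutputWriter(
    stagingDir: String,
    fileNamePrefix: String,
    context: TaskAttemptContext) extends OutputWriter {

  private val recordWriter: RecordWriter[NullWritable, Text] = {
    new TextOutputFormat[NullWritable, Text]() {
      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
        // The only contract: the produced file name starts with the prefix handed in
        // by the write path. No task id, job UUID, or bucket id handling is needed here.
        new Path(stagingDir, s"$fileNamePrefix.echo$extension")
      }
    }.getRecordWriter(context)
  }

  // Simplified: write each row as pipe-delimited text.
  override def write(row: Row): Unit = {
    recordWriter.write(NullWritable.get(), new Text(row.mkString("|")))
  }

  override def close(): Unit = recordWriter.close(context)
}
```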

@@ -46,6 +46,7 @@ object WriteOutput extends Logging {

/** A shared job description for all the write tasks. */
private class WriteJobDescription(
val uuid: String, // prevent collision between different (appending) write jobs
val serializableHadoopConf: SerializableConfiguration,
val outputWriterFactory: OutputWriterFactory,
val allColumns: Seq[Attribute],
@@ -102,6 +103,7 @@ object WriteOutput extends Logging {
fileFormat.prepareWrite(sparkSession, job, options, dataColumns.toStructType)

val description = new WriteJobDescription(
uuid = UUID.randomUUID().toString,
serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
outputWriterFactory = outputWriterFactory,
allColumns = plan.output,
@@ -213,6 +215,11 @@ object WriteOutput extends Logging {
private trait ExecuteWriteTask {
def execute(iterator: Iterator[InternalRow]): Unit
def releaseResources(): Unit

final def filePrefix(split: Int, uuid: String, bucketId: Option[Int]): String = {
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
f"part-r-$split%05d-$uuid$bucketString"
}
}
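For reference, here is the naming scheme the new helper produces, shown as a standalone sketch (illustration only, not code from the PR). It assumes `BucketingUtils.bucketIdToString` renders the bucket id as an underscore-prefixed, zero-padded suffix, as the bucketed writers elsewhere in this codebase do:

```scala
// Standalone copy of the helper above, for illustration.
def filePrefix(split: Int, uuid: String, bucketId: Option[Int]): String = {
  // Assumed to match BucketingUtils.bucketIdToString: an "_%05d"-style suffix.
  val bucketString = bucketId.map(id => f"_$id%05d").getOrElse("")
  f"part-r-$split%05d-$uuid$bucketString"
}

filePrefix(9, "ea518ad4-455a-4431-b471-d24e03814677", None)
// => part-r-00009-ea518ad4-455a-4431-b471-d24e03814677
filePrefix(9, "ea518ad4-455a-4431-b471-d24e03814677", Some(2))
// => part-r-00009-ea518ad4-455a-4431-b471-d24e03814677_00002
// A concrete writer (e.g. JsonOutputWriter below) then only appends its extension:
//    part-r-00009-ea518ad4-455a-4431-b471-d24e03814677.json
```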

/** Writes data to a single directory (used for non-dynamic-partition writes). */
@@ -222,9 +229,11 @@
stagingPath: String) extends ExecuteWriteTask {

private[this] var outputWriter: OutputWriter = {
val split = taskAttemptContext.getTaskAttemptID.getTaskID.getId

val outputWriter = description.outputWriterFactory.newInstance(
path = stagingPath,
bucketId = None,
stagingDir = stagingPath,
fileNamePrefix = filePrefix(split, description.uuid, None),
dataSchema = description.nonPartitionColumns.toStructType,
context = taskAttemptContext)
outputWriter.initConverter(dataSchema = description.nonPartitionColumns.toStructType)
@@ -287,29 +296,31 @@ object WriteOutput extends Logging {
}
}

private def getBucketIdFromKey(key: InternalRow): Option[Int] =
description.bucketSpec.map { _ => key.getInt(description.partitionColumns.length) }

/**
* Opens and returns a new OutputWriter given a partition key and an optional bucket id.
* If bucket id is specified, we will append it to the end of the file name, but before the
* file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
*/
private def newOutputWriter(
key: InternalRow,
getPartitionString: UnsafeProjection): OutputWriter = {
private def newOutputWriter(key: InternalRow, partString: UnsafeProjection): OutputWriter = {
val path =
if (description.partitionColumns.nonEmpty) {
val partitionPath = getPartitionString(key).getString(0)
val partitionPath = partString(key).getString(0)
new Path(stagingPath, partitionPath).toString
} else {
stagingPath
}
val bucketId = getBucketIdFromKey(key)

// If the bucket spec is defined, the bucket column is right after the partition columns
val bucketId = if (description.bucketSpec.isDefined) {
Some(key.getInt(description.partitionColumns.length))
} else {
None
}

val split = taskAttemptContext.getTaskAttemptID.getTaskID.getId
val newWriter = description.outputWriterFactory.newInstance(
path = path,
bucketId = bucketId,
stagingDir = path,
fileNamePrefix = filePrefix(split, description.uuid, bucketId),
dataSchema = description.nonPartitionColumns.toStructType,
context = taskAttemptContext)
newWriter.initConverter(description.nonPartitionColumns.toStructType)
@@ -319,7 +330,7 @@
override def execute(iter: Iterator[InternalRow]): Unit = {
// We should first sort by partition columns, then bucket id, and finally sorting columns.
val sortingExpressions: Seq[Expression] =
description.partitionColumns ++ bucketIdExpression ++ sortColumns
description.partitionColumns ++ bucketIdExpression ++ sortColumns
val getSortingKey = UnsafeProjection.create(sortingExpressions, description.allColumns)

val sortingKeySchema = StructType(sortingExpressions.map {
@@ -333,8 +344,8 @@
description.nonPartitionColumns, description.allColumns)

// Returns the partition path given a partition key.
val getPartitionString =
UnsafeProjection.create(Seq(Concat(partitionStringExpression)), description.partitionColumns)
val getPartitionString = UnsafeProjection.create(
Seq(Concat(partitionStringExpression)), description.partitionColumns)

// Sorts the data before write, so that we only need one writer at the same time.
val sorter = new UnsafeKVExternalSorter(
@@ -405,17 +416,6 @@
job.getConfiguration.setBoolean("mapred.task.is.map", true)
Contributor: We should probably preserve this comment and move it to the new place where we generate the UUID.

Contributor (author): It's actually there already in WriteJobDescription. I shortened it to a single line.

job.getConfiguration.setInt("mapred.task.partition", 0)

// This UUID is sent to executor side together with the serialized `Configuration` object within
// the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
// unique task output files.
// This UUID is used to avoid output file name collision between different appending write jobs.
// These jobs may belong to different SparkContext instances. Concrete data source
// implementations may use this UUID to generate unique file names (e.g.,
// `part-r-<task-id>-<job-uuid>.parquet`). The reason why this ID is used to identify a job
// rather than a single task output file is that, speculative tasks must generate the same
// output file name as the original task.
job.getConfiguration.set(WriterContainer.DATASOURCE_WRITEJOBUUID, UUID.randomUUID().toString)

val taskAttemptContext = new TaskAttemptContextImpl(job.getConfiguration, taskAttemptId)
val outputCommitter = newOutputCommitter(
job.getOutputFormatClass, taskAttemptContext, path, isAppend)
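The rationale in the removed comment above still applies to the new `WriteJobDescription.uuid` field: the file name prefix must be identical for speculative attempts of the same task, and only the job-level UUID distinguishes separate (e.g. appending) write jobs. A small illustrative sketch of why that holds (the attempt id strings are made up):

```scala
import org.apache.hadoop.mapreduce.TaskAttemptID

// Two attempts of the same task: the original and a speculative retry.
val original    = TaskAttemptID.forName("attempt_201611010000_0001_m_000009_0")
val speculative = TaskAttemptID.forName("attempt_201611010000_0001_m_000009_1")

// filePrefix only looks at the task id (the "split"), never the attempt number,
// so both attempts target the same part-r-00009-<job-uuid> prefix and the
// output committer keeps exactly one of them.
assert(original.getTaskID.getId == speculative.getTaskID.getId)  // both are 9
```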
@@ -474,7 +474,3 @@
}
}
}

object WriterContainer {
val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID"
}

@@ -31,7 +31,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile, WriterContainer}
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.types._

object CSVRelation extends Logging {
@@ -170,17 +170,17 @@ object CSVRelation extends Logging {

private[csv] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWriterFactory {
override def newInstance(
path: String,
bucketId: Option[Int],
stagingDir: String,
fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
if (bucketId.isDefined) sys.error("csv doesn't support bucketing")
new CsvOutputWriter(path, dataSchema, context, params)
new CsvOutputWriter(stagingDir, fileNamePrefix, dataSchema, context, params)
}
}

private[csv] class CsvOutputWriter(
path: String,
stagingDir: String,
fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext,
params: CSVOptions) extends OutputWriter with Logging {
@@ -199,11 +199,7 @@
private val recordWriter: RecordWriter[NullWritable, Text] = {
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val configuration = context.getConfiguration
val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension")
new Path(stagingDir, s"$fileNamePrefix.csv$extension")
}
}.getRecordWriter(context)
}

@@ -82,11 +82,11 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {

new OutputWriterFactory {
override def newInstance(
path: String,
bucketId: Option[Int],
stagingDir: String,
fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
new JsonOutputWriter(path, parsedOptions, bucketId, dataSchema, context)
new JsonOutputWriter(stagingDir, parsedOptions, fileNamePrefix, dataSchema, context)
}
}
}
@@ -153,9 +153,9 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
}

private[json] class JsonOutputWriter(
path: String,
stagingDir: String,
options: JSONOptions,
bucketId: Option[Int],
fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext)
extends OutputWriter with Logging {
@@ -168,12 +168,7 @@
private val recordWriter: RecordWriter[NullWritable, Text] = {
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val configuration = context.getConfiguration
val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString.json$extension")
new Path(stagingDir, s"$fileNamePrefix.json$extension")
}
}.getRecordWriter(context)
}

@@ -27,7 +27,7 @@ import scala.util.{Failure, Try}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.{Log => ApacheParquetLog}
import org.apache.parquet.filter2.compat.FilterCompat
@@ -45,7 +45,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -134,10 +133,10 @@ class ParquetFileFormat
new OutputWriterFactory {
override def newInstance(
path: String,
bucketId: Option[Int],
fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
new ParquetOutputWriter(path, bucketId, context)
new ParquetOutputWriter(path, fileNamePrefix, context)
}
}
}

@@ -26,7 +26,7 @@ import org.apache.parquet.hadoop.util.ContextUtil

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{BucketingUtils, OutputWriter, OutputWriterFactory, WriterContainer}
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -122,47 +122,29 @@ private[parquet] class ParquetOutputWriterFactory(
}

/** Disable the use of the older API. */
def newInstance(
override def newInstance(
path: String,
bucketId: Option[Int],
fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
throw new UnsupportedOperationException(
"this version of newInstance not supported for " +
throw new UnsupportedOperationException("this version of newInstance not supported for " +
"ParquetOutputWriterFactory")
}
}


// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[parquet] class ParquetOutputWriter(
path: String,
bucketId: Option[Int],
stagingDir: String,
fileNamePrefix: String,
context: TaskAttemptContext)
extends OutputWriter {

private val recordWriter: RecordWriter[Void, InternalRow] = {
val outputFormat = {
new ParquetOutputFormat[InternalRow]() {
// Here we override `getDefaultWorkFile` for two reasons:
//
// 1. To allow appending. We need to generate unique output file names to avoid
// overwriting existing files (either exist before the write job, or are just written
// by other tasks within the same write job).
//
// 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses
// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
// partitions in the case of dynamic partitioning.
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
val configuration = context.getConfiguration
val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
// It has the `.parquet` extension at the end because (de)compression tools
// such as gunzip would not be able to decompress this as the compression
// is not applied on this whole file but on each "page" in Parquet format.
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
new Path(stagingDir, fileNamePrefix + extension)
Contributor: why remove this comment?

Contributor (author): Basically the only contract now is that prefix needs to be enforced, and it is not the concern of these classes to think about dynamic partitioning or appending.

}
}
}