diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala index 1ce9ae4266c1a..f5dd5ce22919d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala @@ -32,9 +32,9 @@ import org.apache.spark.util.Utils object FileCommitProtocol { - class TaskCommitMessage(obj: Any) extends Serializable + class TaskCommitMessage(val obj: Any) extends Serializable - object EmptyTaskCommitMessage extends TaskCommitMessage(Unit) + object EmptyTaskCommitMessage extends TaskCommitMessage(null) /** * Instantiates a FileCommitProtocol using the given className. @@ -62,8 +62,11 @@ object FileCommitProtocol { /** - * An interface to define how a Spark job commits its outputs. Implementations must be serializable, - * as the committer instance instantiated on the driver will be used for tasks on executors. + * An interface to define how a single Spark job commits its outputs. Two notes: + * + * 1. Implementations must be serializable, as the committer instance instantiated on the driver + * will be used for tasks on executors. + * 2. A committer should not be reused across multiple Spark jobs. * * The proper call sequence is: * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index 9d153cec731a8..4f4aaaa5026fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -55,20 +55,6 @@ trait FileFormat { options: Map[String, String], dataSchema: StructType): OutputWriterFactory - /** - * Returns a [[OutputWriterFactory]] for generating output writers that can write data. - * This method is current used only by FileStreamSinkWriter to generate output writers that - * does not use output committers to write data. The OutputWriter generated by the returned - * [[OutputWriterFactory]] must implement the method `newWriter(path)`.. - */ - def buildWriter( - sqlContext: SQLContext, - dataSchema: StructType, - options: Map[String, String]): OutputWriterFactory = { - // TODO: Remove this default implementation when the other formats have been ported - throw new UnsupportedOperationException(s"buildWriter is not supported for $this") - } - /** * Returns whether this format support returning columnar batch or not. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala similarity index 97% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index a07855111b401..bc00a0a749c09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -43,8 +43,8 @@ import org.apache.spark.util.{SerializableConfiguration, Utils} import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter -/** A helper object for writing data out to a location. 
*/ -object WriteOutput extends Logging { +/** A helper object for writing FileFormat data out to a location. */ +object FileFormatWriter extends Logging { /** A shared job description for all the write tasks. */ private class WriteJobDescription( @@ -55,7 +55,6 @@ object WriteOutput extends Logging { val partitionColumns: Seq[Attribute], val nonPartitionColumns: Seq[Attribute], val bucketSpec: Option[BucketSpec], - val isAppend: Boolean, val path: String) extends Serializable { @@ -82,18 +81,18 @@ object WriteOutput extends Logging { sparkSession: SparkSession, plan: LogicalPlan, fileFormat: FileFormat, - outputPath: Path, + committer: FileCommitProtocol, + outputPath: String, hadoopConf: Configuration, partitionColumns: Seq[Attribute], bucketSpec: Option[BucketSpec], refreshFunction: (Seq[TablePartitionSpec]) => Unit, - options: Map[String, String], - isAppend: Boolean): Unit = { + options: Map[String, String]): Unit = { val job = Job.getInstance(hadoopConf) job.setOutputKeyClass(classOf[Void]) job.setOutputValueClass(classOf[InternalRow]) - FileOutputFormat.setOutputPath(job, outputPath) + FileOutputFormat.setOutputPath(job, new Path(outputPath)) val partitionSet = AttributeSet(partitionColumns) val dataColumns = plan.output.filterNot(partitionSet.contains) @@ -111,16 +110,11 @@ object WriteOutput extends Logging { partitionColumns = partitionColumns, nonPartitionColumns = dataColumns, bucketSpec = bucketSpec, - isAppend = isAppend, - path = outputPath.toString) + path = outputPath) SQLExecution.withNewExecutionId(sparkSession, queryExecution) { // This call shouldn't be put into the `try` block below because it only initializes and // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. - val committer = FileCommitProtocol.instantiate( - sparkSession.sessionState.conf.fileCommitProtocolClass, - outputPath.toString, - isAppend) committer.setupJob(job) try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index a1221d0ae6d27..230c74a47ba2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -84,17 +84,22 @@ case class InsertIntoHadoopFsRelationCommand( val isAppend = pathExists && (mode == SaveMode.Append) if (doInsertion) { - WriteOutput.write( - sparkSession, - query, - fileFormat, - qualifiedOutputPath, - hadoopConf, - partitionColumns, - bucketSpec, - refreshFunction, - options, + val committer = FileCommitProtocol.instantiate( + sparkSession.sessionState.conf.fileCommitProtocolClass, + outputPath.toString, isAppend) + + FileFormatWriter.write( + sparkSession = sparkSession, + plan = query, + fileFormat = fileFormat, + committer = committer, + outputPath = qualifiedOutputPath.toString, + hadoopConf = hadoopConf, + partitionColumns = partitionColumns, + bucketSpec = bucketSpec, + refreshFunction = refreshFunction, + options = options) } else { logInfo("Skipping insertion into a relation that already exists.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 77c83ba38efee..b8ea7f40c4ab3 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -415,17 +415,6 @@ class ParquetFileFormat } } } - - override def buildWriter( - sqlContext: SQLContext, - dataSchema: StructType, - options: Map[String, String]): OutputWriterFactory = { - new ParquetOutputWriterFactory( - sqlContext.conf, - dataSchema, - sqlContext.sessionState.newHadoopConf(), - options) - } } object ParquetFileFormat extends Logging { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala index 92d4f27be3fd5..5c0f8af17a232 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala @@ -17,125 +17,13 @@ package org.apache.spark.sql.execution.datasources.parquet -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.parquet.hadoop.{ParquetOutputFormat, ParquetRecordWriter} -import org.apache.parquet.hadoop.codec.CodecConfig -import org.apache.parquet.hadoop.util.ContextUtil +import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.SerializableConfiguration - - -/** - * A factory for generating OutputWriters for writing parquet files. This implemented is different - * from the [[ParquetOutputWriter]] as this does not use any [[OutputCommitter]]. It simply - * writes the data to the path used to generate the output writer. Callers of this factory - * has to ensure which files are to be considered as committed. - */ -private[parquet] class ParquetOutputWriterFactory( - sqlConf: SQLConf, - dataSchema: StructType, - hadoopConf: Configuration, - options: Map[String, String]) - extends OutputWriterFactory { - - private val serializableConf: SerializableConfiguration = { - val job = Job.getInstance(hadoopConf) - val conf = ContextUtil.getConfiguration(job) - val parquetOptions = new ParquetOptions(options, sqlConf) - - // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override - // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why - // we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is - // bundled with `ParquetOutputFormat[Row]`. - job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]]) - - ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]) - - // We want to clear this temporary metadata from saving into Parquet file. - // This metadata is only useful for detecting optional columns when pushing down filters. 
- val dataSchemaToWrite = StructType.removeMetadata( - StructType.metadataKeyForOptionalField, - dataSchema).asInstanceOf[StructType] - ParquetWriteSupport.setSchema(dataSchemaToWrite, conf) - - // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema) - // and `CatalystWriteSupport` (writing actual rows to Parquet files). - conf.set( - SQLConf.PARQUET_BINARY_AS_STRING.key, - sqlConf.isParquetBinaryAsString.toString) - - conf.set( - SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, - sqlConf.isParquetINT96AsTimestamp.toString) - - conf.set( - SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, - sqlConf.writeLegacyParquetFormat.toString) - - // Sets compression scheme - conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName) - new SerializableConfiguration(conf) - } - - /** - * Returns a [[OutputWriter]] that writes data to the give path without using - * [[OutputCommitter]]. - */ - override def newWriter(path: String): OutputWriter = new OutputWriter { - - // Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter - private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0) - private val hadoopAttemptContext = new TaskAttemptContextImpl( - serializableConf.value, hadoopTaskAttemptId) - - // Instance of ParquetRecordWriter that does not use OutputCommitter - private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext) - - override def write(row: Row): Unit = { - throw new UnsupportedOperationException("call writeInternal") - } - - protected[sql] override def writeInternal(row: InternalRow): Unit = { - recordWriter.write(null, row) - } - - override def close(): Unit = recordWriter.close(hadoopAttemptContext) - } - - /** Create a [[ParquetRecordWriter]] that writes the given path without using OutputCommitter */ - private def createNoCommitterRecordWriter( - path: String, - hadoopAttemptContext: TaskAttemptContext): RecordWriter[Void, InternalRow] = { - // Custom ParquetOutputFormat that disable use of committer and writes to the given path - val outputFormat = new ParquetOutputFormat[InternalRow]() { - override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter = { null } - override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): Path = { new Path(path) } - } - outputFormat.getRecordWriter(hadoopAttemptContext) - } - - /** Disable the use of the older API. */ - override def newInstance( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): OutputWriter = { - throw new UnsupportedOperationException("this version of newInstance not supported for " + - "ParquetOutputWriterFactory") - } - - override def getFileExtension(context: TaskAttemptContext): String = { - CodecConfig.from(context).getCodec.getExtension + ".parquet" - } -} - +import org.apache.spark.sql.execution.datasources.OutputWriter // NOTE: This class is instantiated and used on executor side only, no need to be serializable. 
private[parquet] class ParquetOutputWriter(path: String, context: TaskAttemptContext) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 02c5b857ee7fe..daec2b5450971 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -17,23 +17,12 @@ package org.apache.spark.sql.execution.streaming -import java.util.UUID - -import scala.collection.mutable.ArrayBuffer - -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkEnv, SparkException, TaskContext, TaskContextImpl} import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.UnsafeKVExternalSorter -import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, PartitioningUtils} -import org.apache.spark.sql.types.{StringType, StructType} -import org.apache.spark.util.SerializableConfiguration -import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter +import org.apache.spark.sql.execution.datasources.{FileCommitProtocol, FileFormat, FileFormatWriter} object FileStreamSink { // The name of the subdirectory that is used to store metadata about which files are valid. @@ -59,207 +48,41 @@ class FileStreamSink( private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString) private val hadoopConf = sparkSession.sessionState.newHadoopConf() - private val fs = basePath.getFileSystem(hadoopConf) override def addBatch(batchId: Long, data: DataFrame): Unit = { if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) { logInfo(s"Skipping already committed batch $batchId") } else { - val writer = new FileStreamSinkWriter( - data, fileFormat, path, partitionColumnNames, hadoopConf, options) - val fileStatuses = writer.write() - if (fileLog.add(batchId, fileStatuses)) { - logInfo(s"Committed batch $batchId") - } else { - throw new IllegalStateException(s"Race while writing batch $batchId") + val committer = FileCommitProtocol.instantiate( + sparkSession.sessionState.conf.streamingFileCommitProtocolClass, path, isAppend = false) + committer match { + case manifestCommitter: ManifestFileCommitProtocol => + manifestCommitter.setupManifestOptions(fileLog, batchId) + case _ => // Do nothing } - } - } - - override def toString: String = s"FileSink[$path]" -} - - -/** - * Writes data given to a [[FileStreamSink]] to the given `basePath` in the given `fileFormat`, - * partitioned by the given `partitionColumnNames`. This writer always appends data to the - * directory if it already has data. 
- */ -class FileStreamSinkWriter( - data: DataFrame, - fileFormat: FileFormat, - basePath: String, - partitionColumnNames: Seq[String], - hadoopConf: Configuration, - options: Map[String, String]) extends Serializable with Logging { - - PartitioningUtils.validatePartitionColumn( - data.schema, partitionColumnNames, data.sqlContext.conf.caseSensitiveAnalysis) - - private val serializableConf = new SerializableConfiguration(hadoopConf) - private val dataSchema = data.schema - private val dataColumns = data.logicalPlan.output - - // Get the actual partition columns as attributes after matching them by name with - // the given columns names. - private val partitionColumns = partitionColumnNames.map { col => - val nameEquality = data.sparkSession.sessionState.conf.resolver - data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse { - throw new RuntimeException(s"Partition column $col not found in schema $dataSchema") - } - } - - // Columns that are to be written to the files. If there are partitioning columns, then - // those will not be written to the files. - private val writeColumns = { - val partitionSet = AttributeSet(partitionColumns) - dataColumns.filterNot(partitionSet.contains) - } - - // An OutputWriterFactory for generating writers in the executors for writing the files. - private val outputWriterFactory = - fileFormat.buildWriter(data.sqlContext, writeColumns.toStructType, options) - - /** Expressions that given a partition key build a string like: col1=val/col2=val/... */ - private def partitionStringExpression: Seq[Expression] = { - partitionColumns.zipWithIndex.flatMap { case (c, i) => - val escaped = - ScalaUDF( - PartitioningUtils.escapePathName _, - StringType, - Seq(Cast(c, StringType)), - Seq(StringType)) - val str = If(IsNull(c), Literal(PartitioningUtils.DEFAULT_PARTITION_NAME), escaped) - val partitionName = Literal(c.name + "=") :: str :: Nil - if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName - } - } - - /** Generate a new output writer from the writer factory */ - private def newOutputWriter(path: Path): OutputWriter = { - val newWriter = outputWriterFactory.newWriter(path.toString) - newWriter.initConverter(dataSchema) - newWriter - } - /** Write the dataframe to files. This gets called in the driver by the [[FileStreamSink]]. */ - def write(): Array[SinkFileStatus] = { - data.sqlContext.sparkContext.runJob( - data.queryExecution.toRdd, - (taskContext: TaskContext, iterator: Iterator[InternalRow]) => { - if (partitionColumns.isEmpty) { - Seq(writePartitionToSingleFile(iterator)) - } else { - writePartitionToPartitionedFiles(iterator) + // Get the actual partition columns as attributes after matching them by name with + // the given columns names. + val partitionColumns: Seq[Attribute] = partitionColumnNames.map { col => + val nameEquality = data.sparkSession.sessionState.conf.resolver + data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse { + throw new RuntimeException(s"Partition column $col not found in schema ${data.schema}") } - }).flatten - } - - /** - * Writes a RDD partition to a single file without dynamic partitioning. - * This gets called in the executor, and it uses a [[OutputWriter]] to write the data. 
- */ - def writePartitionToSingleFile(iterator: Iterator[InternalRow]): SinkFileStatus = { - var writer: OutputWriter = null - try { - val path = new Path(basePath, UUID.randomUUID.toString) - val fs = path.getFileSystem(serializableConf.value) - writer = newOutputWriter(path) - while (iterator.hasNext) { - writer.writeInternal(iterator.next) - } - writer.close() - writer = null - SinkFileStatus(fs.getFileStatus(path)) - } catch { - case cause: Throwable => - logError("Aborting task.", cause) - // call failure callbacks first, so we could have a chance to cleanup the writer. - TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause) - throw new SparkException("Task failed while writing rows.", cause) - } finally { - if (writer != null) { - writer.close() } - } - } - - /** - * Writes a RDD partition to multiple dynamically partitioned files. - * This gets called in the executor. It first sorts the data based on the partitioning columns - * and then writes the data of each key to separate files using [[OutputWriter]]s. - */ - def writePartitionToPartitionedFiles(iterator: Iterator[InternalRow]): Seq[SinkFileStatus] = { - - // Returns the partitioning columns for sorting - val getSortingKey = UnsafeProjection.create(partitionColumns, dataColumns) - - // Returns the data columns to be written given an input row - val getOutputRow = UnsafeProjection.create(writeColumns, dataColumns) - - // Returns the partition path given a partition key - val getPartitionString = - UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns) - // Sort the data before write, so that we only need one writer at the same time. - val sorter = new UnsafeKVExternalSorter( - partitionColumns.toStructType, - StructType.fromAttributes(writeColumns), - SparkEnv.get.blockManager, - SparkEnv.get.serializerManager, - TaskContext.get().taskMemoryManager().pageSizeBytes, - SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", - UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD)) - - while (iterator.hasNext) { - val currentRow = iterator.next() - sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow)) - } - logDebug(s"Sorting complete. Writing out partition files one at a time.") - - val sortedIterator = sorter.sortedIterator() - val paths = new ArrayBuffer[Path] - - // Write the sorted data to partitioned files, one for each unique key - var currentWriter: OutputWriter = null - try { - var currentKey: UnsafeRow = null - while (sortedIterator.next()) { - val nextKey = sortedIterator.getKey - - // If key changes, close current writer, and open a new writer to a new partitioned file - if (currentKey != nextKey) { - if (currentWriter != null) { - currentWriter.close() - currentWriter = null - } - currentKey = nextKey.copy() - val partitionPath = getPartitionString(currentKey).getString(0) - val path = new Path(new Path(basePath, partitionPath), UUID.randomUUID.toString) - paths += path - currentWriter = newOutputWriter(path) - logInfo(s"Writing partition $currentKey to $path") - } - currentWriter.writeInternal(sortedIterator.getValue) - } - if (currentWriter != null) { - currentWriter.close() - currentWriter = null - } - if (paths.nonEmpty) { - val fs = paths.head.getFileSystem(serializableConf.value) - paths.map(p => SinkFileStatus(fs.getFileStatus(p))) - } else Seq.empty - } catch { - case cause: Throwable => - logError("Aborting task.", cause) - // call failure callbacks first, so we could have a chance to cleanup the writer. 
- TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause) - throw new SparkException("Task failed while writing rows.", cause) - } finally { - if (currentWriter != null) { - currentWriter.close() - } + FileFormatWriter.write( + sparkSession = sparkSession, + plan = data.logicalPlan, + fileFormat = fileFormat, + committer = committer, + outputPath = path, + hadoopConf = hadoopConf, + partitionColumns = partitionColumns, + bucketSpec = None, + refreshFunction = _ => (), + options = options) } } + + override def toString: String = s"FileSink[$path]" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala new file mode 100644 index 0000000000000..510312267a98d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.UUID + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.datasources.FileCommitProtocol +import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitMessage + +/** + * A [[FileCommitProtocol]] that tracks the list of valid files in a manifest file, used in + * structured streaming. + * + * @param path path to write the final output to. + */ +class ManifestFileCommitProtocol(path: String) + extends FileCommitProtocol with Serializable with Logging { + + // Track the list of files added by a task, only used on the executors. + @transient private var addedFiles: ArrayBuffer[String] = _ + + @transient private var fileLog: FileStreamSinkLog = _ + private var batchId: Long = _ + + /** + * Sets up the manifest log output and the batch id for this job. + * Must be called before any other function. 
+ */ + def setupManifestOptions(fileLog: FileStreamSinkLog, batchId: Long): Unit = { + this.fileLog = fileLog + this.batchId = batchId + } + + override def setupJob(jobContext: JobContext): Unit = { + require(fileLog != null, "setupManifestOptions must be called before this function") + // Do nothing + } + + override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = { + require(fileLog != null, "setupManifestOptions must be called before this function") + val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[SinkFileStatus]]).toArray + + if (fileLog.add(batchId, fileStatuses)) { + logInfo(s"Committed batch $batchId") + } else { + throw new IllegalStateException(s"Race while writing batch $batchId") + } + } + + override def abortJob(jobContext: JobContext): Unit = { + require(fileLog != null, "setupManifestOptions must be called before this function") + // Do nothing + } + + override def setupTask(taskContext: TaskAttemptContext): Unit = { + addedFiles = new ArrayBuffer[String] + } + + override def newTaskTempFile( + taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = { + // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet + // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, + // the file name is fine and won't overflow. + val split = taskContext.getTaskAttemptID.getTaskID.getId + val uuid = UUID.randomUUID.toString + val filename = f"part-$split%05d-$uuid$ext" + + val file = dir.map { d => + new Path(new Path(path, d), filename).toString + }.getOrElse { + new Path(path, filename).toString + } + + addedFiles += file + file + } + + override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = { + if (addedFiles.nonEmpty) { + val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration) + val statuses: Seq[SinkFileStatus] = + addedFiles.map(f => SinkFileStatus(fs.getFileStatus(new Path(f)))) + new TaskCommitMessage(statuses) + } else { + new TaskCommitMessage(Seq.empty[SinkFileStatus]) + } + } + + override def abortTask(taskContext: TaskAttemptContext): Unit = { + // Do nothing + // TODO: we can also try delete the addedFiles as a best-effort cleanup. 
+ } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 29e79847aa38b..7bb3ac02fa5d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -31,6 +31,7 @@ import org.apache.spark.internal.config._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.catalyst.CatalystConf import org.apache.spark.sql.execution.datasources.HadoopCommitProtocolWrapper +import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol import org.apache.spark.util.Utils //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -523,7 +524,7 @@ object SQLConf { SQLConfigBuilder("spark.sql.streaming.commitProtocolClass") .internal() .stringConf - .createWithDefault(classOf[HadoopCommitProtocolWrapper].getName) + .createWithDefault(classOf[ManifestFileCommitProtocol].getName) val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion") .internal() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 18b42a81a098c..902cf05344716 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -17,106 +17,16 @@ package org.apache.spark.sql.streaming -import java.io.File - -import org.apache.commons.io.FileUtils -import org.apache.commons.io.filefilter.{DirectoryFileFilter, RegexFileFilter} - import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileIndex} -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex} import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils class FileStreamSinkSuite extends StreamTest { import testImplicits._ - - test("FileStreamSinkWriter - unpartitioned data") { - val path = Utils.createTempDir() - path.delete() - - val hadoopConf = spark.sparkContext.hadoopConfiguration - val fileFormat = new parquet.ParquetFileFormat() - - def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = { - val df = spark - .range(start, end, 1, numPartitions) - .select($"id", lit(100).as("data")) - val writer = new FileStreamSinkWriter( - df, fileFormat, path.toString, partitionColumnNames = Nil, hadoopConf, Map.empty) - writer.write().map(_.path.stripPrefix("file://")) - } - - // Write and check whether new files are written correctly - val files1 = writeRange(0, 10, 2) - assert(files1.size === 2, s"unexpected number of files: $files1") - checkFilesExist(path, files1, "file not written") - checkAnswer(spark.read.load(path.getCanonicalPath), (0 until 10).map(Row(_, 100))) - - // Append and check whether new files are written correctly and old files still exist - val files2 = writeRange(10, 20, 3) - assert(files2.size === 3, s"unexpected number of files: $files2") - assert(files2.intersect(files1).isEmpty, "old files returned") - 
checkFilesExist(path, files2, s"New file not written") - checkFilesExist(path, files1, s"Old file not found") - checkAnswer(spark.read.load(path.getCanonicalPath), (0 until 20).map(Row(_, 100))) - } - - test("FileStreamSinkWriter - partitioned data") { - implicit val e = ExpressionEncoder[java.lang.Long] - val path = Utils.createTempDir() - path.delete() - - val hadoopConf = spark.sparkContext.hadoopConfiguration - val fileFormat = new parquet.ParquetFileFormat() - - def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = { - val df = spark - .range(start, end, 1, numPartitions) - .flatMap(x => Iterator(x, x, x)).toDF("id") - .select($"id", lit(100).as("data1"), lit(1000).as("data2")) - - require(df.rdd.partitions.size === numPartitions) - val writer = new FileStreamSinkWriter( - df, fileFormat, path.toString, partitionColumnNames = Seq("id"), hadoopConf, Map.empty) - writer.write().map(_.path.stripPrefix("file://")) - } - - def checkOneFileWrittenPerKey(keys: Seq[Int], filesWritten: Seq[String]): Unit = { - keys.foreach { id => - assert( - filesWritten.count(_.contains(s"/id=$id/")) == 1, - s"no file for id=$id. all files: \n\t${filesWritten.mkString("\n\t")}" - ) - } - } - - // Write and check whether new files are written correctly - val files1 = writeRange(0, 10, 2) - assert(files1.size === 10, s"unexpected number of files:\n${files1.mkString("\n")}") - checkFilesExist(path, files1, "file not written") - checkOneFileWrittenPerKey(0 until 10, files1) - - val answer1 = (0 until 10).flatMap(x => Iterator(x, x, x)).map(Row(100, 1000, _)) - checkAnswer(spark.read.load(path.getCanonicalPath), answer1) - - // Append and check whether new files are written correctly and old files still exist - val files2 = writeRange(0, 20, 3) - assert(files2.size === 20, s"unexpected number of files:\n${files2.mkString("\n")}") - assert(files2.intersect(files1).isEmpty, "old files returned") - checkFilesExist(path, files2, s"New file not written") - checkFilesExist(path, files1, s"Old file not found") - checkOneFileWrittenPerKey(0 until 20, files2) - - val answer2 = (0 until 20).flatMap(x => Iterator(x, x, x)).map(Row(100, 1000, _)) - checkAnswer(spark.read.load(path.getCanonicalPath), answer1 ++ answer2) - } - test("FileStreamSink - unpartitioned writing and batch reading") { val inputData = MemoryStream[Int] val df = inputData.toDF() @@ -270,18 +180,4 @@ class FileStreamSinkSuite extends StreamTest { } } - private def checkFilesExist(dir: File, expectedFiles: Seq[String], msg: String): Unit = { - import scala.collection.JavaConverters._ - val files = - FileUtils.listFiles(dir, new RegexFileFilter("[^.]+"), DirectoryFileFilter.DIRECTORY) - .asScala - .map(_.getCanonicalPath) - .toSet - - expectedFiles.foreach { f => - assert(files.contains(f), - s"\n$msg\nexpected file:\n\t$f\nfound files:\n${files.mkString("\n\t")}") - } - } - }
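
Reviewer note, not part of the patch: the updated FileCommitProtocol scaladoc only names the driver/executor call sequence, so below is a minimal sketch of a committer written against the method signatures that ManifestFileCommitProtocol overrides in this diff. The class name NoOpCommitProtocol and its single-argument constructor are invented for illustration; the sketch assumes the abstract methods match the overrides shown above and that, like ManifestFileCommitProtocol, a committer can be constructed from an output path alone.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

    import org.apache.spark.sql.execution.datasources.FileCommitProtocol
    import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitMessage

    // Hypothetical committer that writes task files directly under `path` and does no extra
    // commit/abort work. Driver side: setupJob -> commitJob or abortJob. Executor side, per
    // task: setupTask -> newTaskTempFile (once per output file) -> commitTask or abortTask.
    class NoOpCommitProtocol(path: String) extends FileCommitProtocol with Serializable {

      override def setupJob(jobContext: JobContext): Unit = ()

      override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
        // `TaskCommitMessage.obj` is a `val` after this patch, so the driver can read back the
        // payload each task returned from commitTask (ManifestFileCommitProtocol returns
        // Seq[SinkFileStatus] this way and records it in the manifest log).
        val payloads = taskCommits.map(_.obj)  // unused in this no-op sketch
      }

      override def abortJob(jobContext: JobContext): Unit = ()

      override def setupTask(taskContext: TaskAttemptContext): Unit = ()

      override def newTaskTempFile(
          taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
        val filename = s"part-${taskContext.getTaskAttemptID.getTaskID.getId}$ext"
        dir.map(d => new Path(new Path(path, d), filename).toString)
          .getOrElse(new Path(path, filename).toString)
      }

      override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage =
        FileCommitProtocol.EmptyTaskCommitMessage

      override def abortTask(taskContext: TaskAttemptContext): Unit = ()
    }

Because the patch now has callers (InsertIntoHadoopFsRelationCommand, FileStreamSink) instantiate the committer and hand it to FileFormatWriter.write, a committer like this would be created once per write job and never reused, matching note 2 added to the FileCommitProtocol scaladoc.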
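
Also for reference: the comment inside newTaskTempFile claims that %05d never truncates the split number, so names stay well formed past 100000 tasks. A small self-contained illustration of that format string, with made-up values for the split id and extension (the real values come from the TaskAttemptContext and from the extension string the writer passes in):

    import java.util.UUID

    val split = 123456                      // a task id wider than five digits
    val uuid = UUID.randomUUID.toString
    val ext = ".gz.parquet"                 // extension supplied by the caller of newTaskTempFile
    val filename = f"part-$split%05d-$uuid$ext"
    // %05d zero-pads up to five digits but never truncates, so this yields
    // "part-123456-<uuid>.gz.parquet" rather than a mangled name.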