From acb6db12247c292e69961832180bc4f734c38bbc Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Sun, 6 Jun 2021 02:19:01 +0800
Subject: [PATCH] [SPARK-35592][SQL] Saving an empty dataframe with partitions
 should write a metadata-only file

---
 .../datasources/FileFormatDataWriter.scala    | 39 +++++++++++++++++++
 .../datasources/FileFormatWriter.scala        | 20 ++++++++--
 .../InsertIntoHadoopFsRelationCommand.scala   |  4 +-
 .../datasources/orc/OrcFileFormat.scala       |  2 +-
 .../execution/datasources/orc/OrcUtils.scala  |  2 +-
 .../execution/streaming/FileStreamSink.scala  |  3 +-
 .../spark/sql/FileBasedDataSourceSuite.scala  | 26 +++++++++++++
 .../sql/hive/execution/SaveAsHiveFile.scala   |  3 +-
 8 files changed, 87 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
index 7e5a8cce2783..fbb195598a29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
@@ -118,6 +118,45 @@ class EmptyDirectoryDataWriter(
   override def write(record: InternalRow): Unit = {}
 }
 
+/**
+ * Writes a metadata-only file for an empty partitioned table (used for non-partial-dynamic-partition writes).
+ */
+class EmptyPartitionDataWriter(
+    description: WriteJobDescription,
+    taskAttemptContext: TaskAttemptContext,
+    committer: FileCommitProtocol,
+    staticPartitions: TablePartitionSpec
+) extends BaseDynamicPartitionDataWriter(description, taskAttemptContext, committer) {
+  newOutputWriter()
+
+  private def newOutputWriter(): Unit = {
+    val isPartialDynamicPartitions = staticPartitions.nonEmpty &&
+      description.partitionColumns.map(_.name).diff(staticPartitions.keys.toSeq).nonEmpty
+    if (!isPartialDynamicPartitions) {
+      val staticOrEmptyValues = description.allColumns.map {
+        case c: Attribute =>
+          val partValue = {
+            if (c.dataType == StringType) {
+              staticPartitions.getOrElse(c.name, ExternalCatalogUtils.DEFAULT_PARTITION_NAME)
+            } else {
+              staticPartitions.get(c.name).orNull
+            }
+          }
+          Cast(
+            Literal.create(partValue, StringType),
+            c.dataType,
+            Some(SQLConf.get.sessionLocalTimeZone)
+          ).eval()
+      }
+      val emptyRow = InternalRow.fromSeq(staticOrEmptyValues)
+      val partitionValue = getPartitionValues(emptyRow)
+      statsTrackers.foreach(_.newPartition(partitionValue))
+      renewCurrentWriter(Some(partitionValue), None, true)
+    }
+  }
+  override def write(record: InternalRow): Unit = {}
+}
+
 /** Writes data to a single directory (used for non-dynamic-partition writes). */
 class SingleDirectoryDataWriter(
     description: WriteJobDescription,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 6839a4db0bc2..71791243b167 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -54,6 +54,11 @@ object FileFormatWriter extends Logging {
       customPartitionLocations: Map[TablePartitionSpec, String],
       outputColumns: Seq[Attribute])
 
+  case class ColumnSpec(
+      partitionColumns: Seq[Attribute],
+      bucketSpec: Option[BucketSpec],
+      staticPartitions: Option[TablePartitionSpec])
+
   /** A function that converts the empty string to null for partition values. */
   case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression {
     override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v
@@ -99,12 +104,13 @@ object FileFormatWriter extends Logging {
       committer: FileCommitProtocol,
       outputSpec: OutputSpec,
       hadoopConf: Configuration,
-      partitionColumns: Seq[Attribute],
-      bucketSpec: Option[BucketSpec],
+      columnSpec: ColumnSpec,
       statsTrackers: Seq[WriteJobStatsTracker],
       options: Map[String, String])
     : Set[String] = {
 
+    val partitionColumns = columnSpec.partitionColumns
+    val bucketSpec = columnSpec.bucketSpec
     val job = Job.getInstance(hadoopConf)
     job.setOutputKeyClass(classOf[Void])
     job.setOutputValueClass(classOf[InternalRow])
@@ -226,7 +232,8 @@
             sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
             committer,
             iterator = iter,
-            concurrentOutputWriterSpec = concurrentOutputWriterSpec)
+            concurrentOutputWriterSpec = concurrentOutputWriterSpec,
+            staticPartitions = columnSpec.staticPartitions)
         },
         rddWithNonEmptyPartitions.partitions.indices,
         (index, res: WriteTaskResult) => {
@@ -261,7 +268,8 @@
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[InternalRow],
-      concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec]): WriteTaskResult = {
+      concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec],
+      staticPartitions: Option[TablePartitionSpec]): WriteTaskResult = {
 
     val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId)
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
@@ -288,6 +296,10 @@
         new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
       } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
         new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
+      } else if (sparkPartitionId == 0 && description.partitionColumns.nonEmpty &&
+          !iterator.hasNext && staticPartitions.isDefined) {
+        new EmptyPartitionDataWriter(description, taskAttemptContext, committer,
+          staticPartitions.get)
       } else {
         concurrentOutputWriterSpec match {
           case Some(spec) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 267b360b474c..d7d2bdb944b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -180,8 +180,8 @@ case class InsertIntoHadoopFsRelationCommand(
             outputSpec = FileFormatWriter.OutputSpec(
               committerOutputPath.toString, customPartitionLocations, outputColumns),
             hadoopConf = hadoopConf,
-            partitionColumns = partitionColumns,
-            bucketSpec = bucketSpec,
+            columnSpec = FileFormatWriter.ColumnSpec(
+              partitionColumns, bucketSpec, Some(staticPartitions)),
             statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
             options = options)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 3a5097441ab3..952734724448 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -210,7 +210,7 @@ class OrcFileFormat
 
         val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
         val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
-          dataSchema, resultSchema, partitionSchema, conf)
+          dataSchema, StructType(requiredSchema.fields), partitionSchema, conf)
         assert(requestedColIds.length == requiredSchema.length,
           "[BUG] requested column IDs do not match required schema")
         val taskConf = new Configuration(conf)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index a63a5c6d5051..3da4315aca6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -249,7 +249,7 @@ object OrcUtils extends Logging {
     val resultSchemaString = if (canPruneCols) {
       OrcUtils.orcTypeDescriptionString(resultSchema)
     } else {
-      OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
+      OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields))
     }
     OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
     resultSchemaString
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 876671c61d81..e3d89fde7051 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -185,8 +185,7 @@ class FileStreamSink(
         committer = committer,
         outputSpec = FileFormatWriter.OutputSpec(path, Map.empty, qe.analyzed.output),
         hadoopConf = hadoopConf,
-        partitionColumns = partitionColumns,
-        bucketSpec = None,
+        columnSpec = FileFormatWriter.ColumnSpec(partitionColumns, None, None),
         statsTrackers = Seq(basicWriteJobStatsTracker),
         options = options)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 876f62803dc7..fe32668a136f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
 import org.apache.spark.sql.catalyst.plans.logical.Filter
@@ -127,6 +128,31 @@ class FileBasedDataSourceSuite extends QueryTest
     }
   }
 
+  test("SPARK-35592: saving an empty partitioned dataframe should write a metadata-only file") {
+    Seq("orc", "parquet").foreach { format =>
+      withTempPath { outputPath =>
+        val df = spark.emptyDataFrame.select(
+          lit("").as("part_id"),
+          lit("").as("some_column")
+        )
+        df.write.format(format).partitionBy("part_id").save(outputPath.toString)
+        val partFiles = new File(outputPath,
+          s"part_id=${ExternalCatalogUtils.DEFAULT_PARTITION_NAME}")
+          .listFiles()
+          .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+        assert(partFiles.length === 1)
+
+        // Read the data back: no rows, but the schema should be preserved.
+        val df1 = spark.read.format(format).load(outputPath.toString)
+        checkAnswer(df1, Seq.empty[Row])
+        val expectedSchema = StructType(
+          Seq(StructField("some_column", StringType), StructField("part_id", NullType))
+        )
+        assert(df1.schema.equals(expectedSchema))
+      }
+    }
+  }
+
   allFileBasedDataSources.foreach { format =>
     test(s"SPARK-23372 error while writing empty schema files using $format") {
       withTempPath { outputPath =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index ec189344f4fa..d3ac2fe0c2bc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -92,8 +92,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       outputSpec =
        FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, outputColumns),
       hadoopConf = hadoopConf,
-      partitionColumns = partitionAttributes,
-      bucketSpec = None,
+      columnSpec = FileFormatWriter.ColumnSpec(partitionAttributes, None, None),
       statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
       options = Map.empty)
   }