From 85b861846927938aea6c31c510320bc3066fc04f Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Sun, 6 Jun 2021 02:19:01 +0800
Subject: [PATCH] [SPARK-35592][SQL] Saving an empty dataframe with partitions
 should write a metadata-only file

---
 .../datasources/FileFormatDataWriter.scala    | 23 ++++++++++++++++
 .../datasources/FileFormatWriter.scala        |  3 +++
 .../datasources/orc/OrcFileFormat.scala       |  2 +-
 .../execution/datasources/orc/OrcUtils.scala  |  2 +-
 .../spark/sql/FileBasedDataSourceSuite.scala  | 26 +++++++++++++++++++
 5 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
index 7e5a8cce2783..ef17f02fad9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
 import org.apache.spark.sql.execution.datasources.FileFormatWriter.ConcurrentOutputWriterSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.SerializableConfiguration
 
 /**
@@ -118,6 +119,28 @@ class EmptyDirectoryDataWriter(
   override def write(record: InternalRow): Unit = {}
 }
 
+/** FileFormatWriteTask that writes a metadata-only file for an empty partitioned table */
+class EmptyPartitionDataWriter(
+    description: WriteJobDescription,
+    taskAttemptContext: TaskAttemptContext,
+    committer: FileCommitProtocol)
+  extends BaseDynamicPartitionDataWriter(description, taskAttemptContext, committer) {
+  newOutputWriter()
+
+  private def newOutputWriter(): Unit = {
+    val emptyValues = description.allColumns.map {
+      case c if description.partitionColumns.contains(c.toAttribute) =>
+        UTF8String.fromString(ExternalCatalogUtils.DEFAULT_PARTITION_NAME)
+      case _ => null
+    }
+    val emptyRow = InternalRow.fromSeq(emptyValues)
+    val partitionValue = getPartitionValues(emptyRow)
+    statsTrackers.foreach(_.newPartition(partitionValue))
+    renewCurrentWriter(Some(partitionValue), None, true)
+  }
+  override def write(record: InternalRow): Unit = {}
+}
+
 /** Writes data to a single directory (used for non-dynamic-partition writes). */
 class SingleDirectoryDataWriter(
     description: WriteJobDescription,
     taskAttemptContext: TaskAttemptContext,
     committer: FileCommitProtocol
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 6839a4db0bc2..59df502e62c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -288,6 +288,9 @@ object FileFormatWriter extends Logging {
         new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
       } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
         new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
+      } else if (sparkPartitionId == 0 && description.partitionColumns.nonEmpty &&
+        !iterator.hasNext) {
+        new EmptyPartitionDataWriter(description, taskAttemptContext, committer)
       } else {
         concurrentOutputWriterSpec match {
           case Some(spec) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 3a5097441ab3..952734724448 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -210,7 +210,7 @@ class OrcFileFormat
       val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
       val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
-        dataSchema, resultSchema, partitionSchema, conf)
+        dataSchema, StructType(requiredSchema.fields), partitionSchema, conf)
       assert(requestedColIds.length == requiredSchema.length,
         "[BUG] requested column IDs do not match required schema")
       val taskConf = new Configuration(conf)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index a63a5c6d5051..3da4315aca6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -249,7 +249,7 @@ object OrcUtils extends Logging {
     val resultSchemaString = if (canPruneCols) {
       OrcUtils.orcTypeDescriptionString(resultSchema)
     } else {
-      OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
+      OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields))
     }
     OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
     resultSchemaString
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 876f62803dc7..fe32668a136f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
 import org.apache.spark.sql.catalyst.plans.logical.Filter
@@ -127,6 +128,31 @@ class FileBasedDataSourceSuite extends QueryTest
     }
   }
 
+  test("SPARK-35592: saving an empty partitioned dataframe should write a metadata-only file") {
+    Seq("orc", "parquet").foreach { format =>
+      withTempPath { outputPath =>
+        val df = spark.emptyDataFrame.select(
+          lit("").as("part_id"),
+          lit("").as("some_column")
+        )
+        df.write.format(format).partitionBy("part_id").save(outputPath.toString)
+        val partFiles = new File(outputPath,
+          s"part_id=${ExternalCatalogUtils.DEFAULT_PARTITION_NAME}")
+          .listFiles()
+          .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+        assert(partFiles.length === 1)
+
+        // Now read the file back in.
+        val df1 = spark.read.format(format).load(outputPath.toString)
+        checkAnswer(df1, Seq.empty[Row])
+        val expectedSchema = StructType(
+          Seq(StructField("some_column", StringType), StructField("part_id", NullType))
+        )
+        assert(df1.schema.equals(expectedSchema))
+      }
+    }
+  }
+
   allFileBasedDataSources.foreach { format =>
     test(s"SPARK-23372 error while writing empty schema files using $format") {
       withTempPath { outputPath =>
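Reviewer note: below is a minimal usage sketch of the end-to-end behavior this patch targets, mirroring the new test in FileBasedDataSourceSuite. It is not part of the patch; the output path, app name, and column names are illustrative only, and the before/after behavior noted in the comments follows the patch subject and the new test's assertions.

// Minimal sketch of the SPARK-35592 scenario (illustrative, not part of the patch).
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("spark-35592-demo")
  .getOrCreate()

// An empty dataframe with one partition column and one data column.
val df = spark.emptyDataFrame.select(
  lit("").as("part_id"),
  lit("").as("some_column"))

// Per the patch subject, saving the empty partitioned dataframe should now produce a single
// metadata-only file under part_id=__HIVE_DEFAULT_PARTITION__ rather than no data file at all.
df.write.format("parquet").partitionBy("part_id").save("/tmp/spark-35592-demo")

// Reading the location back yields zero rows while preserving the schema
// (some_column: string, part_id: null), as asserted by the new test.
val readBack = spark.read.format("parquet").load("/tmp/spark-35592-demo")
readBack.printSchema()
assert(readBack.count() == 0)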