@@ -118,6 +118,45 @@ class EmptyDirectoryDataWriter(
   override def write(record: InternalRow): Unit = {}
 }
 
+/**
+ * FileFormatWriteTask for an empty partitioned table (not used for partial dynamic-partition writes).
+ */
+class EmptyPartitionDataWriter(
+    description: WriteJobDescription,
+    taskAttemptContext: TaskAttemptContext,
+    committer: FileCommitProtocol,
+    staticPartitions: TablePartitionSpec)
+  extends BaseDynamicPartitionDataWriter(description, taskAttemptContext, committer) {
+  newOutputWriter()
+
+  private def newOutputWriter(): Unit = {
+    val isPartialDynamicPartitions = staticPartitions.nonEmpty &&
+      description.partitionColumns.map(_.name).diff(staticPartitions.keys.toSeq).nonEmpty
+    if (!isPartialDynamicPartitions) {
+      val staticOrEmptyValues = description.allColumns.map {
+        case c: Attribute =>
+          val partValue = {
+            if (c.dataType == StringType) {
+              staticPartitions.getOrElse(c.name, ExternalCatalogUtils.DEFAULT_PARTITION_NAME)
+            } else {
+              staticPartitions.get(c.name).orNull
+            }
+          }
+          Cast(
+            Literal.create(partValue, StringType),
+            c.dataType,
+            Some(SQLConf.get.sessionLocalTimeZone)
+          ).eval()
+      }
+      val emptyRow = InternalRow.fromSeq(staticOrEmptyValues)
+      val partitionValue = getPartitionValues(emptyRow)
+      statsTrackers.foreach(_.newPartition(partitionValue))
+      renewCurrentWriter(Some(partitionValue), None, closeCurrentWriter = true)
+    }
+  }
+  override def write(record: InternalRow): Unit = {}
+}

 /** Writes data to a single directory (used for non-dynamic-partition writes). */
 class SingleDirectoryDataWriter(
     description: WriteJobDescription,
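A minimal sketch of the partial-dynamic-partition check above, assuming a hypothetical table partitioned by (p1, p2); the column and value names are illustrative, not from the patch. Only fully-static and fully-dynamic partition specs get a metadata-only file:

  val partitionColumnNames = Seq("p1", "p2")

  def isPartialDynamic(staticPartitions: Map[String, String]): Boolean =
    staticPartitions.nonEmpty &&
      partitionColumnNames.diff(staticPartitions.keys.toSeq).nonEmpty

  isPartialDynamic(Map.empty)                      // false: fully dynamic, default partition values are used
  isPartialDynamic(Map("p1" -> "a", "p2" -> "b"))  // false: fully static, the given values are used
  isPartialDynamic(Map("p1" -> "a"))               // true: p2 stays dynamic, so no metadata-only file is written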
@@ -54,6 +54,11 @@ object FileFormatWriter extends Logging {
       customPartitionLocations: Map[TablePartitionSpec, String],
       outputColumns: Seq[Attribute])
 
+  case class ColumnSpec(
+      partitionColumns: Seq[Attribute],
+      bucketSpec: Option[BucketSpec],
+      staticPartitions: Option[TablePartitionSpec])
+
   /** A function that converts the empty string to null for partition values. */
   case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression {
     override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v
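A minimal sketch of how the three call sites touched by this patch populate the new ColumnSpec; the values referenced are the ones each caller already has in scope, so the snippet is illustrative rather than self-contained. Only InsertIntoHadoopFsRelationCommand knows its static partition spec, so it is the only caller passing Some(...):

  // InsertIntoHadoopFsRelationCommand: the one caller with a static partition spec.
  val insertSpec = FileFormatWriter.ColumnSpec(partitionColumns, bucketSpec, Some(staticPartitions))
  // FileStreamSink: streaming writes pass no bucket spec and no static partitions.
  val streamSpec = FileFormatWriter.ColumnSpec(partitionColumns, None, None)
  // SaveAsHiveFile: Hive writes likewise pass None for both.
  val hiveSpec = FileFormatWriter.ColumnSpec(partitionAttributes, None, None)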
@@ -99,12 +104,13 @@ object FileFormatWriter extends Logging {
       committer: FileCommitProtocol,
       outputSpec: OutputSpec,
       hadoopConf: Configuration,
-      partitionColumns: Seq[Attribute],
-      bucketSpec: Option[BucketSpec],
+      columnSpec: ColumnSpec,
       statsTrackers: Seq[WriteJobStatsTracker],
       options: Map[String, String])
     : Set[String] = {
 
+    val partitionColumns = columnSpec.partitionColumns
+    val bucketSpec = columnSpec.bucketSpec
     val job = Job.getInstance(hadoopConf)
     job.setOutputKeyClass(classOf[Void])
     job.setOutputValueClass(classOf[InternalRow])
@@ -226,7 +232,8 @@ object FileFormatWriter extends Logging {
             sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
             committer,
             iterator = iter,
-            concurrentOutputWriterSpec = concurrentOutputWriterSpec)
+            concurrentOutputWriterSpec = concurrentOutputWriterSpec,
+            staticPartitions = columnSpec.staticPartitions)
         },
         rddWithNonEmptyPartitions.partitions.indices,
         (index, res: WriteTaskResult) => {
@@ -261,7 +268,8 @@ object FileFormatWriter extends Logging {
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[InternalRow],
-      concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec]): WriteTaskResult = {
+      concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec],
+      staticPartitions: Option[TablePartitionSpec]): WriteTaskResult = {
 
     val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId)
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
@@ -288,6 +296,10 @@ object FileFormatWriter extends Logging {
         new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
       } else if (description.partitionColumns.isEmpty && description.bucketIdExpression.isEmpty) {
         new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
+      } else if (sparkPartitionId == 0 && description.partitionColumns.nonEmpty &&
+          !iterator.hasNext && staticPartitions.isDefined) {
+        new EmptyPartitionDataWriter(description, taskAttemptContext, committer,
+          staticPartitions.get)
       } else {
         concurrentOutputWriterSpec match {
           case Some(spec) =>
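A minimal sketch of when the new branch above is taken; the helper and its arguments are illustrative stand-ins for the surrounding executeTask values, not part of the patch. Only the first task (sparkPartitionId == 0) of a partitioned write whose input iterator is empty and whose static partition spec is defined picks EmptyPartitionDataWriter, so at most one metadata-only file is produced per job:

  def picksEmptyPartitionWriter(
      sparkPartitionId: Int,
      hasPartitionColumns: Boolean,
      iteratorHasNext: Boolean,
      staticPartitionsDefined: Boolean): Boolean =
    sparkPartitionId == 0 && hasPartitionColumns &&
      !iteratorHasNext && staticPartitionsDefined

  picksEmptyPartitionWriter(0, hasPartitionColumns = true, iteratorHasNext = false, staticPartitionsDefined = true)  // true
  picksEmptyPartitionWriter(1, hasPartitionColumns = true, iteratorHasNext = false, staticPartitionsDefined = true)  // false: not the first task
  picksEmptyPartitionWriter(0, hasPartitionColumns = true, iteratorHasNext = true, staticPartitionsDefined = true)   // false: the task has rows to write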
@@ -180,8 +180,8 @@ case class InsertIntoHadoopFsRelationCommand(
             outputSpec = FileFormatWriter.OutputSpec(
               committerOutputPath.toString, customPartitionLocations, outputColumns),
             hadoopConf = hadoopConf,
-            partitionColumns = partitionColumns,
-            bucketSpec = bucketSpec,
+            columnSpec = FileFormatWriter.ColumnSpec(
+              partitionColumns, bucketSpec, Some(staticPartitions)),
             statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
             options = options)
 
@@ -210,7 +210,7 @@ class OrcFileFormat

       val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
       val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
-        dataSchema, resultSchema, partitionSchema, conf)
+        dataSchema, StructType(requiredSchema.fields), partitionSchema, conf)
       assert(requestedColIds.length == requiredSchema.length,
         "[BUG] requested column IDs do not match required schema")
       val taskConf = new Configuration(conf)
@@ -249,7 +249,7 @@ object OrcUtils extends Logging {
     val resultSchemaString = if (canPruneCols) {
       OrcUtils.orcTypeDescriptionString(resultSchema)
     } else {
-      OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
+      OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields))
     }
     OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
     resultSchemaString
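A minimal sketch of why the partition columns are dropped here, written against org.apache.orc.TypeDescription; the column names are illustrative, not from the patch. The string set on MAPRED_INPUT_SCHEMA describes what the ORC reader should materialize from the file, and partition values are never stored in the file itself:

  import org.apache.orc.TypeDescription

  // Matches what is physically present in the data file.
  val fileSchema = TypeDescription.fromString("struct<some_column:string>")
  // The old string also asked ORC for the partition column, which the file does not contain.
  val fileSchemaWithPartition = TypeDescription.fromString("struct<some_column:string,part_id:string>")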
@@ -185,8 +185,7 @@ class FileStreamSink(
         committer = committer,
         outputSpec = FileFormatWriter.OutputSpec(path, Map.empty, qe.analyzed.output),
         hadoopConf = hadoopConf,
-        partitionColumns = partitionColumns,
-        bucketSpec = None,
+        columnSpec = FileFormatWriter.ColumnSpec(partitionColumns, None, None),
         statsTrackers = Seq(basicWriteJobStatsTracker),
         options = options)
     }
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
 import org.apache.spark.sql.catalyst.plans.logical.Filter
@@ -127,6 +128,31 @@ class FileBasedDataSourceSuite extends QueryTest
     }
   }
 
test(s"SPARK-35592 empty partitioned table when saved should write a metadata only file") {
Seq("orc", "parquet").foreach { format =>
withTempPath { outputPath =>
val df = spark.emptyDataFrame.select(
lit("").as("part_id"),
lit("").as("some_column")
)
df.write.format(format).partitionBy("part_id").save(outputPath.toString)
val partFiles = new File(outputPath,
s"part_id=${ExternalCatalogUtils.DEFAULT_PARTITION_NAME}")
.listFiles()
.filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
assert(partFiles.length === 1)

// Now read the file.
val df1 = spark.read.format(format).load(outputPath.toString)
checkAnswer(df1, Seq.empty[Row])
val expectedSchema = StructType(
Seq(StructField("some_column", StringType), StructField("part_id", NullType))
)
assert(df1.schema.equals(expectedSchema))
}
}
}

   allFileBasedDataSources.foreach { format =>
     test(s"SPARK-23372 error while writing empty schema files using $format") {
       withTempPath { outputPath =>
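Roughly the on-disk layout the SPARK-35592 test above expects after the write, assuming the default committer; the part-file name is illustrative:

  outputPath/
    _SUCCESS
    part_id=__HIVE_DEFAULT_PARTITION__/
      part-00000-<uuid>.c000.snappy.parquet   (zero rows; schema only; .orc for the ORC run)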
@@ -92,8 +92,7 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
       outputSpec =
         FileFormatWriter.OutputSpec(outputLocation, customPartitionLocations, outputColumns),
       hadoopConf = hadoopConf,
-      partitionColumns = partitionAttributes,
-      bucketSpec = None,
+      columnSpec = FileFormatWriter.ColumnSpec(partitionAttributes, None, None),
       statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)),
       options = Map.empty)
   }