From 8d212f049ccd176e5d6800d620929eed20844415 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Tue, 24 Oct 2017 19:50:33 -0700
Subject: [PATCH] [SPARK-15474][SQL] Write and read back non-empty schema with
 empty dataframe

---
 .../datasources/orc/OrcFileFormat.scala      | 40 +++++++++++++++++--
 .../spark/sql/hive/orc/OrcFileFormat.scala   | 17 +++++---
 .../sql/hive/execution/SQLQuerySuite.scala   | 14 +++++++
 3 files changed, 63 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 2eeb0065455f..8691be2478be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -17,12 +17,18 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
-import org.apache.orc.TypeDescription
+import java.io._
 
-import org.apache.spark.sql.AnalysisException
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.orc.{OrcFile, TypeDescription}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.types.StructType
 
-private[sql] object OrcFileFormat {
+private[sql] object OrcFileFormat extends Logging {
   private def checkFieldName(name: String): Unit = {
     try {
       TypeDescription.fromString(s"struct<$name:int>")
@@ -39,4 +45,32 @@ private[sql] object OrcFileFormat {
     schema.fieldNames.foreach(checkFieldName)
     schema
   }
+
+  def getSchemaString(schema: StructType): String = {
+    schema.fields.map(f => s"${f.name}:${f.dataType.catalogString}").mkString("struct<", ",", ">")
+  }
+
+  private def readSchema(file: Path, conf: Configuration, fs: FileSystem) = {
+    try {
+      val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
+      val reader = OrcFile.createReader(file, readerOptions)
+      val schema = reader.getSchema
+      if (schema.getFieldNames.size == 0) {
+        None
+      } else {
+        Some(schema)
+      }
+    } catch {
+      case _: IOException => None
+    }
+  }
+
+  def readSchema(sparkSession: SparkSession, files: Seq[FileStatus]): Option[StructType] = {
+    val conf = sparkSession.sparkContext.hadoopConfiguration
+    val fs = FileSystem.get(conf)
+    files.map(_.getPath).flatMap(readSchema(_, conf, fs)).headOption.map { schema =>
+      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    }
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index d26ec15410d9..d665aa92c7f0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.io.{NullWritable, Writable}
 import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.orc.OrcConf.COMPRESS
+import org.apache.orc.OrcConf.{COMPRESS, MAPRED_OUTPUT_SCHEMA}
 
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
@@ -58,10 +58,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    OrcFileOperator.readSchema(
-      files.map(_.getPath.toString),
-      Some(sparkSession.sessionState.newHadoopConf())
-    )
+    org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.readSchema(sparkSession, files)
   }
 
   override def prepareWrite(
@@ -73,6 +70,10 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 
     val configuration = job.getConfiguration
 
+    configuration.set(
+      MAPRED_OUTPUT_SCHEMA.getAttribute,
+      org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.getSchemaString(dataSchema))
+
     configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
     configuration match {
       case conf: JobConf =>
@@ -252,6 +253,12 @@ private[orc] class OrcOutputWriter(
   override def close(): Unit = {
     if (recordWriterInstantiated) {
       recordWriter.close(Reporter.NULL)
+    } else {
+      // SPARK-15474 Write empty orc file with correct schema
+      val conf = context.getConfiguration()
+      val writer = org.apache.orc.OrcFile.createWriter(
+        new Path(path), org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf))
+      new org.apache.orc.mapreduce.OrcMapreduceRecordWriter(writer).close(context)
     }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index c11e37a51664..b1ca9fa79169 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2153,4 +2153,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  Seq("orc", "parquet").foreach { format =>
+    test(s"SPARK-15474 Write and read back non-empty schema with empty dataframe - $format") {
+      withTempPath { file =>
+        val path = file.getCanonicalPath
+        val emptyDf = Seq((true, 1, "str")).toDF.limit(0)
+        emptyDf.write.format(format).save(path)
+
+        val df = spark.read.format(format).load(path)
+        assert(df.schema.sameType(emptyDf.schema))
+        checkAnswer(df, emptyDf)
+      }
+    }
+  }
 }
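
Note (not part of the diff above): the patch works by rendering the Catalyst StructType as a
Hive-style "struct<...>" string, handing it to the ORC writer through MAPRED_OUTPUT_SCHEMA so
that even a zero-row file records its field names, and parsing that string back into a
StructType at read time; files whose ORC schema has no fields are skipped by readSchema so
inference falls back to the next file. Below is a minimal, standalone sketch of that
round-trip, not taken from the patch: the object name and the three example fields are
invented for illustration, and it assumes the Spark SQL and ORC core jars are on the
classpath.

import org.apache.orc.TypeDescription

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType, StructField, StructType}

object SchemaRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical schema standing in for an empty DataFrame's schema.
    val schema = StructType(Seq(
      StructField("b", BooleanType),
      StructField("i", IntegerType),
      StructField("s", StringType)))

    // Same rendering as OrcFileFormat.getSchemaString: "struct<b:boolean,i:int,s:string>".
    val schemaString =
      schema.fields.map(f => s"${f.name}:${f.dataType.catalogString}").mkString("struct<", ",", ">")

    // The string is a valid ORC TypeDescription, so a writer can record it
    // even when no rows are ever written.
    val orcSchema = TypeDescription.fromString(schemaString)

    // It is also a valid Catalyst type string, so inferSchema can parse it back.
    val parsedBack = CatalystSqlParser.parseDataType(orcSchema.toString).asInstanceOf[StructType]
    assert(parsedBack.sameType(schema))
    println(s"round-tripped schema: ${parsedBack.simpleString}")
  }
}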