From 5721b88c7c816f57ef39374ac9b335d870543628 Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Mon, 8 Aug 2016 13:58:23 +0530 Subject: [PATCH 01/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 4 +++- .../spark/sql/hive/orc/OrcFileFormat.scala | 12 ++++++---- .../spark/sql/hive/orc/OrcQuerySuite.scala | 23 +++++++++++++++++++ 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c7c1acda25db..98d6d207e147 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -294,7 +294,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred) }.getOrElse(metastoreSchema) } else { - defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get + val inferredSchema = + defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) + inferredSchema.getOrElse(metastoreSchema) } val relation = HadoopFsRelation( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 1d3c4663c339..ebcdf2a0d56d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -17,9 +17,12 @@ package org.apache.spark.sql.hive.orc +import java.io.FileNotFoundException import java.net.URI import java.util.Properties +import scala.util.Try + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.conf.HiveConf.ConfVars @@ -56,10 +59,11 @@ private[sql] class OrcFileFormat sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { - OrcFileOperator.readSchema( - files.map(_.getPath.toUri.toString), - Some(sparkSession.sessionState.newHadoopConf()) - ) + val schema = Try(OrcFileOperator.readSchema( + files.map(_.getPath.toUri.toString), + Some(sparkSession.sessionState.newHadoopConf()))) + .recover { case _: FileNotFoundException => None } + schema.get } override def prepareWrite( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 49e963ee1294..bc3e5e22464a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -349,6 +349,29 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } } + test("SPARK-16948. 
Check empty orc partitioned tables in ORC") { + withSQLConf((HiveUtils.CONVERT_METASTORE_ORC.key, "true")) { + withTempPath { dir => + withTable("empty_orc_partitioned") { + spark.sql( + s"""CREATE TABLE empty_orc_partitioned(key INT, value STRING) + | PARTITIONED BY (p INT) STORED AS ORC + """.stripMargin) + + val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1) + emptyDF.createOrReplaceTempView("empty") + + // Query empty table + val df = spark.sql( + s"""SELECT key, value FROM empty_orc_partitioned + | WHERE key > 10 + """.stripMargin) + checkAnswer(df, emptyDF) + } + } + } + } + test("SPARK-10623 Enable ORC PPD") { withTempPath { dir => withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { From 4ae92d810720ab56081a03f1c222903af51ce751 Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Tue, 9 Aug 2016 16:38:47 +0530 Subject: [PATCH 02/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../apache/spark/sql/hive/HiveMetastoreCatalog.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 98d6d207e147..b82584235eb7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -287,16 +287,14 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log new Path(metastoreRelation.catalogTable.storage.locationUri.get), partitionSpec) + val schema = + defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) val inferredSchema = if (fileType.equals("parquet")) { - val inferredSchema = - defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) - inferredSchema.map { inferred => + schema.map { inferred => ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred) }.getOrElse(metastoreSchema) } else { - val inferredSchema = - defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) - inferredSchema.getOrElse(metastoreSchema) + schema.getOrElse(metastoreSchema) } val relation = HadoopFsRelation( From 75bca1b822e4c3fc6b894a9bc7fb6067b36900b7 Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Wed, 10 Aug 2016 03:40:12 +0530 Subject: [PATCH 03/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../apache/spark/sql/hive/HiveMetastoreCatalog.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index b82584235eb7..ac185c2aa4e7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -287,21 +287,21 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log new Path(metastoreRelation.catalogTable.storage.locationUri.get), partitionSpec) - val schema = + val inferredSchema = defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) - val inferredSchema = if (fileType.equals("parquet")) { - schema.map { inferred => - ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred) + val schema = if (fileType == "parquet") { + inferredSchema.map { schema => + 
ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, schema) }.getOrElse(metastoreSchema) } else { - schema.getOrElse(metastoreSchema) + inferredSchema.getOrElse(metastoreSchema) } val relation = HadoopFsRelation( sparkSession = sparkSession, location = fileCatalog, partitionSchema = partitionSchema, - dataSchema = inferredSchema, + dataSchema = schema, bucketSpec = bucketSpec, fileFormat = defaultSource, options = options) From 4004c0a559a435ae2c4ef6a18c7b03408e6283fb Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Mon, 22 Aug 2016 12:59:39 +0530 Subject: [PATCH 04/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 3 +++ .../spark/sql/hive/orc/OrcFileFormat.scala | 1 + .../spark/sql/hive/orc/OrcQuerySuite.scala | 27 ++++++++++++++++--- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index f18548b70cea..39e3864ac216 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -240,10 +240,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val inferredSchema = defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) val schema = if (fileType == "parquet") { + // For Parquet, get correct schema by merging Metastore schema data types + // and Parquet schema field names. inferredSchema.map { schema => ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, schema) }.getOrElse(metastoreSchema) } else { + // For others (e.g orc), fall back to metastore schema if needed. inferredSchema.getOrElse(metastoreSchema) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 8e160413e8bc..b657fd597171 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -57,6 +57,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { + // Safe to ignore FileNotFoundException in case no files are found. 
val schema = Try(OrcFileOperator.readSchema( files.map(_.getPath.toUri.toString), Some(sparkSession.sessionState.newHadoopConf()))) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index de3d6e0321d3..939b07bd5d3d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -379,16 +379,35 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { spark.sql( s"""CREATE TABLE empty_orc_partitioned(key INT, value STRING) | PARTITIONED BY (p INT) STORED AS ORC - """.stripMargin) + """.stripMargin) val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1) emptyDF.createOrReplaceTempView("empty") // Query empty table val df = spark.sql( - s"""SELECT key, value FROM empty_orc_partitioned - | WHERE key > 10 - """.stripMargin) + s"""SELECT key, value FROM empty_orc_partitioned + | WHERE key > 10 + """.stripMargin) + checkAnswer(df, emptyDF) + } + } + + withTempPath { dir => + withTable("empty_text_partitioned") { + spark.sql( + s"""CREATE TABLE empty_text_partitioned(key INT, value STRING) + | PARTITIONED BY (p INT) STORED AS TEXTFILE + """.stripMargin) + + val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1) + emptyDF.createOrReplaceTempView("empty_text") + + // Query empty table + val df = spark.sql( + s"""SELECT key, value FROM empty_text_partitioned + | WHERE key > 10 + """.stripMargin) checkAnswer(df, emptyDF) } } From 9a8838a962a347db7303944c0d911f1417fb80e7 Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Tue, 23 Aug 2016 14:41:11 +0530 Subject: [PATCH 05/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 20 ++++++++++--------- .../spark/sql/hive/orc/OrcQuerySuite.scala | 19 ------------------ 2 files changed, 11 insertions(+), 28 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 39e3864ac216..24ca9715d0bd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -239,15 +239,17 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val inferredSchema = defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) - val schema = if (fileType == "parquet") { - // For Parquet, get correct schema by merging Metastore schema data types - // and Parquet schema field names. - inferredSchema.map { schema => - ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, schema) - }.getOrElse(metastoreSchema) - } else { - // For others (e.g orc), fall back to metastore schema if needed. - inferredSchema.getOrElse(metastoreSchema) + val schema = fileType match { + case "parquet" => + // For Parquet, get correct schema by merging Metastore schema data types + // and Parquet schema field names. 
+ inferredSchema.map { schema => + ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, schema) + }.getOrElse(metastoreSchema) + case "orc" => + inferredSchema.getOrElse(metastoreSchema) + case _ => + inferredSchema.get } val relation = HadoopFsRelation( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 939b07bd5d3d..1da0da706d0e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -392,25 +392,6 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { checkAnswer(df, emptyDF) } } - - withTempPath { dir => - withTable("empty_text_partitioned") { - spark.sql( - s"""CREATE TABLE empty_text_partitioned(key INT, value STRING) - | PARTITIONED BY (p INT) STORED AS TEXTFILE - """.stripMargin) - - val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1) - emptyDF.createOrReplaceTempView("empty_text") - - // Query empty table - val df = spark.sql( - s"""SELECT key, value FROM empty_text_partitioned - | WHERE key > 10 - """.stripMargin) - checkAnswer(df, emptyDF) - } - } } } From eb8a955e9fd39b2c8b9b88cbc1f01b1937af1189 Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Wed, 24 Aug 2016 04:27:41 +0530 Subject: [PATCH 06/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 24ca9715d0bd..bf5bae830731 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -249,7 +249,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log case "orc" => inferredSchema.getOrElse(metastoreSchema) case _ => - inferredSchema.get + throw new RuntimeException(s"Cannot convert a $fileType to a HadoopFsRelation") } val relation = HadoopFsRelation( From 70cf84dc39ece6b2c3f700079c605f9d7dfa4904 Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Wed, 24 Aug 2016 13:22:10 +0530 Subject: [PATCH 07/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 11 ++-- .../spark/sql/hive/orc/OrcFileFormat.scala | 40 ++++++++++--- .../spark/sql/hive/orc/OrcQuerySuite.scala | 57 ++++++++++++++++--- 3 files changed, 85 insertions(+), 23 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index bf5bae830731..bcfa43f05f98 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -191,7 +191,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log defaultSource: FileFormat, fileFormatClass: Class[_ <: FileFormat], fileType: String): LogicalRelation = { - val metastoreSchema = StructType.fromAttributes(metastoreRelation.output) + val metastoreSchema = metastoreRelation.schema val tableIdentifier = QualifiedTableName(metastoreRelation.databaseName, 
metastoreRelation.tableName) val bucketSpec = None // We don't support hive bucketed tables, only ones we write out. @@ -237,17 +237,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log new Path(metastoreRelation.catalogTable.storage.locationUri.get), partitionSpec) - val inferredSchema = - defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) val schema = fileType match { case "parquet" => + val inferredSchema = + defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) + // For Parquet, get correct schema by merging Metastore schema data types // and Parquet schema field names. inferredSchema.map { schema => ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, schema) }.getOrElse(metastoreSchema) case "orc" => - inferredSchema.getOrElse(metastoreSchema) + metastoreSchema case _ => throw new RuntimeException(s"Cannot convert a $fileType to a HadoopFsRelation") } @@ -286,7 +287,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log DataSource( sparkSession = sparkSession, paths = paths, - userSpecifiedSchema = Some(metastoreRelation.schema), + userSpecifiedSchema = Some(metastoreSchema), bucketSpec = bucketSpec, options = options, className = fileType).resolveRelation(), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index b657fd597171..fa6e21f0f4e2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -103,6 +103,33 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable true } + def mapRequiredColumns( + conf: Configuration, + dataSchema: StructType, + physicalSchema: StructType, + requiredSchema: StructType): StructType = { + + // requiredSchema names might not match with physical schema names. + // This is especially true when data is generated via Hive wherein + // orc files would have column names as _col0, _col1 etc. This is + // fixed in Hive 2.0, where in physical col names would match that + // of metastore. To make it backward compatible, it is required to + // map physical names to that of requiredSchema. + + // for requiredSchema, get the ordinal from dataSchema + val ids = requiredSchema.map(a => dataSchema.fieldIndex(a.name): Integer).sorted + + // for ids, get corresponding name from physicalSchema (e.g _col1 in + // case of hive. 
otherwise it would match physical name) + val names = ids.map(i => physicalSchema.fieldNames(i)) + HiveShim.appendReadColumns(conf, ids, names) + + val mappedReqPhysicalSchemaStruct = + StructType(physicalSchema.filter(struct => names.contains(struct.name))) + + mappedReqPhysicalSchemaStruct + } + override def buildReader( sparkSession: SparkSession, dataSchema: StructType, @@ -133,7 +160,9 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable Iterator.empty } else { val physicalSchema = maybePhysicalSchema.get - OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema) + // Get StructType for newly mapped schema + val mappedReqPhysicalSchema = + mapRequiredColumns(conf, dataSchema, physicalSchema, requiredSchema) val orcRecordReader = { val job = Job.getInstance(conf) @@ -154,7 +183,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable // Unwraps `OrcStruct`s to `UnsafeRow`s OrcRelation.unwrapOrcStructs( conf, - requiredSchema, + mappedReqPhysicalSchema, Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]), new RecordReaderIterator[OrcStruct](orcRecordReader)) } @@ -309,11 +338,4 @@ private[orc] object OrcRelation extends HiveInspectors { maybeStructOI.map(unwrap).getOrElse(Iterator.empty) } - - def setRequiredColumns( - conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = { - val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer) - val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip - HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) - } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 1da0da706d0e..28174b542eeb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hive.orc +import java.io.File import java.nio.charset.StandardCharsets import org.scalatest.BeforeAndAfterAll @@ -372,24 +373,37 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } } - test("SPARK-16948. Check empty orc partitioned tables in ORC") { + test("SPARK-16948. 
Check empty orc tables in ORC") { withSQLConf((HiveUtils.CONVERT_METASTORE_ORC.key, "true")) { withTempPath { dir => withTable("empty_orc_partitioned") { spark.sql( - s"""CREATE TABLE empty_orc_partitioned(key INT, value STRING) - | PARTITIONED BY (p INT) STORED AS ORC + s""" + |CREATE TABLE empty_orc_partitioned(key INT, value STRING) + |PARTITIONED BY (p INT) STORED AS ORC + """.stripMargin) + + val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1) + + // Query empty table + checkAnswer( + sql("SELECT key, value FROM empty_orc_partitioned WHERE key > 10"), + emptyDF) + } + + withTable("empty_orc") { + spark.sql( + s""" + |CREATE TABLE empty_orc(key INT, value STRING) + |STORED AS ORC """.stripMargin) val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1) - emptyDF.createOrReplaceTempView("empty") // Query empty table - val df = spark.sql( - s"""SELECT key, value FROM empty_orc_partitioned - | WHERE key > 10 - """.stripMargin) - checkAnswer(df, emptyDF) + checkAnswer( + sql("SELECT key, value FROM empty_orc WHERE key > 10"), + emptyDF) } } } @@ -447,6 +461,31 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } } + def getHiveFile(path: String): File = { + new File(Thread.currentThread().getContextClassLoader.getResource(path).getFile) + } + + test("Verify ORC conversion parameter: CONVERT_METASTORE_ORC with Hive-1.x files") { + val singleRowDF = Seq((2415022, "AAAAAAAAOKJNECAA")).toDF("key", "value") + Seq("true", "false").foreach { orcConversion => + withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> orcConversion) { + withTable("dummy_orc") { + // Hive 1.x can have virtual columns as follows in ORC files + // Type: struct<_col0:int,_col1:string> in hive_1.x_orc + spark.sql( + s""" + |CREATE EXTERNAL TABLE dummy_orc(key INT, value STRING) + |STORED AS ORC + |LOCATION '${getHiveFile("data/files/hive_1.x_orc/")}' + """.stripMargin) + + val df = spark.sql("SELECT key, value FROM dummy_orc LIMIT 1") + checkAnswer(df, singleRowDF) + } + } + } + } + test("Verify the ORC conversion parameter: CONVERT_METASTORE_ORC") { withTempView("single") { val singleRowDF = Seq((0, "foo")).toDF("key", "value") From c9d677b0629b39e28bf6944cd3cf38457fcfe319 Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Wed, 24 Aug 2016 13:23:22 +0530 Subject: [PATCH 08/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../data/files/hive_1.x_orc/dummy_orc.orc | Bin 0 -> 392 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc diff --git a/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc b/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc new file mode 100644 index 0000000000000000000000000000000000000000..fb35cb0df83ce92d7e46a033697b872e16a3bae5 GIT binary patch literal 392 zcmeYdau#G@;9?VE;b0A5umv(@xR@Cj7=%PQScUjG?mSz@B=7`CNIYmf#2vuk!@%&^ z=y6gi1EXRrUzlpC9;3U&hmW6%OUp_`nz~skoLyAJ4A=}982A|&4zPf9Gnp|k6qmFx z)aUElho@_>d$2C}C|euwbnb;8=K|~)SQ!|Y1ge0hCOu9NPMGlE8Pmlcp{|5ko=(CZ z#!NqBKrB8FW5&!dMKwM)g*2u_O~&JQt}Ht9#HDusl}|rT{8^O4Pyy8c>0#2t1m1)P zLQfM!A3fjkIY)Bx0bvEjj|)BumwQ3wt!B@kn7OjDtq+tK?+FNYvi30Y zFkAv^PY|;>rE&D=K^+0Dg{vPdIJipX&_U%RM^305m6|_ielo*Co~bh|W;D*Y+i Date: Thu, 25 Aug 2016 05:18:46 +0530 Subject: [PATCH 09/19] Revert "[SPARK-16948][SQL] Querying empty partitioned orc tables throws exception" This reverts commit c9d677b0629b39e28bf6944cd3cf38457fcfe319. 
--- .../data/files/hive_1.x_orc/dummy_orc.orc | Bin 392 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc diff --git a/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc b/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc deleted file mode 100644 index fb35cb0df83ce92d7e46a033697b872e16a3bae5..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 392 zcmeYdau#G@;9?VE;b0A5umv(@xR@Cj7=%PQScUjG?mSz@B=7`CNIYmf#2vuk!@%&^ z=y6gi1EXRrUzlpC9;3U&hmW6%OUp_`nz~skoLyAJ4A=}982A|&4zPf9Gnp|k6qmFx z)aUElho@_>d$2C}C|euwbnb;8=K|~)SQ!|Y1ge0hCOu9NPMGlE8Pmlcp{|5ko=(CZ z#!NqBKrB8FW5&!dMKwM)g*2u_O~&JQt}Ht9#HDusl}|rT{8^O4Pyy8c>0#2t1m1)P zLQfM!A3fjkIY)Bx0bvEjj|)BumwQ3wt!B@kn7OjDtq+tK?+FNYvi30Y zFkAv^PY|;>rE&D=K^+0Dg{vPdIJipX&_U%RM^305m6|_ielo*Co~bh|W;D*Y+i Date: Thu, 25 Aug 2016 05:22:40 +0530 Subject: [PATCH 10/19] Revert "Revert "[SPARK-16948][SQL] Querying empty partitioned orc tables throws exception"" This reverts commit 7385a06a990f6dc4afbdc346ff1ed03c53517258. --- .../data/files/hive_1.x_orc/dummy_orc.orc | Bin 0 -> 392 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc diff --git a/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc b/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc new file mode 100644 index 0000000000000000000000000000000000000000..fb35cb0df83ce92d7e46a033697b872e16a3bae5 GIT binary patch literal 392 zcmeYdau#G@;9?VE;b0A5umv(@xR@Cj7=%PQScUjG?mSz@B=7`CNIYmf#2vuk!@%&^ z=y6gi1EXRrUzlpC9;3U&hmW6%OUp_`nz~skoLyAJ4A=}982A|&4zPf9Gnp|k6qmFx z)aUElho@_>d$2C}C|euwbnb;8=K|~)SQ!|Y1ge0hCOu9NPMGlE8Pmlcp{|5ko=(CZ z#!NqBKrB8FW5&!dMKwM)g*2u_O~&JQt}Ht9#HDusl}|rT{8^O4Pyy8c>0#2t1m1)P zLQfM!A3fjkIY)Bx0bvEjj|)BumwQ3wt!B@kn7OjDtq+tK?+FNYvi30Y zFkAv^PY|;>rE&D=K^+0Dg{vPdIJipX&_U%RM^305m6|_ielo*Co~bh|W;D*Y+i Date: Thu, 25 Aug 2016 05:29:20 +0530 Subject: [PATCH 11/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../spark/sql/hive/orc/OrcFileFormat.scala | 40 ++++-------------- .../data/files/hive_1.x_orc/dummy_orc.orc | Bin 392 -> 0 bytes .../spark/sql/hive/orc/OrcQuerySuite.scala | 25 ----------- 3 files changed, 9 insertions(+), 56 deletions(-) delete mode 100644 sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index fa6e21f0f4e2..2de3e0b1314d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -103,33 +103,6 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable true } - def mapRequiredColumns( - conf: Configuration, - dataSchema: StructType, - physicalSchema: StructType, - requiredSchema: StructType): StructType = { - - // requiredSchema names might not match with physical schema names. - // This is especially true when data is generated via Hive wherein - // orc files would have column names as _col0, _col1 etc. This is - // fixed in Hive 2.0, where in physical col names would match that - // of metastore. To make it backward compatible, it is required to - // map physical names to that of requiredSchema. 
- - // for requiredSchema, get the ordinal from dataSchema - val ids = requiredSchema.map(a => dataSchema.fieldIndex(a.name): Integer).sorted - - // for ids, get corresponding name from physicalSchema (e.g _col1 in - // case of hive. otherwise it would match physical name) - val names = ids.map(i => physicalSchema.fieldNames(i)) - HiveShim.appendReadColumns(conf, ids, names) - - val mappedReqPhysicalSchemaStruct = - StructType(physicalSchema.filter(struct => names.contains(struct.name))) - - mappedReqPhysicalSchemaStruct - } - override def buildReader( sparkSession: SparkSession, dataSchema: StructType, @@ -160,9 +133,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable Iterator.empty } else { val physicalSchema = maybePhysicalSchema.get - // Get StructType for newly mapped schema - val mappedReqPhysicalSchema = - mapRequiredColumns(conf, dataSchema, physicalSchema, requiredSchema) + OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema) val orcRecordReader = { val job = Job.getInstance(conf) @@ -183,7 +154,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable // Unwraps `OrcStruct`s to `UnsafeRow`s OrcRelation.unwrapOrcStructs( conf, - mappedReqPhysicalSchema, + requiredSchema, Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]), new RecordReaderIterator[OrcStruct](orcRecordReader)) } @@ -338,4 +309,11 @@ private[orc] object OrcRelation extends HiveInspectors { maybeStructOI.map(unwrap).getOrElse(Iterator.empty) } + + def setRequiredColumns( + conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = { + val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer) + val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip + HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) + } } diff --git a/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc b/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc deleted file mode 100644 index fb35cb0df83ce92d7e46a033697b872e16a3bae5..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 392 zcmeYdau#G@;9?VE;b0A5umv(@xR@Cj7=%PQScUjG?mSz@B=7`CNIYmf#2vuk!@%&^ z=y6gi1EXRrUzlpC9;3U&hmW6%OUp_`nz~skoLyAJ4A=}982A|&4zPf9Gnp|k6qmFx z)aUElho@_>d$2C}C|euwbnb;8=K|~)SQ!|Y1ge0hCOu9NPMGlE8Pmlcp{|5ko=(CZ z#!NqBKrB8FW5&!dMKwM)g*2u_O~&JQt}Ht9#HDusl}|rT{8^O4Pyy8c>0#2t1m1)P zLQfM!A3fjkIY)Bx0bvEjj|)BumwQ3wt!B@kn7OjDtq+tK?+FNYvi30Y zFkAv^PY|;>rE&D=K^+0Dg{vPdIJipX&_U%RM^305m6|_ielo*Co~bh|W;D*Y+i - withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> orcConversion) { - withTable("dummy_orc") { - // Hive 1.x can have virtual columns as follows in ORC files - // Type: struct<_col0:int,_col1:string> in hive_1.x_orc - spark.sql( - s""" - |CREATE EXTERNAL TABLE dummy_orc(key INT, value STRING) - |STORED AS ORC - |LOCATION '${getHiveFile("data/files/hive_1.x_orc/")}' - """.stripMargin) - - val df = spark.sql("SELECT key, value FROM dummy_orc LIMIT 1") - checkAnswer(df, singleRowDF) - } - } - } - } - test("Verify the ORC conversion parameter: CONVERT_METASTORE_ORC") { withTempView("single") { val singleRowDF = Seq((0, "foo")).toDF("key", "value") From e8e2d707828715aa026970f3cb745439b8617f2d Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Thu, 25 Aug 2016 05:32:58 +0530 Subject: [PATCH 12/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../org/apache/spark/sql/hive/orc/OrcFileFormat.scala | 8 ++++---- 1 file 
changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 2de3e0b1314d..de6e360ffffd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -311,9 +311,9 @@ private[orc] object OrcRelation extends HiveInspectors { } def setRequiredColumns( - conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = { - val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer) - val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip - HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) + conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = { + val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer) + val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip + HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) } } From 0f901aa48aea8c8a575578c30142eb057defe09b Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Thu, 25 Aug 2016 05:39:36 +0530 Subject: [PATCH 13/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../org/apache/spark/sql/hive/orc/OrcFileFormat.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index de6e360ffffd..b657fd597171 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -312,8 +312,8 @@ private[orc] object OrcRelation extends HiveInspectors { def setRequiredColumns( conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = { - val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer) - val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip - HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) - } + val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer) + val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip + HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) + } } From 0772a9630d56ecbe6ab0c5d9d850e55ca7b1b8bf Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Thu, 25 Aug 2016 10:24:50 +0530 Subject: [PATCH 14/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- .../spark/sql/hive/orc/OrcQuerySuite.scala | 56 +++++++++---------- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index bcfa43f05f98..c62f6bd5ec5b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -250,7 +250,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log case "orc" => metastoreSchema case _ => - throw new RuntimeException(s"Cannot convert a $fileType to a HadoopFsRelation") + throw new RuntimeException(s"Cannot convert a $fileType to a 
data source table") } val relation = HadoopFsRelation( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index b0bd329aabc2..eb1cd96bc585 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -375,36 +375,34 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { test("SPARK-16948. Check empty orc tables in ORC") { withSQLConf((HiveUtils.CONVERT_METASTORE_ORC.key, "true")) { - withTempPath { dir => - withTable("empty_orc_partitioned") { - spark.sql( - s""" - |CREATE TABLE empty_orc_partitioned(key INT, value STRING) - |PARTITIONED BY (p INT) STORED AS ORC - """.stripMargin) - - val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1) - - // Query empty table - checkAnswer( - sql("SELECT key, value FROM empty_orc_partitioned WHERE key > 10"), - emptyDF) - } - - withTable("empty_orc") { - spark.sql( - s""" - |CREATE TABLE empty_orc(key INT, value STRING) - |STORED AS ORC - """.stripMargin) - - val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1) + withTable("empty_orc_partitioned") { + spark.sql( + """ + |CREATE TABLE empty_orc_partitioned(key INT, value STRING) + |PARTITIONED BY (p INT) STORED AS ORC + """.stripMargin) + + val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1) + + // Query empty table + checkAnswer( + sql("SELECT key, value FROM empty_orc_partitioned"), + emptyDF) + } - // Query empty table - checkAnswer( - sql("SELECT key, value FROM empty_orc WHERE key > 10"), - emptyDF) - } + withTable("empty_orc") { + spark.sql( + """ + |CREATE TABLE empty_orc(key INT, value STRING) + |STORED AS ORC + """.stripMargin) + + val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1) + + // Query empty table + checkAnswer( + sql("SELECT key, value FROM empty_orc"), + emptyDF) } } } From 6ff7e5d50de530a71df5c4a4b220a8119ca3a3f6 Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Thu, 25 Aug 2016 10:38:25 +0530 Subject: [PATCH 15/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index eb1cd96bc585..888a0a5eeb56 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -376,7 +376,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { test("SPARK-16948. 
Check empty orc tables in ORC") { withSQLConf((HiveUtils.CONVERT_METASTORE_ORC.key, "true")) { withTable("empty_orc_partitioned") { - spark.sql( + sql( """ |CREATE TABLE empty_orc_partitioned(key INT, value STRING) |PARTITIONED BY (p INT) STORED AS ORC @@ -391,7 +391,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } withTable("empty_orc") { - spark.sql( + sql( """ |CREATE TABLE empty_orc(key INT, value STRING) |STORED AS ORC From fc14e2d95cb95becf90a38e91e7725e483bae835 Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Fri, 26 Aug 2016 05:27:33 +0530 Subject: [PATCH 16/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 888a0a5eeb56..ec57a7c6846d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -373,7 +373,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } } - test("SPARK-16948. Check empty orc tables in ORC") { + test("support empty orc table when converting hive serde table to data source table") { withSQLConf((HiveUtils.CONVERT_METASTORE_ORC.key, "true")) { withTable("empty_orc_partitioned") { sql( From 9ecb2ed01db1daa19dfe837745d5468cc4990703 Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Fri, 26 Aug 2016 06:41:35 +0530 Subject: [PATCH 17/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../apache/spark/sql/hive/orc/OrcFileFormat.scala | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index b657fd597171..286197b50e22 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -17,12 +17,9 @@ package org.apache.spark.sql.hive.orc -import java.io.FileNotFoundException import java.net.URI import java.util.Properties -import scala.util.Try - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.conf.HiveConf.ConfVars @@ -57,12 +54,10 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { - // Safe to ignore FileNotFoundException in case no files are found. 
- val schema = Try(OrcFileOperator.readSchema( - files.map(_.getPath.toUri.toString), - Some(sparkSession.sessionState.newHadoopConf()))) - .recover { case _: FileNotFoundException => None } - schema.get + OrcFileOperator.readSchema( + files.map(_.getPath.toUri.toString), + Some(sparkSession.sessionState.newHadoopConf()) + ) } override def prepareWrite( From fa713700f853e78053ac0be5db49250951aaa715 Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Thu, 22 Sep 2016 09:24:13 +0530 Subject: [PATCH 18/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../apache/spark/sql/hive/HiveMetastoreCatalog.scala | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c62f6bd5ec5b..128608b379f6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -238,16 +238,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log partitionSpec) val schema = fileType match { - case "parquet" => - val inferredSchema = - defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) - - // For Parquet, get correct schema by merging Metastore schema data types - // and Parquet schema field names. - inferredSchema.map { schema => - ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, schema) - }.getOrElse(metastoreSchema) - case "orc" => + case "orc" | "parquet" => metastoreSchema case _ => throw new RuntimeException(s"Cannot convert a $fileType to a data source table") From e39715e853e4f87e1657fcb491dd7ed47e9da686 Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Thu, 22 Sep 2016 14:07:47 +0530 Subject: [PATCH 19/19] [SPARK-16948][SQL] Querying empty partitioned orc tables throws exception --- .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 128608b379f6..be43ac67e0ed 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -238,7 +238,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log partitionSpec) val schema = fileType match { - case "orc" | "parquet" => + case "parquet" => + val inferredSchema = + defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) + inferredSchema.map { inferred => + ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred) + }.getOrElse(metastoreSchema) + case "orc" => metastoreSchema case _ => throw new RuntimeException(s"Cannot convert a $fileType to a data source table")