diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 9bb34e2df9a26..c2410811ef688 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -287,7 +287,11 @@ case class ParquetRelation2(
       }
     }
 
-    parquetSchema = maybeSchema.getOrElse(readSchema())
+    try {
+      parquetSchema = readSchema().getOrElse(maybeSchema.getOrElse(maybeMetastoreSchema.get))
+    } catch {
+      case e => throw new SparkException(s"Failed to find schema for ${paths.mkString(",")}", e)
+    }
 
     partitionKeysIncludedInParquetSchema =
       isPartitioned &&
@@ -308,7 +312,7 @@ case class ParquetRelation2(
       }
     }
 
-    private def readSchema(): StructType = {
+    private def readSchema(): Option[StructType] = {
      // Sees which file(s) we need to touch in order to figure out the schema.
      val filesToTouch =
        // Always tries the summary files first if users don't require a merged schema. In this case,
@@ -611,8 +615,9 @@ object ParquetRelation2 {
   // internally.
   private[sql] val METASTORE_SCHEMA = "metastoreSchema"
 
-  private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
-    footers.map { footer =>
+  private[parquet] def readSchema(
+      footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
+    val mergedSchema = footers.map { footer =>
       val metadata = footer.getParquetMetadata.getFileMetaData
       val parquetSchema = metadata.getSchema
       val maybeSparkSchema = metadata
@@ -630,11 +635,14 @@ object ParquetRelation2 {
           sqlContext.conf.isParquetBinaryAsString,
           sqlContext.conf.isParquetINT96AsTimestamp))
       }
-    }.reduce { (left, right) =>
-      try left.merge(right) catch { case e: Throwable =>
-        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
-      }
+    }.foldLeft[StructType](null) {
+      case (null, right) => right
+      case (left, right) => try left.merge(right) catch { case e: Throwable =>
+        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
+      }
     }
+
+    Option(mergedSchema)
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0e43faa8afdaf..6805fdcc0a2c4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -208,14 +208,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         ParquetRelation2(
           paths,
           Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
-          None,
+          Some(metastoreSchema),
           Some(partitionSpec))(hive))
     } else {
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
 
-      LogicalRelation(
-        ParquetRelation2(
+      LogicalRelation(ParquetRelation2(
         paths,
-        Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json))(hive))
+        Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
+        Some(metastoreSchema))(hive))
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 2acf1a7767c19..d619704f4f767 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -121,13 +121,54 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
+
+    sql(s"""
+      create table test_parquet
+      (
+        intField INT,
+        stringField STRING
+      )
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+    """)
+
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    jsonRDD(rdd).registerTempTable("jt")
+    sql("""
+      create table test_parquet_jt ROW FORMAT
+        | SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        | STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+        | AS select * from jt""".stripMargin)
+
     conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
+    sql("DROP TABLE test_parquet")
+    sql("DROP TABLE jt")
+    sql("DROP TABLE test_parquet_jt")
+
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("scan from an empty parquet table") {
+    checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
+  }
+
+  test("scan from an empty parquet table with upper case") {
+    checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0))
+  }
+
+  test("scan from an non empty parquet table #1") {
+    checkAnswer(
+      sql(s"SELECT a, b FROM test_parquet_jt WHERE a = '1'"),
+      Seq(Row(1, "str1"))
+    )
+  }
 }
 
 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
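
Note: the core of this patch is that readSchema now merges footer schemas with a foldLeft over a null seed and wraps the result in Option, so a Parquet table with no data files yields None and the caller can fall back to the metastore schema instead of reduce failing on an empty collection. Below is a minimal, self-contained Scala sketch of that pattern; the Schema case class and mergeSchemas helper are hypothetical stand-ins for illustration only, not Spark's StructType or the actual ParquetRelation2.readSchema.

// Hypothetical stand-in for Spark's StructType, used only to illustrate the pattern.
case class Schema(fields: Set[String]) {
  def merge(other: Schema): Schema = Schema(fields ++ other.fields)
}

// Mirrors the shape of the patched readSchema: folding over a null seed means an
// empty input leaves the accumulator null, and Option(...) turns that into None.
def mergeSchemas(schemas: Seq[Schema]): Option[Schema] = {
  val merged = schemas.foldLeft[Schema](null) {
    case (null, right) => right
    case (left, right) => left.merge(right)
  }
  Option(merged)
}

// Usage: None for an empty table, Some(merged) otherwise, so a caller can write
// something like mergeSchemas(footerSchemas).getOrElse(metastoreSchema).
assert(mergeSchemas(Nil).isEmpty)
assert(mergeSchemas(Seq(Schema(Set("a")), Schema(Set("b")))) == Some(Schema(Set("a", "b"))))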