From 442ffe01ca4abb3ba94ad4fe30604501befcb32b Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Thu, 12 Feb 2015 01:19:27 -0800
Subject: [PATCH 1/7] Pass down the schema for Parquet files in HiveContext

---
 .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0e43faa8afdaf..23458430dfc44 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -208,14 +208,15 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         ParquetRelation2(
           paths,
           Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
-          None,
+          Some(metastoreSchema),
           Some(partitionSpec))(hive))
     } else {
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
 
       LogicalRelation(
         ParquetRelation2(
           paths,
-          Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json))(hive))
+          Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
+          Some(metastoreSchema))(hive))
     }
   }
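This first commit relies on the line that patch 2 below replaces, `parquetSchema = maybeSchema.getOrElse(readSchema())`: once the call site passes `Some(metastoreSchema)`, schema discovery from (possibly absent) data files is skipped entirely. A minimal sketch of that short-circuit; the names here are hypothetical stand-ins, not the real API:

```scala
// Why Some(metastoreSchema) is enough to keep an empty table from failing:
// getOrElse takes its default by name, so footer-based discovery never runs
// when an explicit schema is present. discoverFromFooters is hypothetical
// and, at this point in the series, would throw on a table with no files.
def schemaFor(maybeSchema: Option[String], discoverFromFooters: () => String): String =
  maybeSchema.getOrElse(discoverFromFooters())

// schemaFor(Some("intField INT"), () => sys.error("empty.reduce")) == "intField INT"
```

Patch 4 below reverts this call-site change once the fallback moves inside ParquetRelation2 itself.
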
From a04930badb291e55ba4e6ba79ce781a89f827932 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Thu, 12 Feb 2015 19:06:47 -0800
Subject: [PATCH 2/7] Fix bug when scanning an empty Parquet-based table

---
 .../apache/spark/sql/parquet/newParquet.scala | 24 +++++++-----
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  3 +-
 .../spark/sql/parquet/parquetSuites.scala     | 37 +++++++++++++++++++
 3 files changed, 53 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 9bb34e2df9a26..6185306dd02aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -287,7 +287,11 @@ case class ParquetRelation2(
       }
     }
 
-    parquetSchema = maybeSchema.getOrElse(readSchema())
+    try {
+      parquetSchema = readSchema().getOrElse(maybeSchema.get)
+    } catch {
+      case e => throw new SparkException(s"Failed to find schema for ${paths.mkString(",")}", e)
+    }
 
     partitionKeysIncludedInParquetSchema =
       isPartitioned &&
@@ -308,7 +312,7 @@ case class ParquetRelation2(
       }
     }
 
-  private def readSchema(): StructType = {
+  private def readSchema(): Option[StructType] = {
     // Sees which file(s) we need to touch in order to figure out the schema.
     val filesToTouch =
     // Always tries the summary files first if users don't require a merged schema. In this case,
@@ -611,8 +615,9 @@ object ParquetRelation2 {
   // internally.
   private[sql] val METASTORE_SCHEMA = "metastoreSchema"
 
-  private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
-    footers.map { footer =>
+  private[parquet] def readSchema(
+      footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
+    Option(footers.map { footer =>
       val metadata = footer.getParquetMetadata.getFileMetaData
       val parquetSchema = metadata.getSchema
       val maybeSparkSchema = metadata
@@ -630,11 +635,12 @@ object ParquetRelation2 {
           sqlContext.conf.isParquetBinaryAsString,
           sqlContext.conf.isParquetINT96AsTimestamp))
       }
-    }.reduce { (left, right) =>
-      try left.merge(right) catch { case e: Throwable =>
-        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
-      }
-    }
+    }.foldLeft[StructType](null) {
+      case (null, right) => right
+      case (left, right) => try left.merge(right) catch { case e: Throwable =>
+        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
+      }
+    })
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 23458430dfc44..6805fdcc0a2c4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -212,8 +212,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         Some(partitionSpec))(hive))
     } else {
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
-      LogicalRelation(
-        ParquetRelation2(
+      LogicalRelation(ParquetRelation2(
         paths,
         Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
         Some(metastoreSchema))(hive))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 2acf1a7767c19..edff90f60e917 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -121,13 +121,50 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
+
+    sql(s"""
+      create table test_parquet
+      (
+        intField INT,
+        stringField STRING
+      )
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+    """)
+
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    jsonRDD(rdd).registerTempTable("jt")
+    sql("""
+      create table test ROW FORMAT
+        | SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        | STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+        | AS select * from jt""".stripMargin)
+
     conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
+    sql("DROP TABLE test_parquet")
+    sql("DROP TABLE test")
+    sql("DROP TABLE jt")
+
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("scan from an empty parquet table") {
+    checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
+  }
+
+  test("scan from an non empty parquet table") {
+    checkAnswer(
+      sql(s"SELECT a, b FROM jt WHERE a = '1'"),
+      Seq(Row(1, "str1"))
+    )
+  }
 }
 
 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
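The failure this patch targets: for a metastore Parquet table with no data files, `footers` is empty and the old `footers.map(...).reduce(...)` throws before any fallback can kick in. The patch makes `readSchema` return an `Option[StructType]` (via a null-seeded fold here; patch 5 tidies this into `reduceOption`). A minimal, dependency-free sketch of the distinction, with strings standing in for per-footer schemas:

```scala
import scala.util.Try

object EmptyReduceDemo extends App {
  val footerSchemas = Seq.empty[String]  // no data files yet, so no footers

  // Old behavior: reduce on an empty collection throws
  // java.lang.UnsupportedOperationException: empty.reduce
  println(Try(footerSchemas.reduce(_ + _)))   // Failure(...)

  // New behavior: an empty result becomes None, so the caller can fall back
  // to the metastore schema instead of crashing the scan.
  println(footerSchemas.reduceOption(_ + _))  // None
}
```
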
From 36978d1835ab6e0266ad3787b33056b573fd59e8 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Mon, 16 Feb 2015 23:17:54 -0800
Subject: [PATCH 3/7] Update the code based on feedback

---
 .../org/apache/spark/sql/parquet/newParquet.scala    |  8 +++++---
 .../org/apache/spark/sql/parquet/parquetSuites.scala | 12 ++++++++----
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 6185306dd02aa..c2410811ef688 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -288,7 +288,7 @@ case class ParquetRelation2(
     }
 
     try {
-      parquetSchema = readSchema().getOrElse(maybeSchema.get)
+      parquetSchema = readSchema().getOrElse(maybeSchema.getOrElse(maybeMetastoreSchema.get))
     } catch {
       case e => throw new SparkException(s"Failed to find schema for ${paths.mkString(",")}", e)
     }
@@ -617,7 +617,7 @@ object ParquetRelation2 {
 
   private[parquet] def readSchema(
       footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
-    Option(footers.map { footer =>
+    val mergedSchema = footers.map { footer =>
       val metadata = footer.getParquetMetadata.getFileMetaData
       val parquetSchema = metadata.getSchema
       val maybeSparkSchema = metadata
@@ -640,7 +640,9 @@ object ParquetRelation2 {
       case (left, right) => try left.merge(right) catch { case e: Throwable =>
         throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
       }
-    })
+    }
+
+    Option(mergedSchema)
   }
 
   /**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index edff90f60e917..d619704f4f767 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -137,7 +137,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
     jsonRDD(rdd).registerTempTable("jt")
     sql("""
-      create table test ROW FORMAT
+      create table test_parquet_jt ROW FORMAT
        | SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
        | STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
        | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
@@ -149,8 +149,8 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
   override def afterAll(): Unit = {
     super.afterAll()
     sql("DROP TABLE test_parquet")
-    sql("DROP TABLE test")
     sql("DROP TABLE jt")
+    sql("DROP TABLE test_parquet_jt")
 
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
@@ -159,9 +159,13 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
   }
 
-  test("scan from an non empty parquet table") {
+  test("scan from an empty parquet table with upper case") {
+    checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0))
+  }
+
+  test("scan from an non empty parquet table #1") {
     checkAnswer(
-      sql(s"SELECT a, b FROM jt WHERE a = '1'"),
+      sql(s"SELECT a, b FROM test_parquet_jt WHERE a = '1'"),
       Seq(Row(1, "str1"))
     )
   }
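After this patch the fallback order is footers first, then `maybeSchema`, then `maybeMetastoreSchema`, expressed as nested `getOrElse` calls that still end in a bare `.get`. Patch 5 below reorders and flattens this into an `orElse` chain with a proper error message. A sketch of that final shape, with `String` standing in for `StructType` and the parameters passed explicitly for illustration:

```scala
// Stand-in for the resolution logic patch 5 settles on.
def resolveSchema(
    maybeSchema: Option[String],           // explicitly supplied schema
    readSchema: () => Option[String],      // schema merged from Parquet footers
    maybeMetastoreSchema: Option[String]   // METASTORE_SCHEMA from the options map
  ): String =
  maybeSchema
    .orElse(readSchema())                  // by-name: runs only if maybeSchema is None
    .orElse(maybeMetastoreSchema)
    .getOrElse(sys.error("Failed to get the schema."))
```

Because `Option.orElse` takes its alternative by name, footer reading is evaluated only when no explicit schema was supplied, so the chain short-circuits exactly like the nested `getOrElse` version but reads top-down in priority order.
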
From d7e230851a34bdd80ae622a99c6cc0c2b365eae7 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Tue, 17 Feb 2015 09:43:08 -0800
Subject: [PATCH 4/7] Revert changes in HiveMetastoreCatalog.scala.

---
 .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6805fdcc0a2c4..0e43faa8afdaf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -208,14 +208,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         ParquetRelation2(
           paths,
           Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
-          Some(metastoreSchema),
+          None,
           Some(partitionSpec))(hive))
     } else {
       val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
-      LogicalRelation(ParquetRelation2(
+      LogicalRelation(
+        ParquetRelation2(
           paths,
-          Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json),
-          Some(metastoreSchema))(hive))
+          Map(ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json))(hive))
     }
   }

From 3db3d207dc5e58ce0666b52a71f98bfd646a35b2 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Tue, 17 Feb 2015 10:56:49 -0800
Subject: [PATCH 5/7] Minor update.

---
 .../apache/spark/sql/parquet/newParquet.scala | 28 ++++++++++---------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index c2410811ef688..3aca9aaf2c5c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -287,11 +287,16 @@ case class ParquetRelation2(
     }
   }
 
-    try {
-      parquetSchema = readSchema().getOrElse(maybeSchema.getOrElse(maybeMetastoreSchema.get))
-    } catch {
-      case e => throw new SparkException(s"Failed to find schema for ${paths.mkString(",")}", e)
-    }
+    // To get the schema, we first try to get the schema defined in maybeSchema.
+    // If maybeSchema is not defined, we will try to get the schema from existing parquet data
+    // (through readSchema). If data does not exist, we will try to get the schema defined in
+    // maybeMetastoreSchema (defined in the options of the data source).
+    // Finally, if we still cannot get the schema, we throw an error.
+ parquetSchema = + maybeSchema + .orElse(readSchema()) + .orElse(maybeMetastoreSchema) + .getOrElse(sys.error("Failed to get the schema.")) partitionKeysIncludedInParquetSchema = isPartitioned && @@ -617,7 +622,7 @@ object ParquetRelation2 { private[parquet] def readSchema( footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = { - val mergedSchema = footers.map { footer => + footers.map { footer => val metadata = footer.getParquetMetadata.getFileMetaData val parquetSchema = metadata.getSchema val maybeSparkSchema = metadata @@ -635,14 +640,11 @@ object ParquetRelation2 { sqlContext.conf.isParquetBinaryAsString, sqlContext.conf.isParquetINT96AsTimestamp)) } - }.foldLeft[StructType](null) { - case (null, right) => right - case (left, right) => try left.merge(right) catch { case e: Throwable => - throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e) - } + }.reduceOption { (left, right) => + try left.merge(right) catch { case e: Throwable => + throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e) + } } - - Option(mergedSchema) } /** From 2ac94f73783840b58c1e05110397fe52fdb92db4 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 17 Feb 2015 11:09:06 -0800 Subject: [PATCH 6/7] Update tests. --- .../sql/hive/MetastoreDataSourcesSuite.scala | 38 +++++++++ .../spark/sql/parquet/parquetSuites.scala | 77 ++++++++++++------- 2 files changed, 86 insertions(+), 29 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 0263e3bb56617..732964a53d822 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive import java.io.File + import org.scalatest.BeforeAndAfterEach import org.apache.commons.io.FileUtils @@ -30,6 +31,8 @@ import org.apache.spark.util.Utils import org.apache.spark.sql.types._ import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ +import org.apache.spark.sql.parquet.ParquetRelation2 +import org.apache.spark.sql.sources.LogicalRelation /** * Tests for persisting tables created though the data sources API into the metastore. 
@@ -553,4 +556,39 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     sql("DROP TABLE savedJsonTable")
     conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
   }
+
+  if (HiveShim.version == "0.13.1") {
+    test("scan a parquet table created through a CTAS statement") {
+      val originalConvertMetastore = getConf("spark.sql.hive.convertMetastoreParquet", "true")
+      val originalUseDataSource = getConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+      setConf("spark.sql.hive.convertMetastoreParquet", "true")
+      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+
+      val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+      jsonRDD(rdd).registerTempTable("jt")
+      sql(
+        """
+          |create table test_parquet_ctas STORED AS parquET
+          |AS select tmp.a from jt tmp where tmp.a < 5
+        """.stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT a FROM test_parquet_ctas WHERE a > 2 "),
+        Row(3) :: Row(4) :: Nil
+      )
+
+      table("test_parquet_ctas").queryExecution.analyzed match {
+        case LogicalRelation(p: ParquetRelation2) => // OK
+        case _ =>
+          fail(
+            s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
+      }
+
+      // Cleanup and reset confs.
+      sql("DROP TABLE IF EXISTS jt")
+      sql("DROP TABLE IF EXISTS test_parquet_ctas")
+      setConf("spark.sql.hive.convertMetastoreParquet", originalConvertMetastore)
+      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource)
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index d619704f4f767..4130657a107b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -20,15 +20,15 @@ package org.apache.spark.sql.parquet
 
 import java.io.File
 
-import org.apache.spark.sql.catalyst.expressions.Row
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{SQLConf, QueryTest}
+import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.PhysicalRDD
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-
+import org.apache.spark.sql.sources.LogicalRelation
 
 // The data where the partitioning key exists only in the directory structure.
case class ParquetData(intField: Int, stringField: String) @@ -122,52 +122,71 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { override def beforeAll(): Unit = { super.beforeAll() - sql(s""" - create table test_parquet - ( - intField INT, - stringField STRING - ) - ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' - STORED AS - INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' - OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - """) - - val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) - jsonRDD(rdd).registerTempTable("jt") - sql(""" - create table test_parquet_jt ROW FORMAT - | SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' - | STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' - | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - | AS select * from jt""".stripMargin) + sql( + """ + |create table test_parquet + |( + | intField INT, + | stringField STRING + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + """.stripMargin) conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") } override def afterAll(): Unit = { super.afterAll() - sql("DROP TABLE test_parquet") - sql("DROP TABLE jt") - sql("DROP TABLE test_parquet_jt") + sql("DROP TABLE IF EXISTS test_parquet") setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) } - test("scan from an empty parquet table") { + test("scan an empty parquet table") { checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0)) } - test("scan from an empty parquet table with upper case") { + test("scan an empty parquet table with upper case") { checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0)) } - test("scan from an non empty parquet table #1") { + test("scan a parquet table created through a CTAS statement") { + val originalConvertMetastore = getConf("spark.sql.hive.convertMetastoreParquet", "true") + val originalUseDataSource = getConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") + setConf("spark.sql.hive.convertMetastoreParquet", "true") + setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") + + val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + jsonRDD(rdd).registerTempTable("jt") + sql( + """ + |create table test_parquet_ctas ROW FORMAT + |SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + |AS select * from jt + """.stripMargin) + checkAnswer( - sql(s"SELECT a, b FROM test_parquet_jt WHERE a = '1'"), + sql(s"SELECT a, b FROM test_parquet_ctas WHERE a = 1"), Seq(Row(1, "str1")) ) + + table("test_parquet_ctas").queryExecution.analyzed match { + case LogicalRelation(p: ParquetRelation2) => // OK + case _ => + fail( + s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}") + } + + sql("DROP TABLE IF EXISTS jt") + sql("DROP TABLE IF EXISTS test_parquet_ctas") + setConf("spark.sql.hive.convertMetastoreParquet", originalConvertMetastore) + setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource) } } From 
b8b34501fb57ac33e36f55dc58a0094c8b40b68a Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Tue, 17 Feb 2015 11:23:10 -0800
Subject: [PATCH 7/7] Update tests.

---
 .../spark/sql/parquet/parquetSuites.scala | 70 ++++++++++++++++---
 1 file changed, 60 insertions(+), 10 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 4130657a107b6..653f4b47367c4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -122,6 +122,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
   override def beforeAll(): Unit = {
     super.beforeAll()
 
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    jsonRDD(rdd).registerTempTable("jt")
+
     sql(
       """
         |create table test_parquet
@@ -140,6 +143,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
   override def afterAll(): Unit = {
     super.afterAll()
+    sql("DROP TABLE IF EXISTS jt")
     sql("DROP TABLE IF EXISTS test_parquet")
 
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
@@ -153,14 +157,63 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0))
   }
 
-  test("scan a parquet table created through a CTAS statement") {
-    val originalConvertMetastore = getConf("spark.sql.hive.convertMetastoreParquet", "true")
-    val originalUseDataSource = getConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
-    setConf("spark.sql.hive.convertMetastoreParquet", "true")
-    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+  test("insert into an empty parquet table") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
 
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    jsonRDD(rdd).registerTempTable("jt")
+    // Insert into an empty table.
+    sql("insert into table test_insert_parquet select a, b from jt where jt.a > 5")
+    checkAnswer(
+      sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField < 8"),
+      Row(6, "str6") :: Row(7, "str7") :: Nil
+    )
+    // Insert overwrite.
+    sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5")
+    checkAnswer(
+      sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"),
+      Row(3, "str3") :: Row(4, "str4") :: Nil
+    )
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+
+    // Create it again.
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT,
+        |  stringField STRING
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+    // Insert overwrite an empty table.
+ sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5") + checkAnswer( + sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"), + Row(3, "str3") :: Row(4, "str4") :: Nil + ) + // Insert into the table. + sql("insert into table test_insert_parquet select a, b from jt") + checkAnswer( + sql(s"SELECT intField, stringField FROM test_insert_parquet"), + (1 to 10).map(i => Row(i, s"str$i")) ++ (1 to 4).map(i => Row(i, s"str$i")) + ) + sql("DROP TABLE IF EXISTS test_insert_parquet") + } + + test("scan a parquet table created through a CTAS statement") { sql( """ |create table test_parquet_ctas ROW FORMAT @@ -183,10 +236,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}") } - sql("DROP TABLE IF EXISTS jt") sql("DROP TABLE IF EXISTS test_parquet_ctas") - setConf("spark.sql.hive.convertMetastoreParquet", originalConvertMetastore) - setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource) } }
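Where the series lands: ParquetRelation2 resolves its schema from the explicit `maybeSchema`, else from data-file footers, else from the metastore schema carried in the `METASTORE_SCHEMA` option, and only then fails. A compact, dependency-free recap of that behavior for the empty-table case (strings stand in for `StructType`; all names here are illustrative, not the real API):

```scala
object SchemaResolutionDemo extends App {
  // Resolution order after patch 5, with plain-Scala stand-ins.
  def resolve(
      maybeSchema: Option[String],          // schema supplied by the caller
      footerSchemas: Seq[String],           // schemas read from Parquet footers
      maybeMetastoreSchema: Option[String]  // schema recorded in the Hive metastore
    ): String =
    maybeSchema
      .orElse(footerSchemas.reduceOption(_ + _))  // readSchema(): None when no files
      .orElse(maybeMetastoreSchema)
      .getOrElse(sys.error("Failed to get the schema."))

  // An empty table: no user schema, no data files -- falls back to the metastore.
  println(resolve(None, Seq.empty, Some("intField INT, stringField STRING")))
}
```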