From 346780c79c908d70788499159dd89821bc89e032 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Wed, 25 Feb 2015 21:24:41 -0800
Subject: [PATCH 1/4] Failed test.

---
 .../spark/sql/parquet/parquetSuites.scala | 70 +++++++++++++++++--
 1 file changed, 63 insertions(+), 7 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 653f4b47367c4..f84d332795d20 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -24,11 +24,11 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{SQLConf, QueryTest}
 import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.execution.PhysicalRDD
-import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveTable, HiveTableScan}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.sources.LogicalRelation
+import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -93,6 +93,9 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
       sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
     }
 
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    jsonRDD(rdd).registerTempTable("jt")
+
     setConf("spark.sql.hive.convertMetastoreParquet", "true")
   }
 
@@ -100,6 +103,7 @@
     sql("DROP TABLE partitioned_parquet")
     sql("DROP TABLE partitioned_parquet_with_key")
     sql("DROP TABLE normal_parquet")
+    sql("DROP TABLE IF EXISTS jt")
 
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }
@@ -122,9 +126,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
   override def beforeAll(): Unit = {
     super.beforeAll()
 
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    jsonRDD(rdd).registerTempTable("jt")
-
     sql(
       """
        |create table test_parquet
@@ -143,7 +144,6 @@
   override def afterAll(): Unit = {
     super.afterAll()
-    sql("DROP TABLE IF EXISTS jt")
     sql("DROP TABLE IF EXISTS test_parquet")
 
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
@@ -238,6 +238,35 @@
 
     sql("DROP TABLE IF EXISTS test_parquet_ctas")
   }
+
+  test("MetastoreRelation in insert into will be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
+    df.queryExecution.executedPlan match {
+      case ExecutedCommand(
+        InsertIntoDataSource(
+          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case _ => fail("test_parquet_ctas should be converted to a data source relation.")
+    }
+
+    checkAnswer(
+      sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
+      sql("SELECT a FROM jt WHERE jt.a > 5").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
 }
 
 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
@@ -252,6 +281,33 @@ class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
     super.afterAll()
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("MetastoreRelation in insert into will not be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
+    df.queryExecution.executedPlan match {
+      case insert: InsertIntoHiveTable => // OK
+      case _ => fail("test_parquet_ctas should not be converted to a data source relation.")
+    }
+
+    checkAnswer(
+      sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
+      sql("SELECT a FROM jt WHERE jt.a > 5").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
 }
 
 /**

From 50b6d0f0b8ee3296c470333473a49813974004ae Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Wed, 25 Feb 2015 21:35:33 -0800
Subject: [PATCH 2/4] Update error messages.

---
 .../org/apache/spark/sql/parquet/parquetSuites.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index f84d332795d20..ae370bdb8aa4d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -257,7 +257,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
       case ExecutedCommand(
         InsertIntoDataSource(
           LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
-      case _ => fail("test_parquet_ctas should be converted to a data source relation.")
+      case o => fail("test_insert_parquet should be converted to a " +
+        s"${classOf[ParquetRelation2].getCanonicalName} and " +
+        s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan. " +
+        s"However, found a ${o.toString} ")
     }
 
     checkAnswer(
@@ -298,7 +301,8 @@ class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
     df.queryExecution.executedPlan match {
       case insert: InsertIntoHiveTable => // OK
-      case _ => fail("test_parquet_ctas should not be converted to a data source relation.")
+      case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
+        s"However, found ${o.toString}.")
     }
 
     checkAnswer(

From ba543cd09085a41d1b2a6d19dc6076c722da7ec5 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Wed, 25 Feb 2015 21:52:34 -0800
Subject: [PATCH 3/4] More tests.

---
 .../spark/sql/parquet/parquetSuites.scala | 72 +++++++++++++++++--
 1 file changed, 68 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index ae370bdb8aa4d..80fd5cda20e20 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -93,8 +93,10 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
       sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
     }
 
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    jsonRDD(rdd).registerTempTable("jt")
+    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    jsonRDD(rdd1).registerTempTable("jt")
+    val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
+    jsonRDD(rdd2).registerTempTable("jt_array")
 
     setConf("spark.sql.hive.convertMetastoreParquet", "true")
   }
@@ -104,6 +106,7 @@
     sql("DROP TABLE partitioned_parquet_with_key")
     sql("DROP TABLE normal_parquet")
     sql("DROP TABLE IF EXISTS jt")
+    sql("DROP TABLE IF EXISTS jt_array")
 
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }
@@ -239,7 +242,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     sql("DROP TABLE IF EXISTS test_parquet_ctas")
   }
 
-  test("MetastoreRelation in insert into will be converted") {
+  test("MetastoreRelation in InsertIntoTable will be converted") {
     sql(
       """
        |create table test_insert_parquet
@@ -270,6 +273,38 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     sql("DROP TABLE IF EXISTS test_insert_parquet")
   }
+
+  test("MetastoreRelation in InsertIntoHiveTable will be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  int_array array<int>
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
+    df.queryExecution.executedPlan match {
+      case ExecutedCommand(
+        InsertIntoDataSource(
+          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case o => fail("test_insert_parquet should be converted to a " +
+        s"${classOf[ParquetRelation2].getCanonicalName} and " +
+        s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan. " +
+        s"However, found a ${o.toString} ")
+    }
+
+    checkAnswer(
+      sql("SELECT int_array FROM test_insert_parquet"),
+      sql("SELECT a FROM jt_array").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
 }
 
 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
@@ -285,7 +320,7 @@ class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
 
-  test("MetastoreRelation in insert into will not be converted") {
+  test("MetastoreRelation in InsertIntoTable will not be converted") {
     sql(
       """
        |create table test_insert_parquet
@@ -312,6 +347,35 @@
 
     sql("DROP TABLE IF EXISTS test_insert_parquet")
   }
+
+  // TODO: enable it after the fix of SPARK-5950.
+  ignore("MetastoreRelation in InsertIntoHiveTable will not be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  int_array array<int>
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
+    df.queryExecution.executedPlan match {
+      case insert: InsertIntoHiveTable => // OK
+      case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
+        s"However, found ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT int_array FROM test_insert_parquet"),
+      sql("SELECT a FROM jt_array").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
 }
 
 /**

From ae7e80692ba5de84d7d4c5e367450f1d5f8d6dfa Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Wed, 25 Feb 2015 21:52:39 -0800
Subject: [PATCH 4/4] Convert MetastoreRelation in InsertIntoTable and InsertIntoHiveTable.

---
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 21 +++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2cc8d65d3cb79..8af5a4848fd44 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -440,6 +440,17 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         val attributedRewrites = relation.output.zip(parquetRelation.output)
         (relation, parquetRelation, attributedRewrites)
 
+      // Write path
+      case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
+        // Inserting into partitioned table is not supported in Parquet data source (yet).
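+        // (Editor's note) The guard below additionally requires that metastore Parquet conversion
+        // and the Parquet data source API are both enabled, and that the table uses a Parquet SerDe.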
+        if !relation.hiveQlTable.isPartitioned &&
+          hive.convertMetastoreParquet &&
+          hive.conf.parquetUseDataSourceApi &&
+          relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+        val parquetRelation = convertToParquetRelation(relation)
+        val attributedRewrites = relation.output.zip(parquetRelation.output)
+        (relation, parquetRelation, attributedRewrites)
+
       // Read path
       case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
         if hive.convertMetastoreParquet &&
@@ -464,6 +475,16 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
           withAlias
         }
+      case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
+        if relationMap.contains(r) => {
+          val parquetRelation = relationMap(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite)
+        }
+      case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
+        if relationMap.contains(r) => {
+          val parquetRelation = relationMap(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite)
+        }
       case other => other.transformExpressions {
         case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
       }
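
A minimal sketch (not part of the patch series above) of how the new write-path conversion can be
observed from a HiveContext, assuming the test_insert_parquet table and the jt temporary table
that the tests above create, and assuming a context value named hiveContext:

    import org.apache.spark.sql.execution.ExecutedCommand
    import org.apache.spark.sql.sources.InsertIntoDataSource

    // With spark.sql.hive.convertMetastoreParquet and the Parquet data source API both enabled,
    // the INSERT below should be planned as an InsertIntoDataSource command rather than
    // InsertIntoHiveTable.
    val df = hiveContext.sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
    df.queryExecution.executedPlan match {
      case ExecutedCommand(_: InsertIntoDataSource) =>
        println("converted: the insert goes through the data source write path")
      case other =>
        println(s"not converted; planned as: $other")
    }

With either flag turned off, the same statement stays on the Hive write path and is planned as
InsertIntoHiveTable, which is what ParquetDataSourceOffMetastoreSuite asserts.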