
Commit f02394d

yhuai authored and liancheng committed
[SPARK-6023][SQL] ParquetConversions fails to replace the destination MetastoreRelation of an InsertIntoTable node to ParquetRelation2
JIRA: https://issues.apache.org/jira/browse/SPARK-6023

Author: Yin Huai <[email protected]>

Closes #4782 from yhuai/parquetInsertInto and squashes the following commits:

ae7e806 [Yin Huai] Convert MetastoreRelation in InsertIntoTable and InsertIntoHiveTable.
ba543cd [Yin Huai] More tests.
50b6d0f [Yin Huai] Update error messages.
346780c [Yin Huai] Failed test.
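
For context, here is a minimal sketch of the scenario this commit fixes, distilled from the new tests below. It assumes a HiveContext named hive, a registered temp table jt, and that both spark.sql.hive.convertMetastoreParquet and spark.sql.parquet.useDataSourceApi are enabled; these names come from the tests, not from this commit's diff itself:

    // Hypothetical repro session; the names `hive` and `jt` are assumptions.
    hive.sql(
      """CREATE TABLE test_insert_parquet (intField INT)
        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
        |STORED AS
        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
      """.stripMargin)

    // Reads of such a table were already converted to ParquetRelation2, but the
    // destination of an INSERT was not. After this fix, the executed plan below
    // is an InsertIntoDataSource command rather than Hive's InsertIntoHiveTable.
    val df = hive.sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
    println(df.queryExecution.executedPlan)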
1 parent 51a6f90 commit f02394d

File tree

2 files changed: +152, -7 lines


sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 21 additions & 0 deletions
@@ -440,6 +440,17 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         val attributedRewrites = relation.output.zip(parquetRelation.output)
         (relation, parquetRelation, attributedRewrites)
 
+      // Write path
+      case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
+          // Inserting into partitioned table is not supported in Parquet data source (yet).
+          if !relation.hiveQlTable.isPartitioned &&
+            hive.convertMetastoreParquet &&
+            hive.conf.parquetUseDataSourceApi &&
+            relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+        val parquetRelation = convertToParquetRelation(relation)
+        val attributedRewrites = relation.output.zip(parquetRelation.output)
+        (relation, parquetRelation, attributedRewrites)
+
       // Read path
       case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
         if hive.convertMetastoreParquet &&
@@ -464,6 +475,16 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
           withAlias
         }
+      case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
+          if relationMap.contains(r) => {
+        val parquetRelation = relationMap(r)
+        InsertIntoTable(parquetRelation, partition, child, overwrite)
+      }
+      case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
+          if relationMap.contains(r) => {
+        val parquetRelation = relationMap(r)
+        InsertIntoTable(parquetRelation, partition, child, overwrite)
+      }
       case other => other.transformExpressions {
         case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
       }
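
The rule above works in two passes: it first collects each qualifying MetastoreRelation together with its ParquetRelation2 replacement and attribute rewrites (the "Write path" case added here extends that collection to insert destinations), then substitutes the replacements back into the plan. Below is a minimal, Spark-independent Scala sketch of that collect-then-substitute pattern; the toy types are illustrative and are not Spark APIs:

    sealed trait Plan
    case class Relation(name: String) extends Plan
    case class Scan(table: Plan) extends Plan
    case class Insert(destination: Plan, child: Plan) extends Plan
    case class Converted(original: Relation) extends Plan

    def convert(plan: Plan): Plan = {
      // Pass 1: collect every relation that should be converted, including the
      // destination of an Insert -- the case the original rule missed.
      def collect(p: Plan): Seq[Relation] = p match {
        case r: Relation  => Seq(r)
        case Scan(t)      => collect(t)
        case Insert(d, c) => collect(d) ++ collect(c)
        case _: Converted => Seq.empty
      }
      val toConvert = collect(plan).toSet

      // Pass 2: substitute the converted relations back into the plan.
      def substitute(p: Plan): Plan = p match {
        case r: Relation if toConvert(r) => Converted(r)
        case Scan(t)                     => Scan(substitute(t))
        case Insert(d, c)                => Insert(substitute(d), substitute(c))
        case other                       => other
      }
      substitute(plan)
    }

    // Both the scanned source and the insert destination are rewritten:
    // convert(Insert(Relation("dst"), Scan(Relation("src"))))
    //   == Insert(Converted(Relation("dst")), Scan(Converted(Relation("src"))))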

sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala

Lines changed: 131 additions & 7 deletions
@@ -24,11 +24,11 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{SQLConf, QueryTest}
 import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.execution.PhysicalRDD
-import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveTable, HiveTableScan}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.sources.LogicalRelation
+import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -93,13 +93,20 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
       sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
     }
 
+    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    jsonRDD(rdd1).registerTempTable("jt")
+    val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
+    jsonRDD(rdd2).registerTempTable("jt_array")
+
     setConf("spark.sql.hive.convertMetastoreParquet", "true")
   }
 
   override def afterAll(): Unit = {
     sql("DROP TABLE partitioned_parquet")
     sql("DROP TABLE partitioned_parquet_with_key")
     sql("DROP TABLE normal_parquet")
+    sql("DROP TABLE IF EXISTS jt")
+    sql("DROP TABLE IF EXISTS jt_array")
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }
 

@@ -122,9 +129,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
   override def beforeAll(): Unit = {
     super.beforeAll()
 
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    jsonRDD(rdd).registerTempTable("jt")
-
     sql(
       """
         |create table test_parquet
@@ -143,7 +147,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
   override def afterAll(): Unit = {
     super.afterAll()
-    sql("DROP TABLE IF EXISTS jt")
     sql("DROP TABLE IF EXISTS test_parquet")
 
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
@@ -238,6 +241,70 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     sql("DROP TABLE IF EXISTS test_parquet_ctas")
   }
+
+  test("MetastoreRelation in InsertIntoTable will be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
+    df.queryExecution.executedPlan match {
+      case ExecutedCommand(
+        InsertIntoDataSource(
+          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case o => fail("test_insert_parquet should be converted to a " +
+        s"${classOf[ParquetRelation2].getCanonicalName} and " +
+        s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan. " +
+        s"However, found a ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
+      sql("SELECT a FROM jt WHERE jt.a > 5").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
+
+  test("MetastoreRelation in InsertIntoHiveTable will be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  int_array array<int>
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
+    df.queryExecution.executedPlan match {
+      case ExecutedCommand(
+        InsertIntoDataSource(
+          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case o => fail("test_insert_parquet should be converted to a " +
+        s"${classOf[ParquetRelation2].getCanonicalName} and " +
+        s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan. " +
+        s"However, found a ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT int_array FROM test_insert_parquet"),
+      sql("SELECT a FROM jt_array").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
 }
 
 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
@@ -252,6 +319,63 @@ class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
     super.afterAll()
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("MetastoreRelation in InsertIntoTable will not be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
+    df.queryExecution.executedPlan match {
+      case insert: InsertIntoHiveTable => // OK
+      case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
+        s"However, found ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
+      sql("SELECT a FROM jt WHERE jt.a > 5").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
+
+  // TODO: enable it after the fix of SPARK-5950.
+  ignore("MetastoreRelation in InsertIntoHiveTable will not be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  int_array array<int>
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
+    df.queryExecution.executedPlan match {
+      case insert: InsertIntoHiveTable => // OK
+      case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
+        s"However, found ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT int_array FROM test_insert_parquet"),
+      sql("SELECT a FROM jt_array").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
 }
 
 /**
