@@ -440,6 +440,17 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val attributedRewrites = relation.output.zip(parquetRelation.output)
(relation, parquetRelation, attributedRewrites)

// Write path
case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !relation.hiveQlTable.isPartitioned &&
hive.convertMetastoreParquet &&
hive.conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
(relation, parquetRelation, attributedRewrites)

// Read path
case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
if hive.convertMetastoreParquet &&
@@ -464,6 +475,16 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with

withAlias
}
case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
if relationMap.contains(r) => {
val parquetRelation = relationMap(r)
InsertIntoTable(parquetRelation, partition, child, overwrite)
}
case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
if relationMap.contains(r) => {
val parquetRelation = relationMap(r)
InsertIntoTable(parquetRelation, partition, child, overwrite)
}
case other => other.transformExpressions {
case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
}
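Taken together, the two hunks above work in two passes: the first collects every qualifying MetastoreRelation write target together with its ParquetRelation2 replacement (and the attribute rewrites built by zipping the two outputs), and the second substitutes those replacements into any InsertIntoTable / InsertIntoHiveTable node that still points at the old relation. A minimal sketch of exercising the new write path from a HiveContext (the names hiveContext, demo_parquet, and jt_src are illustrative, and the conf keys are assumed to match SQLConf on a Spark 1.3-era build):

// Sketch only: triggers the write-path conversion added above.
// Assumes `hiveContext` is an org.apache.spark.sql.hive.HiveContext.
import hiveContext._

// Both flags must be on for the InsertIntoHiveTable case to fire.
setConf("spark.sql.hive.convertMetastoreParquet", "true")
setConf("spark.sql.parquet.useDataSourceApi", "true")

// An unpartitioned table backed by the Parquet SerDe, so all four guard
// conditions in the write-path case hold.
sql("""CREATE TABLE demo_parquet (key INT)
      |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
      |STORED AS
      |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
      |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    """.stripMargin)

val src = sparkContext.parallelize((1 to 10).map(i => s"""{"key":$i}"""))
jsonRDD(src).registerTempTable("jt_src")

val df = sql("INSERT INTO TABLE demo_parquet SELECT key FROM jt_src")
// With both flags on, the executed plan should be
// ExecutedCommand(InsertIntoDataSource(LogicalRelation(ParquetRelation2), ...));
// with spark.sql.parquet.useDataSourceApi set to "false" it stays an
// InsertIntoHiveTable operator (see the tests below).
println(df.queryExecution.executedPlan)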
@@ -24,11 +24,11 @@ import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.{SQLConf, QueryTest}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.PhysicalRDD
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.{InsertIntoHiveTable, HiveTableScan}
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}

// The data where the partitioning key exists only in the directory structure.
case class ParquetData(intField: Int, stringField: String)
@@ -93,13 +93,20 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
}

val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
jsonRDD(rdd1).registerTempTable("jt")
val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
jsonRDD(rdd2).registerTempTable("jt_array")

setConf("spark.sql.hive.convertMetastoreParquet", "true")
}

override def afterAll(): Unit = {
sql("DROP TABLE partitioned_parquet")
sql("DROP TABLE partitioned_parquet_with_key")
sql("DROP TABLE normal_parquet")
sql("DROP TABLE IF EXISTS jt")
sql("DROP TABLE IF EXISTS jt_array")
setConf("spark.sql.hive.convertMetastoreParquet", "false")
}
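For reference, the jt and jt_array temp tables registered above are the insert sources for the new tests in both suites; a quick illustrative check of their contents inside the TestHive session:

// jt rows:       {"a":1, "b":"str1"} .. {"a":10, "b":"str10"}
// jt_array rows: {"a":[1, null]} .. {"a":[10, null]}
sql("SELECT a, b FROM jt WHERE a <= 2").collect()       // two rows: (1, "str1"), (2, "str2")
sql("SELECT a FROM jt_array WHERE a[0] <= 2").collect() // arrays [1, null] and [2, null]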

@@ -122,9 +129,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
override def beforeAll(): Unit = {
super.beforeAll()

val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
jsonRDD(rdd).registerTempTable("jt")

sql(
"""
|create table test_parquet
@@ -143,7 +147,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {

override def afterAll(): Unit = {
super.afterAll()
sql("DROP TABLE IF EXISTS jt")
sql("DROP TABLE IF EXISTS test_parquet")

setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
@@ -238,6 +241,70 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {

sql("DROP TABLE IF EXISTS test_parquet_ctas")
}

test("MetastoreRelation in InsertIntoTable will be converted") {
sql(
"""
|create table test_insert_parquet
|(
| intField INT
|)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)

val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
df.queryExecution.executedPlan match {
case ExecutedCommand(
InsertIntoDataSource(
LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[ParquetRelation2].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
s"However, found a ${o.toString} ")
}

checkAnswer(
sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
sql("SELECT a FROM jt WHERE jt.a > 5").collect()
)

sql("DROP TABLE IF EXISTS test_insert_parquet")
}

test("MetastoreRelation in InsertIntoHiveTable will be converted") {
sql(
"""
|create table test_insert_parquet
|(
| int_array array<int>
|)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)

val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.executedPlan match {
case ExecutedCommand(
InsertIntoDataSource(
LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[ParquetRelation2].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
s"However, found a ${o.toString} ")
}

checkAnswer(
sql("SELECT int_array FROM test_insert_parquet"),
sql("SELECT a FROM jt_array").collect()
)

sql("DROP TABLE IF EXISTS test_insert_parquet")
}
}

class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
@@ -252,6 +319,63 @@ class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
super.afterAll()
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}

test("MetastoreRelation in InsertIntoTable will not be converted") {
sql(
"""
|create table test_insert_parquet
|(
| intField INT
|)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)

val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
df.queryExecution.executedPlan match {
case insert: InsertIntoHiveTable => // OK
case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
s"However, found ${o.toString}.")
}

checkAnswer(
sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
sql("SELECT a FROM jt WHERE jt.a > 5").collect()
)

sql("DROP TABLE IF EXISTS test_insert_parquet")
}

// TODO: enable it after the fix of SPARK-5950.
ignore("MetastoreRelation in InsertIntoHiveTable will not be converted") {
sql(
"""
|create table test_insert_parquet
|(
| int_array array<int>
|)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)

val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.executedPlan match {
case insert: InsertIntoHiveTable => // OK
case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
s"However, found ${o.toString}.")
}

checkAnswer(
sql("SELECT int_array FROM test_insert_parquet"),
sql("SELECT a FROM jt_array").collect()
)

sql("DROP TABLE IF EXISTS test_insert_parquet")
}
}
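The On and Off suites flip the same switch and restore it on exit; only the expected physical operator differs. A sketch of the shared beforeAll/afterAll pattern (originalConf and the SQLConf.PARQUET_USE_DATA_SOURCE_API key are taken from the hunks above; the comment values are assumptions about which suite uses which setting):

// Shared pattern: remember the caller's conf, force the mode under test,
// then restore it in afterAll (as both suites do above).
val originalConf = conf.parquetUseDataSourceApi

override def beforeAll(): Unit = {
  super.beforeAll()
  // "true"  => data source path: ExecutedCommand(InsertIntoDataSource(...))
  // "false" => Hive path: InsertIntoHiveTable
  setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
}

override def afterAll(): Unit = {
  super.afterAll()
  setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}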

/**