@@ -287,7 +287,16 @@ case class ParquetRelation2(
}
}

parquetSchema = maybeSchema.getOrElse(readSchema())
// To get the schema, we first try the schema defined in maybeSchema. If maybeSchema is not
// defined, we try to read the schema from existing Parquet data (through readSchema). If no
// data exists, we fall back to the schema defined in maybeMetastoreSchema (provided in the
// options of the data source). Finally, if we still cannot determine the schema, we throw
// an error.
parquetSchema =
maybeSchema
.orElse(readSchema())
.orElse(maybeMetastoreSchema)
.getOrElse(sys.error("Failed to get the schema."))
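
A minimal standalone sketch of the fallback chain above, using plain strings and placeholder sources rather than the real ParquetRelation2 fields: Option.orElse takes its alternative by name, so each later source is only consulted when the earlier ones came up empty.

  // Sketch only; maybeSchema, readSchema() and maybeMetastoreSchema here are stand-ins.
  def resolveSchemaSketch(
      maybeSchema: Option[String],
      maybeMetastoreSchema: Option[String]): String = {
    def readSchema(): Option[String] = {
      println("readSchema() evaluated")       // only runs when maybeSchema is empty
      None                                    // pretend no existing Parquet data was found
    }

    maybeSchema
      .orElse(readSchema())                   // by-name argument: evaluated lazily
      .orElse(maybeMetastoreSchema)
      .getOrElse(sys.error("Failed to get the schema."))
  }

  resolveSchemaSketch(Some("fromCaller"), None)      // "fromCaller"; readSchema() never runs
  resolveSchemaSketch(None, Some("fromMetastore"))   // prints once, returns "fromMetastore"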

partitionKeysIncludedInParquetSchema =
isPartitioned &&
@@ -308,7 +317,7 @@ case class ParquetRelation2(
}
}

private def readSchema(): StructType = {
private def readSchema(): Option[StructType] = {
// Sees which file(s) we need to touch in order to figure out the schema.
val filesToTouch =
// Always tries the summary files first if users don't require a merged schema. In this case,
@@ -611,7 +620,8 @@ object ParquetRelation2 {
// internally.
private[sql] val METASTORE_SCHEMA = "metastoreSchema"

private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
private[parquet] def readSchema(
footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
footers.map { footer =>
val metadata = footer.getParquetMetadata.getFileMetaData
val parquetSchema = metadata.getSchema
@@ -630,7 +640,7 @@ object ParquetRelation2 {
sqlContext.conf.isParquetBinaryAsString,
sqlContext.conf.isParquetINT96AsTimestamp))
}
}.reduce { (left, right) =>
}.reduceOption { (left, right) =>
try left.merge(right) catch { case e: Throwable =>
throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
}
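
The move from reduce to reduceOption matters when footers is empty (for example, a freshly created table with no Parquet files yet): reduce throws on an empty collection, while reduceOption yields None, which readSchema() can now propagate as "no schema found". A rough illustration, with strings standing in for schemas:

  val schemas: Seq[String] = Seq.empty
  // schemas.reduce(_ + _)                    // would throw UnsupportedOperationException
  val merged: Option[String] = schemas.reduceOption(_ + _)
  assert(merged.isEmpty)                      // None when there is nothing to merge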
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive

import java.io.File

import org.scalatest.BeforeAndAfterEach

import org.apache.commons.io.FileUtils
@@ -30,6 +31,8 @@ import org.apache.spark.util.Utils
import org.apache.spark.sql.types._
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.LogicalRelation

/**
 * Tests for persisting tables created through the data sources API into the metastore.
@@ -553,4 +556,39 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("DROP TABLE savedJsonTable")
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
}

if (HiveShim.version == "0.13.1") {
test("scan a parquet table created through a CTAS statement") {
val originalConvertMetastore = getConf("spark.sql.hive.convertMetastoreParquet", "true")
val originalUseDataSource = getConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
setConf("spark.sql.hive.convertMetastoreParquet", "true")
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")

val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
jsonRDD(rdd).registerTempTable("jt")
sql(
"""
|create table test_parquet_ctas STORED AS parquET
|AS select tmp.a from jt tmp where tmp.a < 5
""".stripMargin)

checkAnswer(
sql(s"SELECT a FROM test_parquet_ctas WHERE a > 2 "),
Row(3) :: Row(4) :: Nil
)

table("test_parquet_ctas").queryExecution.analyzed match {
case LogicalRelation(p: ParquetRelation2) => // OK
case _ =>
fail(
s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
}

// Clean up and reset confs.
sql("DROP TABLE IF EXISTS jt")
sql("DROP TABLE IF EXISTS test_parquet_ctas")
setConf("spark.sql.hive.convertMetastoreParquet", originalConvertMetastore)
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource)
}
}
}
@@ -20,15 +20,15 @@ package org.apache.spark.sql.parquet

import java.io.File

import org.apache.spark.sql.catalyst.expressions.Row
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.{SQLConf, QueryTest}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.PhysicalRDD
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._

import org.apache.spark.sql.sources.LogicalRelation

// The data where the partitioning key exists only in the directory structure.
case class ParquetData(intField: Int, stringField: String)
@@ -121,13 +121,123 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {

override def beforeAll(): Unit = {
super.beforeAll()

val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
jsonRDD(rdd).registerTempTable("jt")

sql(
"""
|create table test_parquet
|(
| intField INT,
| stringField STRING
|)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)

conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
}

override def afterAll(): Unit = {
super.afterAll()
sql("DROP TABLE IF EXISTS jt")
sql("DROP TABLE IF EXISTS test_parquet")

setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}

test("scan an empty parquet table") {
checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0))
}

test("scan an empty parquet table with upper case") {
checkAnswer(sql("SELECT count(INTFIELD) FROM TEST_parquet"), Row(0))
}

test("insert into an empty parquet table") {
sql(
"""
|create table test_insert_parquet
|(
| intField INT,
| stringField STRING
|)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)

// Insert into an empty table.
sql("insert into table test_insert_parquet select a, b from jt where jt.a > 5")
checkAnswer(
sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField < 8"),
Row(6, "str6") :: Row(7, "str7") :: Nil
)
// Insert overwrite.
sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5")
checkAnswer(
sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"),
Row(3, "str3") :: Row(4, "str4") :: Nil
)
sql("DROP TABLE IF EXISTS test_insert_parquet")

// Create it again.
sql(
"""
|create table test_insert_parquet
|(
| intField INT,
| stringField STRING
|)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
""".stripMargin)
// Insert overwrite an empty table.
sql("insert overwrite table test_insert_parquet select a, b from jt where jt.a < 5")
checkAnswer(
sql(s"SELECT intField, stringField FROM test_insert_parquet WHERE intField > 2"),
Row(3, "str3") :: Row(4, "str4") :: Nil
)
// Insert into the table.
sql("insert into table test_insert_parquet select a, b from jt")
checkAnswer(
sql(s"SELECT intField, stringField FROM test_insert_parquet"),
(1 to 10).map(i => Row(i, s"str$i")) ++ (1 to 4).map(i => Row(i, s"str$i"))
)
sql("DROP TABLE IF EXISTS test_insert_parquet")
}

test("scan a parquet table created through a CTAS statement") {
sql(
"""
|create table test_parquet_ctas ROW FORMAT
|SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
|AS select * from jt
""".stripMargin)

checkAnswer(
sql(s"SELECT a, b FROM test_parquet_ctas WHERE a = 1"),
Seq(Row(1, "str1"))
)

table("test_parquet_ctas").queryExecution.analyzed match {
case LogicalRelation(p: ParquetRelation2) => // OK
case _ =>
fail(
s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}")
}

sql("DROP TABLE IF EXISTS test_parquet_ctas")
}
}

class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {