@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util
 
+import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -218,9 +219,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
         // table location for tables in default database, while we expect to use the location of
         // default database.
         storage = tableDefinition.storage.copy(locationUri = tableLocation),
-        // Here we follow data source tables and put table metadata like provider, schema, etc. in
-        // table properties, so that we can work around the Hive metastore issue about not case
-        // preserving and make Hive serde table support mixed-case column names.
+        // Here we follow data source tables and put table metadata like table schema, partition
+        // columns etc. in table properties, so that we can work around the Hive metastore issue
+        // about not case preserving and make Hive serde table support mixed-case column names.
         properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
       client.createTable(tableWithDataSourceProps, ignoreIfExists)
     } else {
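
To make the comment above concrete: for a Hive serde table, the properties merged in via tableMetaToTableProps now carry schema and partition metadata but no provider entry. A hedged illustration — the literal key strings are assumptions inferred from the constant names used later in this diff:

```scala
// Illustrative only: rough shape of the table properties written for a
// Hive serde table after this change (assumed key strings).
val exampleProps = Map(
  "spark.sql.sources.schema.numParts" -> "1",
  "spark.sql.sources.schema.part.0" -> """{"type":"struct","fields":[...]}"""
)
// Notably absent: "spark.sql.sources.provider" (DATASOURCE_PROVIDER), which
// is now written only for data source tables in createDataSourceTable.
```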
@@ -232,17 +233,26 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
   }
 
   private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
+    // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
+    val provider = table.provider.get
+
     // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
     // support, no column nullability, etc., we should do some extra works before saving table
     // metadata into Hive metastore:
-    //  1. Put table metadata like provider, schema, etc. in table properties.
+    //  1. Put table metadata like table schema, partition columns, etc. in table properties.
     //  2. Check if this table is hive compatible.
     //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
     //         spec to empty and save table metadata to Hive.
     //    2.2  If it's hive compatible, set serde information in table metadata and try to save
     //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
     val tableProperties = tableMetaToTableProps(table)
+
+    // put table provider and partition provider in table properties.
+    tableProperties.put(DATASOURCE_PROVIDER, provider)
[Review comment — Contributor]
Why are we putting the provider name in the table properties here?

[Reply — Contributor Author]
Previously we stored the provider in the code path shared by both data source and Hive serde tables. Now I've moved it to the code path that handles only data source tables.
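
A minimal sketch of the split the reply describes, assuming a Hive-enabled SparkSession named spark; the table names are illustrative and the external-catalog access shown is an internal API, used here only to make the behaviour visible:

```scala
// After this change, only data source tables carry the provider key in their
// raw metastore table properties; Hive serde tables are recognised by its
// absence and restored with provider = Some("hive").
spark.sql("CREATE TABLE hive_tbl (col1 INT) STORED AS PARQUET") // Hive serde table
spark.sql("CREATE TABLE ds_tbl (col1 INT) USING parquet")       // data source table

val catalog = spark.sharedState.externalCatalog
assert(catalog.getTable("default", "hive_tbl").provider == Some("hive"))
assert(catalog.getTable("default", "ds_tbl").provider == Some("parquet"))
```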

+    if (table.tracksPartitionsInCatalog) {
+      tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
+    }
 
     // Ideally we should also put `locationUri` in table properties like provider, schema, etc.
     // However, in older version of Spark we already store table location in storage properties
     // with key "path". Here we keep this behaviour for backward compatibility.
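
A rough sketch of the backward-compatibility behaviour this comment describes, assuming a CatalogTable value named table (in this Spark version locationUri is an Option[String]):

```scala
// The table location stays under the storage-properties key "path", because
// older Spark versions already read and write that key, rather than getting
// a dedicated table property like the schema does.
val storagePropsWithLocation: Map[String, String] =
  table.storage.properties ++ table.storage.locationUri.map("path" -> _)
```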
@@ -289,7 +299,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
     }
 
     val qualifiedTableName = table.identifier.quotedString
-    val maybeSerde = HiveSerDe.sourceToSerDe(table.provider.get)
+    val maybeSerde = HiveSerDe.sourceToSerDe(provider)
     val skipHiveMetadata = table.storage.properties
       .getOrElse("skipHiveMetadata", "false").toBoolean
 
@@ -314,7 +324,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
         (Some(newHiveCompatibleMetastoreTable(serde)), message)
 
       case _ =>
-        val provider = table.provider.get
         val message =
           s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
             s"Persisting data source table $qualifiedTableName into Hive metastore in " +
@@ -348,21 +357,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
   /**
    * Data source tables may be non Hive compatible and we need to store table metadata in table
    * properties to workaround some Hive metastore limitations.
-   * This method puts table provider, partition provider, schema, partition column names, bucket
-   * specification into a map, which can be used as table properties later.
+   * This method puts table schema, partition column names, bucket specification into a map, which
+   * can be used as table properties later.
    */
-  private def tableMetaToTableProps(table: CatalogTable): scala.collection.Map[String, String] = {
-    // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
-    val provider = table.provider.get
+  private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = {
     val partitionColumns = table.partitionColumnNames
     val bucketSpec = table.bucketSpec
 
-    val properties = new scala.collection.mutable.HashMap[String, String]
-    properties.put(DATASOURCE_PROVIDER, provider)
-    if (table.tracksPartitionsInCatalog) {
-      properties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
-    }
-
+    val properties = new mutable.HashMap[String, String]
     // Serialized JSON schema string may be too long to be stored into a single metastore table
     // property. In this case, we split the JSON string and store each part as a separate table
     // property.
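
A sketch of the splitting scheme that comment describes; DATASOURCE_SCHEMA_NUMPARTS appears later in this diff, while the threshold value and the part-key layout are assumptions for illustration:

```scala
// Split the JSON schema string into chunks no longer than the configured
// threshold, then store the chunk count and each chunk as table properties.
val schemaJson = table.schema.json
val threshold = 4000 // assumed stand-in for the configured length limit
val parts = schemaJson.grouped(threshold).toSeq
properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
  properties.put(s"spark.sql.sources.schema.part.$index", part) // assumed key layout
}
```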
@@ -616,14 +618,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
 
     if (table.tableType != VIEW) {
       table.properties.get(DATASOURCE_PROVIDER) match {
-        // No provider in table properties, which means this table is created by Spark prior to 2.1,
-        // or is created at Hive side.
+        // No provider in table properties, which means this is a Hive serde table.
         case None =>
-          table = table.copy(
-            provider = Some(DDLUtils.HIVE_PROVIDER), tracksPartitionsInCatalog = true)
-
-        // This is a Hive serde table created by Spark 2.1 or higher versions.
-        case Some(DDLUtils.HIVE_PROVIDER) =>
           table = restoreHiveSerdeTable(table)
 
         // This is a regular data source table.
@@ -636,7 +632,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
     val statsProps = table.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
 
     if (statsProps.nonEmpty) {
-      val colStats = new scala.collection.mutable.HashMap[String, ColumnStat]
+      val colStats = new mutable.HashMap[String, ColumnStat]
 
       // For each column, recover its column stats. Note that this is currently a O(n^2) operation,
       // but given the number of columns it usually not enormous, this is probably OK as a start.
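
For context, a rough sketch of the per-column recovery loop the comment refers to; the key layout and the ColumnStat.fromMap helper are assumptions for illustration:

```scala
// For each column, gather the statistics properties stored under that
// column's key prefix and rebuild its ColumnStat. Scanning all stats
// properties once per column is the O(n^2) cost mentioned above.
table.schema.foreach { field =>
  val prefix = s"$STATISTICS_COL_STATS_PREFIX${field.name}." // assumed layout
  val propsForCol = statsProps.collect {
    case (k, v) if k.startsWith(prefix) => k.stripPrefix(prefix) -> v
  }
  if (propsForCol.nonEmpty) {
    ColumnStat.fromMap(table.identifier.table, field, propsForCol) // assumed helper
      .foreach(stat => colStats += field.name -> stat)
  }
}
```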
@@ -673,21 +669,27 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
       provider = Some(DDLUtils.HIVE_PROVIDER),
       tracksPartitionsInCatalog = true)
 
-    val schemaFromTableProps = getSchemaFromTableProperties(table)
-    if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
-      hiveTable.copy(
-        schema = schemaFromTableProps,
-        partitionColumnNames = getPartitionColumnsFromTableProperties(table),
-        bucketSpec = getBucketSpecFromTableProperties(table))
+    // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
+    // schema from table properties.
+    if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
+      val schemaFromTableProps = getSchemaFromTableProperties(table)
+      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+        hiveTable.copy(
+          schema = schemaFromTableProps,
+          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          bucketSpec = getBucketSpecFromTableProperties(table))
+      } else {
+        // Hive metastore may change the table schema, e.g. schema inference. If the table
+        // schema we read back is different(ignore case and nullability) from the one in table
+        // properties which was written when creating table, we should respect the table schema
+        // from hive.
+        logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " +
+          "different from the schema when this table was created by Spark SQL" +
+          s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema " +
+          "from Hive metastore which is not case preserving.")
+        hiveTable
+      }
     } else {
-      // Hive metastore may change the table schema, e.g. schema inference. If the table
-      // schema we read back is different(ignore case and nullability) from the one in table
-      // properties which was written when creating table, we should respect the table schema
-      // from hive.
-      logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " +
-        "different from the schema when this table was created by Spark SQL" +
-        s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema from " +
-        "Hive metastore which is not case preserving.")
       hiveTable
     }
   }
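
The user-visible effect of the branch above, sketched with an illustrative table name and an assumed Hive-enabled SparkSession named spark:

```scala
// A Hive serde table created by Spark 2.1+ keeps its mixed-case column names:
// restoreHiveSerdeTable reads the case-preserving schema back from the
// DATASOURCE_SCHEMA_* table properties instead of trusting the lower-cased
// schema returned by the Hive metastore.
spark.sql("CREATE TABLE mixed_case (colA INT, colB STRING) STORED AS PARQUET")
val fields = spark.table("mixed_case").schema.fieldNames
assert(fields.sameElements(Array("colA", "colB"))) // casing survives the round trip
```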
@@ -20,8 +20,11 @@ package org.apache.spark.sql.hive
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.types.StructType
 
 /**
  * Test suite for the [[HiveExternalCatalog]].
@@ -52,4 +55,19 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     assert(selectedPartitions.length == 1)
     assert(selectedPartitions.head.spec == part1.spec)
   }
+
+  test("SPARK-18647: do not put provider in table properties for Hive serde table") {
+    val catalog = newBasicCatalog()
+    val hiveTable = CatalogTable(
+      identifier = TableIdentifier("hive_tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = storageFormat,
+      schema = new StructType().add("col1", "int").add("col2", "string"),
+      provider = Some("hive"))
+    catalog.createTable(hiveTable, ignoreIfExists = false)
+
+    val rawTable = externalCatalog.client.getTable("db1", "hive_tbl")
[Review comment — Member]
Could we also add one more check for another API, externalCatalog.getTable("db1", "hive_tbl")? The provider should contain DDLUtils.HIVE_PROVIDER.
+    assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
+    assert(externalCatalog.getTable("db1", "hive_tbl").provider == Some(DDLUtils.HIVE_PROVIDER))
+  }
 }
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import java.io.File
-
 import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType