Commit 0f0903d

[SPARK-18647][SQL] do not put provider in table properties for Hive serde table
## What changes were proposed in this pull request?

In Spark 2.1, we made Hive serde tables case-preserving by putting the table metadata in table properties, as we did for data source tables. However, we should not put the table provider there, as it breaks forward compatibility. For example, if we create a Hive serde table with Spark 2.1 using `sql("create table test stored as parquet as select 1")`, we will fail to read it with Spark 2.0, because Spark 2.0 mistakenly treats it as a data source table due to the `provider` entry in the table properties. Logically, a Hive serde table's provider is always hive, so we don't need to store it in table properties; this PR removes it.

## How was this patch tested?

Manually tested the forward compatibility issue.

Author: Wenchen Fan <[email protected]>

Closes #16080 from cloud-fan/hive.

(cherry picked from commit a5f02b0)
Signed-off-by: Wenchen Fan <[email protected]>
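
For context, a minimal sketch of the scenario described above, assuming two `SparkSession` instances named `spark` (one running Spark 2.1, one running Spark 2.0) pointed at the same Hive metastore; the session name and table name are illustrative, not part of the patch:

// On the Spark 2.1 side: create a Hive serde table backed by Parquet. Before this
// patch, HiveExternalCatalog also wrote a "spark.sql.sources.provider" entry into
// the metastore table properties.
spark.sql("CREATE TABLE test STORED AS PARQUET AS SELECT 1")

// On the Spark 2.0 side, against the same metastore: a provider entry makes the
// older catalog treat `test` as a data source table, and the read fails. With the
// provider entry omitted (this patch), Spark 2.0 resolves it as a plain Hive serde
// table and the query works.
spark.sql("SELECT * FROM test").show()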
Parent: fce1be6 · Commit: 0f0903d

File tree

3 files changed: +59 / -41 lines

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 41 additions & 39 deletions
@@ -21,6 +21,7 @@ import java.io.IOException
 import java.net.URI
 import java.util
 
+import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -219,9 +220,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         // table location for tables in default database, while we expect to use the location of
         // default database.
         storage = tableDefinition.storage.copy(locationUri = tableLocation),
-        // Here we follow data source tables and put table metadata like provider, schema, etc. in
-        // table properties, so that we can work around the Hive metastore issue about not case
-        // preserving and make Hive serde table support mixed-case column names.
+        // Here we follow data source tables and put table metadata like table schema, partition
+        // columns etc. in table properties, so that we can work around the Hive metastore issue
+        // about not case preserving and make Hive serde table support mixed-case column names.
         properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
       client.createTable(tableWithDataSourceProps, ignoreIfExists)
     } else {
@@ -233,17 +234,26 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
+    // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
+    val provider = table.provider.get
+
     // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
     // support, no column nullability, etc., we should do some extra works before saving table
     // metadata into Hive metastore:
-    //  1. Put table metadata like provider, schema, etc. in table properties.
+    //  1. Put table metadata like table schema, partition columns, etc. in table properties.
     //  2. Check if this table is hive compatible.
     //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
     //         spec to empty and save table metadata to Hive.
     //    2.2  If it's hive compatible, set serde information in table metadata and try to save
     //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
     val tableProperties = tableMetaToTableProps(table)
 
+    // put table provider and partition provider in table properties.
+    tableProperties.put(DATASOURCE_PROVIDER, provider)
+    if (table.tracksPartitionsInCatalog) {
+      tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
+    }
+
     // Ideally we should also put `locationUri` in table properties like provider, schema, etc.
     // However, in older version of Spark we already store table location in storage properties
     // with key "path". Here we keep this behaviour for backward compatibility.
@@ -290,7 +300,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
 
     val qualifiedTableName = table.identifier.quotedString
-    val maybeSerde = HiveSerDe.sourceToSerDe(table.provider.get)
+    val maybeSerde = HiveSerDe.sourceToSerDe(provider)
     val skipHiveMetadata = table.storage.properties
       .getOrElse("skipHiveMetadata", "false").toBoolean
 
@@ -315,7 +325,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         (Some(newHiveCompatibleMetastoreTable(serde)), message)
 
       case _ =>
-        val provider = table.provider.get
         val message =
           s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
             s"Persisting data source table $qualifiedTableName into Hive metastore in " +
@@ -349,21 +358,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   /**
    * Data source tables may be non Hive compatible and we need to store table metadata in table
    * properties to workaround some Hive metastore limitations.
-   * This method puts table provider, partition provider, schema, partition column names, bucket
-   * specification into a map, which can be used as table properties later.
+   * This method puts table schema, partition column names, bucket specification into a map, which
+   * can be used as table properties later.
    */
-  private def tableMetaToTableProps(table: CatalogTable): scala.collection.Map[String, String] = {
-    // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
-    val provider = table.provider.get
+  private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = {
     val partitionColumns = table.partitionColumnNames
     val bucketSpec = table.bucketSpec
 
-    val properties = new scala.collection.mutable.HashMap[String, String]
-    properties.put(DATASOURCE_PROVIDER, provider)
-    if (table.tracksPartitionsInCatalog) {
-      properties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
-    }
-
+    val properties = new mutable.HashMap[String, String]
     // Serialized JSON schema string may be too long to be stored into a single metastore table
     // property. In this case, we split the JSON string and store each part as a separate table
     // property.
@@ -617,14 +619,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     if (table.tableType != VIEW) {
       table.properties.get(DATASOURCE_PROVIDER) match {
-        // No provider in table properties, which means this table is created by Spark prior to 2.1,
-        // or is created at Hive side.
+        // No provider in table properties, which means this is a Hive serde table.
        case None =>
-          table = table.copy(
-            provider = Some(DDLUtils.HIVE_PROVIDER), tracksPartitionsInCatalog = true)
-
-        // This is a Hive serde table created by Spark 2.1 or higher versions.
-        case Some(DDLUtils.HIVE_PROVIDER) =>
          table = restoreHiveSerdeTable(table)
 
        // This is a regular data source table.
@@ -637,7 +633,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val statsProps = table.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
 
     if (statsProps.nonEmpty) {
-      val colStats = new scala.collection.mutable.HashMap[String, ColumnStat]
+      val colStats = new mutable.HashMap[String, ColumnStat]
 
       // For each column, recover its column stats. Note that this is currently a O(n^2) operation,
      // but given the number of columns it usually not enormous, this is probably OK as a start.
@@ -674,21 +670,27 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       provider = Some(DDLUtils.HIVE_PROVIDER),
       tracksPartitionsInCatalog = true)
 
-    val schemaFromTableProps = getSchemaFromTableProperties(table)
-    if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
-      hiveTable.copy(
-        schema = schemaFromTableProps,
-        partitionColumnNames = getPartitionColumnsFromTableProperties(table),
-        bucketSpec = getBucketSpecFromTableProperties(table))
+    // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
+    // schema from table properties.
+    if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
+      val schemaFromTableProps = getSchemaFromTableProperties(table)
+      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+        hiveTable.copy(
+          schema = schemaFromTableProps,
+          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          bucketSpec = getBucketSpecFromTableProperties(table))
+      } else {
+        // Hive metastore may change the table schema, e.g. schema inference. If the table
+        // schema we read back is different(ignore case and nullability) from the one in table
+        // properties which was written when creating table, we should respect the table schema
+        // from hive.
+        logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " +
+          "different from the schema when this table was created by Spark SQL" +
+          s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema " +
+          "from Hive metastore which is not case preserving.")
+        hiveTable
+      }
     } else {
-      // Hive metastore may change the table schema, e.g. schema inference. If the table
-      // schema we read back is different(ignore case and nullability) from the one in table
-      // properties which was written when creating table, we should respect the table schema
-      // from hive.
-      logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " +
-        "different from the schema when this table was created by Spark SQL" +
-        s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema from " +
-        "Hive metastore which is not case preserving.")
       hiveTable
     }
   }
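
To summarize the read path after this change: a table with no `spark.sql.sources.provider` entry is now treated as a Hive serde table directly, and the Spark-written schema is restored only when the schema parts are present in the table properties. The snippet below is a simplified, hypothetical paraphrase of that dispatch (the real logic lives in restoreTableMetadata and restoreHiveSerdeTable above); `describeProvider` and the plain `Map[String, String]` argument are illustrative, not part of the patch:

// Hypothetical paraphrase of the restore-path dispatch after SPARK-18647.
// `props` stands in for the raw table properties read back from the Hive metastore.
def describeProvider(props: Map[String, String]): String = {
  props.get("spark.sql.sources.provider") match {
    // No provider entry: the table is a Hive serde table, whether it was created by
    // Hive itself, by Spark before 2.1, or by Spark 2.1+ with this patch applied.
    case None =>
      if (props.contains("spark.sql.sources.schema.numParts")) {
        "Hive serde table with a case-preserving schema stored by Spark 2.1+"
      } else {
        "Hive serde table; schema comes from the metastore as-is"
      }
    // A provider entry still marks a regular data source table (e.g. "parquet").
    case Some(provider) =>
      s"data source table with provider $provider"
  }
}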

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala

Lines changed: 18 additions & 0 deletions
@@ -20,8 +20,11 @@ package org.apache.spark.sql.hive
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.types.StructType
 
 /**
  * Test suite for the [[HiveExternalCatalog]].
@@ -52,4 +55,19 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     assert(selectedPartitions.length == 1)
     assert(selectedPartitions.head.spec == part1.spec)
   }
+
+  test("SPARK-18647: do not put provider in table properties for Hive serde table") {
+    val catalog = newBasicCatalog()
+    val hiveTable = CatalogTable(
+      identifier = TableIdentifier("hive_tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = storageFormat,
+      schema = new StructType().add("col1", "int").add("col2", "string"),
+      provider = Some("hive"))
+    catalog.createTable(hiveTable, ignoreIfExists = false)
+
+    val rawTable = externalCatalog.client.getTable("db1", "hive_tbl")
+    assert(!rawTable.properties.contains(HiveExternalCatalog.DATASOURCE_PROVIDER))
+    assert(externalCatalog.getTable("db1", "hive_tbl").provider == Some(DDLUtils.HIVE_PROVIDER))
+  }
 }

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala

Lines changed: 0 additions & 2 deletions
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import java.io.File
-
 import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
