@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util
 
+import scala.collection.mutable
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -218,9 +219,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         // table location for tables in default database, while we expect to use the location of
         // default database.
         storage = tableDefinition.storage.copy(locationUri = tableLocation),
-        // Here we follow data source tables and put table metadata like provider, schema, etc. in
-        // table properties, so that we can work around the Hive metastore issue about not case
-        // preserving and make Hive serde table support mixed-case column names.
+        // Here we follow data source tables and put table metadata like table schema, partition
+        // columns, etc. in table properties, so that we can work around the Hive metastore issue
+        // of not being case preserving, and make Hive serde tables support mixed-case column names.
         properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
       client.createTable(tableWithDataSourceProps, ignoreIfExists)
     } else {
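
The comment in this hunk is the heart of the change: the Hive metastore lowercases column names, so Spark serializes the case-sensitive schema into table properties on write and prefers it on read. A minimal sketch of that round trip, assuming a single property can hold the whole schema JSON (Spark actually splits it, see `tableMetaToTableProps` below) and using a made-up property key:

```scala
import scala.collection.mutable

object CasePreservingSketch {
  // Hypothetical key for illustration; Spark uses its own DATASOURCE_SCHEMA_* constants.
  val SchemaKey = "example.schema.json"

  // On write: keep the case-sensitive schema alongside what Hive will lowercase.
  def save(props: mutable.Map[String, String], caseSensitiveSchemaJson: String): Unit =
    props.put(SchemaKey, caseSensitiveSchemaJson)

  // On read: prefer the preserved schema, falling back to Hive's lowercased one.
  def restore(props: Map[String, String], hiveSchemaJson: String): String =
    props.getOrElse(SchemaKey, hiveSchemaJson)
}
```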
@@ -232,17 +233,26 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
+    // Data source tables always have a provider; this is guaranteed by
+    // `DDLUtils.isDatasourceTable`.
+    val provider = table.provider.get
+
     // To work around some Hive metastore issues, e.g. not case-preserving, bad decimal type
     // support, no column nullability, etc., we should do some extra work before saving table
     // metadata into the Hive metastore:
-    // 1. Put table metadata like provider, schema, etc. in table properties.
+    // 1. Put table metadata like table schema, partition columns, etc. in table properties.
     // 2. Check if this table is hive compatible.
     //  2.1 If it's not hive compatible, set location URI, schema, partition columns and bucket
     //      spec to empty and save table metadata to Hive.
     //  2.2 If it's hive compatible, set serde information in table metadata and try to save
     //      it to Hive. If it fails, treat it as not hive compatible and go back to 2.1.
     val tableProperties = tableMetaToTableProps(table)
 
+    // Put the table provider and partition provider in table properties.
+    tableProperties.put(DATASOURCE_PROVIDER, provider)
+    if (table.tracksPartitionsInCatalog) {
+      tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
+    }
+
     // Ideally we should also put `locationUri` in table properties like provider, schema, etc.
     // However, in older versions of Spark we already store table location in storage properties
     // with key "path". Here we keep this behaviour for backward compatibility.
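
To make the new `tableProperties.put` calls concrete, here is roughly what the map could contain for a partitioned Parquet table whose partitions are tracked in the catalog. The literal strings are assumptions inferred from the constant names, not verified values; check `HiveExternalCatalog` for the real ones:

```scala
// Assumed expansions (illustrative only):
//   DATASOURCE_PROVIDER              -> "spark.sql.sources.provider"
//   TABLE_PARTITION_PROVIDER         -> "spark.sql.partitionProvider"
//   TABLE_PARTITION_PROVIDER_CATALOG -> "catalog"
val exampleProps = scala.collection.mutable.HashMap(
  "spark.sql.sources.provider" -> "parquet",
  "spark.sql.partitionProvider" -> "catalog")
```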
@@ -289,7 +299,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
 
     val qualifiedTableName = table.identifier.quotedString
-    val maybeSerde = HiveSerDe.sourceToSerDe(table.provider.get)
+    val maybeSerde = HiveSerDe.sourceToSerDe(provider)
     val skipHiveMetadata = table.storage.properties
       .getOrElse("skipHiveMetadata", "false").toBoolean
@@ -314,7 +324,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           (Some(newHiveCompatibleMetastoreTable(serde)), message)
 
       case _ =>
-        val provider = table.provider.get
         val message =
           s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
             s"Persisting data source table $qualifiedTableName into Hive metastore in " +
@@ -348,21 +357,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   /**
    * Data source tables may be non Hive compatible and we need to store table metadata in table
    * properties to work around some Hive metastore limitations.
-   * This method puts table provider, partition provider, schema, partition column names, bucket
-   * specification into a map, which can be used as table properties later.
+   * This method puts table schema, partition column names and bucket specification into a map,
+   * which can be used as table properties later.
    */
-  private def tableMetaToTableProps(table: CatalogTable): scala.collection.Map[String, String] = {
-    // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
-    val provider = table.provider.get
+  private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = {
     val partitionColumns = table.partitionColumnNames
     val bucketSpec = table.bucketSpec
 
-    val properties = new scala.collection.mutable.HashMap[String, String]
-    properties.put(DATASOURCE_PROVIDER, provider)
-    if (table.tracksPartitionsInCatalog) {
-      properties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
-    }
-
+    val properties = new mutable.HashMap[String, String]
     // Serialized JSON schema string may be too long to be stored into a single metastore table
     // property. In this case, we split the JSON string and store each part as a separate table
     // property.
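
The trailing comment refers to the schema-splitting trick: metastore property values have a length cap, so the schema JSON is chunked into numbered parts on write and reassembled on read. A self-contained sketch of the idea; the keys and the threshold are illustrative, not Spark's `DATASOURCE_SCHEMA_NUMPARTS` and part-key constants:

```scala
// Chunk a long JSON schema into numbered table properties (threshold assumed).
def splitSchema(json: String, threshold: Int = 4000): Seq[(String, String)] = {
  val parts = json.grouped(threshold).toSeq
  ("schema.numParts" -> parts.size.toString) +:
    parts.zipWithIndex.map { case (part, i) => s"schema.part.$i" -> part }
}

// Reassemble the schema JSON from the numbered properties.
def joinSchema(props: Map[String, String]): String = {
  val numParts = props("schema.numParts").toInt
  (0 until numParts).map(i => props(s"schema.part.$i")).mkString
}
```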
@@ -616,14 +618,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     if (table.tableType != VIEW) {
       table.properties.get(DATASOURCE_PROVIDER) match {
-        // No provider in table properties, which means this table is created by Spark prior to 2.1,
-        // or is created at Hive side.
+        // No provider in table properties, which means this is a Hive serde table.
         case None =>
-          table = table.copy(
-            provider = Some(DDLUtils.HIVE_PROVIDER), tracksPartitionsInCatalog = true)
-
-        // This is a Hive serde table created by Spark 2.1 or higher versions.
-        case Some(DDLUtils.HIVE_PROVIDER) =>
           table = restoreHiveSerdeTable(table)
 
         // This is a regular data source table.
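
The behavioral shift in this hunk: previously a missing provider property meant "created before 2.1 or created on the Hive side", with a separate branch for `Some(DDLUtils.HIVE_PROVIDER)`; now the absent-provider case funnels straight into `restoreHiveSerdeTable`, which inspects the table properties itself (see the last hunk). Schematically, with hypothetical stand-ins for the private restore methods:

```scala
// Hypothetical stand-ins for the private restore methods in HiveExternalCatalog.
def restoreHiveSerde(): String = "hive"
def restoreDataSource(provider: String): String = provider

def dispatch(props: Map[String, String]): String =
  props.get("spark.sql.sources.provider") match {
    case None           => restoreHiveSerde()           // Hive serde table, any origin
    case Some(provider) => restoreDataSource(provider)  // regular data source table
  }
```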
@@ -636,7 +632,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val statsProps = table.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
 
     if (statsProps.nonEmpty) {
-      val colStats = new scala.collection.mutable.HashMap[String, ColumnStat]
+      val colStats = new mutable.HashMap[String, ColumnStat]
 
       // For each column, recover its column stats. Note that this is currently an O(n^2)
       // operation, but given that the number of columns is usually not enormous, this is
       // probably OK as a start.
@@ -673,21 +669,27 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       provider = Some(DDLUtils.HIVE_PROVIDER),
       tracksPartitionsInCatalog = true)
 
-    val schemaFromTableProps = getSchemaFromTableProperties(table)
-    if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
-      hiveTable.copy(
-        schema = schemaFromTableProps,
-        partitionColumnNames = getPartitionColumnsFromTableProperties(table),
-        bucketSpec = getBucketSpecFromTableProperties(table))
+    // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore
+    // its schema from table properties.
+    if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
+      val schemaFromTableProps = getSchemaFromTableProperties(table)
+      if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) {
+        hiveTable.copy(
+          schema = schemaFromTableProps,
+          partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+          bucketSpec = getBucketSpecFromTableProperties(table))
+      } else {
+        // The Hive metastore may change the table schema, e.g. by schema inference. If the
+        // schema we read back differs (ignoring case and nullability) from the one in table
+        // properties that was written when the table was created, we should respect the table
+        // schema from Hive.
+        logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " +
+          "different from the schema when this table was created by Spark SQL" +
+          s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema " +
+          "from Hive metastore which is not case preserving.")
+        hiveTable
+      }
     } else {
-      // Hive metastore may change the table schema, e.g. schema inference. If the table
-      // schema we read back is different(ignore case and nullability) from the one in table
-      // properties which was written when creating table, we should respect the table schema
-      // from hive.
-      logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " +
-        "different from the schema when this table was created by Spark SQL" +
-        s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema from " +
-        "Hive metastore which is not case preserving.")
       hiveTable
     }
   }
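
Finally, `DataType.equalsIgnoreCaseAndNullability` is the guard that makes the fallback safe: the case-preserving schema from table properties is trusted only when it matches the metastore's schema up to name case and nullability. A toy version of that check over flat field lists (the real one recurses into nested types):

```scala
final case class Field(name: String, dataType: String, nullable: Boolean)

// Names compare case-insensitively and nullability is ignored, mirroring the intent.
def equalsIgnoreCaseAndNullabilitySketch(a: Seq[Field], b: Seq[Field]): Boolean =
  a.length == b.length && a.zip(b).forall { case (x, y) =>
    x.name.equalsIgnoreCase(y.name) && x.dataType == y.dataType
  }

// Example: mixed-case name from table properties vs. lowercased name from Hive.
val fromProps = Seq(Field("userId", "int", nullable = false))
val fromHive  = Seq(Field("userid", "int", nullable = true))
assert(equalsIgnoreCaseAndNullabilitySketch(fromProps, fromHive))
```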