From 17321a3da1c2060f300e92eea1517ad9f5e87112 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 3 Nov 2016 22:17:53 +0800 Subject: [PATCH 1/7] put hive serde table schema to table properties like data source table --- .../catalyst/catalog/ExternalCatalog.scala | 8 +- .../catalyst/catalog/InMemoryCatalog.scala | 6 - .../org/apache/spark/sql/types/DataType.scala | 24 +++ .../catalog/ExternalCatalogSuite.scala | 20 +++ .../spark/sql/hive/HiveExternalCatalog.scala | 156 ++++++++++++------ .../input1-2-d3aa54d5436b7b59ff5c7091b7ca6145 | 4 +- .../input2-1-e0efeda558cd0194f4764a5735147b16 | 4 +- .../input2-2-aa9ab0598e0cb7a12c719f9b3d98dbfd | 4 +- .../input2-4-235f92683416fab031e6e7490487b15b | 6 +- ...columns-2-b74990316ec4245fd8a7011e684b39da | 6 +- 10 files changed, 164 insertions(+), 74 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index a5e02523d288..14dd707fa0f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.catalog -import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException} +import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException} import org.apache.spark.sql.catalyst.expressions.Expression @@ -39,6 +39,12 @@ abstract class ExternalCatalog { } } + protected def requireTableExists(db: String, table: String): Unit = { + if (!tableExists(db, table)) { + throw new NoSuchTableException(db = db, table = table) + } + } + protected def requireFunctionExists(db: String, funcName: String): Unit = { if (!functionExists(db, funcName)) { throw new NoSuchFunctionException(db = db, func = funcName) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index ea675b76607d..bc396880f22a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -64,12 +64,6 @@ class InMemoryCatalog( catalog(db).tables(table).partitions.contains(spec) } - private def requireTableExists(db: String, table: String): Unit = { - if (!tableExists(db, table)) { - throw new NoSuchTableException(db = db, table = table) - } - } - private def requireTableNotExists(db: String, table: String): Unit = { if (tableExists(db, table)) { throw new TableAlreadyExistsException(db = db, table = table) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index 312585df1516..2642d9395ba8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -250,4 +250,28 @@ object DataType { case (fromDataType, toDataType) => fromDataType == toDataType } } + + /** + * Compares two types, ignoring nullability of ArrayType, MapType, StructType, and ignoring case + * sensitivity of field names in StructType. 
+ */ + private[sql] def equalsIgnoreCaseAndNullability(from: DataType, to: DataType): Boolean = { + (from, to) match { + case (ArrayType(fromElement, _), ArrayType(toElement, _)) => + equalsIgnoreCaseAndNullability(fromElement, toElement) + + case (MapType(fromKey, fromValue, _), MapType(toKey, toValue, _)) => + equalsIgnoreCaseAndNullability(fromKey, toKey) && + equalsIgnoreCaseAndNullability(fromValue, toValue) + + case (StructType(fromFields), StructType(toFields)) => + fromFields.length == toFields.length && + fromFields.zip(toFields).forall { case (l, r) => + l.name.equalsIgnoreCase(r.name) && + equalsIgnoreCaseAndNullability(l.dataType, r.dataType) + } + + case (fromDataType, toDataType) => fromDataType == toDataType + } + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index f283f4287c5b..66f92d1b1b0a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -270,6 +270,26 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac assert(catalog.listTables("db2", "*1").toSet == Set("tbl1")) } + test("column names should be case-preserving and column nullability should be retained") { + val catalog = newBasicCatalog() + val tbl = CatalogTable( + identifier = TableIdentifier("tbl", Some("db1")), + tableType = CatalogTableType.MANAGED, + storage = storageFormat, + schema = new StructType() + .add("HelLo", "int", nullable = false) + .add("WoRLd", "int", nullable = true), + provider = Some("hive"), + partitionColumnNames = Seq("WoRLd"), + bucketSpec = Some(BucketSpec(4, Seq("HelLo"), Nil))) + catalog.createTable(tbl, ignoreIfExists = false) + + val readBack = catalog.getTable("db1", "tbl") + assert(readBack.schema == tbl.schema) + assert(readBack.partitionColumnNames == tbl.partitionColumnNames) + assert(readBack.bucketSpec == tbl.bucketSpec) + } + // -------------------------------------------------------------------------- // Partitions // -------------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index ebba203ac593..6867686353bf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -95,8 +95,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } } - private def requireTableExists(db: String, table: String): Unit = { - withClient { getTable(db, table) } + /** + * Get the raw table metadata from hive metastore directly. + */ + private def getRawTable(db: String, table: String): CatalogTable = withClient { + client.getTable(db, table) } /** @@ -187,14 +190,19 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat if (tableExists(db, table) && !ignoreIfExists) { throw new TableAlreadyExistsException(db = db, table = table) } - // Before saving data source table metadata into Hive metastore, we should: - // 1. Put table metadata like provider, schema, etc. in table properties. - // 2. 
Check if this table is hive compatible - // 2.1 If it's not hive compatible, set location URI, schema, partition columns and bucket - // spec to empty and save table metadata to Hive. - // 2.2 If it's hive compatible, set serde information in table metadata and try to save - // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 - if (DDLUtils.isDatasourceTable(tableDefinition)) { + + if (tableDefinition.tableType == VIEW) { + client.createTable(tableDefinition, ignoreIfExists) + } else { + // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type + // support, no column nullability, etc., we should do some extra works before saving table + // metadata into Hive metastore: + // 1. Put table metadata like provider, schema, etc. in table properties. + // 2. Check if this table is hive compatible(Hive serde table is obviously Hive compatible) + // 2.1 If it's not hive compatible, set location URI, schema, partition columns and bucket + // spec to empty and save table metadata to Hive. + // 2.2 If it's hive compatible, set serde information in table metadata and try to save + // it to Hive. If it fails, treat it as not hive compatible and go back to 2.1 val tableProperties = tableMetaToTableProps(tableDefinition) val needDefaultTableLocation = tableDefinition.tableType == MANAGED && @@ -255,6 +263,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat .getOrElse("skipHiveMetadata", "false").toBoolean val (hiveCompatibleTable, logMessage) = maybeSerde match { + case _ if tableDefinition.provider.get == "hive" => + val message = s"Persisting Hive serde table $qualifiedTableName into Hive metastore." + val tableWithDataSourceProps = tableDefinition.copy( + properties = tableDefinition.properties ++ tableProperties) + (Some(tableWithDataSourceProps), message) + case _ if skipHiveMetadata => val message = s"Persisting data source table $qualifiedTableName into Hive metastore in" + @@ -304,14 +318,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat logWarning(message) saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists) } - } else { - client.createTable(tableDefinition, ignoreIfExists) } } /** - * Data source tables may be non Hive compatible and we need to store table metadata in table - * properties to workaround some Hive metastore limitations. * This method puts table provider, partition provider, schema, partition column names, bucket * specification into a map, which can be used as table properties later. */ @@ -372,9 +382,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } private def saveTableIntoHive(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = { - assert(DDLUtils.isDatasourceTable(tableDefinition), - "saveTableIntoHive only takes data source table.") - // If this is an external data source table... + // If this is an external table... if (tableDefinition.tableType == EXTERNAL && // ... 
that is not persisted as Hive compatible format (external tables in Hive compatible // format always set `locationUri` to the actual data location and should NOT be hacked as @@ -417,11 +425,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } override def renameTable(db: String, oldName: String, newName: String): Unit = withClient { - val rawTable = client.getTable(db, oldName) + val rawTable = getRawTable(db, oldName) - val storageWithNewPath = if (rawTable.tableType == MANAGED) { - // If it's a managed table and we are renaming it, then the path option becomes inaccurate - // and we need to update it according to the new table name. + val hasPathOption = new CaseInsensitiveMap(rawTable.storage.properties).contains("path") + val storageWithNewPath = if (rawTable.tableType == MANAGED && hasPathOption) { + // If it's a managed table with path option and we are renaming it, then the path option + // becomes inaccurate and we need to update it according to the new table name. val newTablePath = defaultTablePath(TableIdentifier(newName, Some(db))) updateLocationInStorageProps(rawTable, Some(newTablePath)) } else { @@ -442,7 +451,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat private def updateLocationInStorageProps( table: CatalogTable, newPath: Option[String]): CatalogStorageFormat = { - val propsWithoutPath = table.storage.properties.filterKeys(_.toLowerCase != "path") + val propsWithoutPath = table.storage.properties.filter { + case (k, v) => k.toLowerCase != "path" + } table.storage.copy(properties = propsWithoutPath ++ newPath.map("path" -> _)) } @@ -475,18 +486,27 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat tableDefinition } - if (DDLUtils.isDatasourceTable(withStatsProps)) { - val oldTableDef = client.getTable(db, withStatsProps.identifier.table) + if (tableDefinition.tableType == VIEW) { + client.alterTable(withStatsProps) + } else { + val oldTableDef = getRawTable(db, withStatsProps.identifier.table) - val oldLocation = getLocationFromStorageProps(oldTableDef) - val newLocation = tableDefinition.storage.locationUri - // Only update the `locationUri` field if the location is really changed, because this table - // may be not Hive-compatible and can not set the `locationUri` field. We should respect the - // old `locationUri` even it's None. - val storageWithNewLocation = if (oldLocation == newLocation) { - oldTableDef.storage + val newStorage = if (tableDefinition.provider.get == "hive") { + tableDefinition.storage } else { - updateLocationInStorageProps(oldTableDef, newLocation).copy(locationUri = newLocation) + val newLocation = tableDefinition.storage.locationUri + val storageWithPathOption = tableDefinition.storage.copy( + properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _)) + + // For data source tables, only update the `locationUri` field if the location is really + // changed, because this table may be not Hive-compatible and can not set the `locationUri` + // field. We should respect the old `locationUri` even it's None. 
+ val oldLocation = getLocationFromStorageProps(oldTableDef) + if (oldLocation == newLocation) { + storageWithPathOption.copy(locationUri = oldTableDef.storage.locationUri) + } else { + storageWithPathOption.copy(locationUri = newLocation) + } } val partitionProviderProp = if (tableDefinition.partitionProviderIsHive) { @@ -498,23 +518,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // Sets the `schema`, `partitionColumnNames` and `bucketSpec` from the old table definition, // to retain the spark specific format if it is. Also add old data source properties to table // properties, to retain the data source table format. - val oldDataSourceProps = oldTableDef.properties.filter(_._1.startsWith(SPARK_SQL_PREFIX)) + val oldDataSourceProps = oldTableDef.properties.filter(_._1.startsWith(DATASOURCE_PREFIX)) val newTableProps = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp val newDef = withStatsProps.copy( - storage = storageWithNewLocation, + storage = newStorage, schema = oldTableDef.schema, partitionColumnNames = oldTableDef.partitionColumnNames, bucketSpec = oldTableDef.bucketSpec, properties = newTableProps) client.alterTable(newDef) - } else { - client.alterTable(withStatsProps) } } override def getTable(db: String, table: String): CatalogTable = withClient { - restoreTableMetadata(client.getTable(db, table)) + restoreTableMetadata(getRawTable(db, table)) } override def getTableOption(db: String, table: String): Option[CatalogTable] = withClient { @@ -537,22 +555,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat table } else { getProviderFromTableProperties(table).map { provider => - assert(provider != "hive", "Hive serde table should not save provider in table properties.") - // Internally we store the table location in storage properties with key "path" for data - // source tables. Here we set the table location to `locationUri` field and filter out the - // path option in storage properties, to avoid exposing this concept externally. - val storageWithLocation = { - val tableLocation = getLocationFromStorageProps(table) - updateLocationInStorageProps(table, None).copy(locationUri = tableLocation) + if (provider == "hive") { + restoreHiveSerdeTable(table) + } else { + restoreDataSourceTable(table, provider) } - - table.copy( - storage = storageWithLocation, - schema = getSchemaFromTableProperties(table), - provider = Some(provider), - partitionColumnNames = getPartitionColumnsFromTableProperties(table), - bucketSpec = getBucketSpecFromTableProperties(table), - partitionProviderIsHive = table.properties.get(TABLE_PARTITION_PROVIDER) == Some("hive")) } getOrElse { table.copy(provider = Some("hive"), partitionProviderIsHive = true) } @@ -580,6 +587,41 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat tableWithStats.copy(properties = getOriginalTableProperties(table)) } + private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = { + val hiveTable = table.copy(provider = Some("hive"), partitionProviderIsHive = true) + + val schemaFromTableProps = getSchemaFromTableProperties(table) + if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) { + hiveTable.copy( + schema = schemaFromTableProps, + partitionColumnNames = getPartitionColumnsFromTableProperties(table), + bucketSpec = getBucketSpecFromTableProperties(table)) + } else { + // Hive metastore may change the table schema, e.g. schema inference. 
If the table + // schema we read back is different(ignore case and nullability) from the one in table + // properties which was written when creating table, we should respect the table schema + // from hive. + hiveTable + } + } + + private def restoreDataSourceTable(table: CatalogTable, provider: String): CatalogTable = { + // Internally we store the table location in storage properties with key "path" for data + // source tables. Here we set the table location to `locationUri` field and filter out the + // path option in storage properties, to avoid exposing this concept externally. + val storageWithLocation = { + val tableLocation = getLocationFromStorageProps(table) + updateLocationInStorageProps(table, None).copy(locationUri = tableLocation) + } + table.copy( + provider = Some(provider), + storage = storageWithLocation, + schema = getSchemaFromTableProperties(table), + partitionColumnNames = getPartitionColumnsFromTableProperties(table), + bucketSpec = getBucketSpecFromTableProperties(table), + partitionProviderIsHive = table.properties.get(TABLE_PARTITION_PROVIDER) == Some("hive")) + } + override def tableExists(db: String, table: String): Boolean = withClient { client.tableExists(db, table) } @@ -620,7 +662,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val orderedPartitionSpec = new util.LinkedHashMap[String, String]() getTable(db, table).partitionColumnNames.foreach { colName => - orderedPartitionSpec.put(colName, partition(colName)) + // Lowercase the partition column names before passing the partition spec to Hive client, as + // Hive metastore is not case preserving. + orderedPartitionSpec.put(colName.toLowerCase, partition(colName)) } client.loadPartition( @@ -645,7 +689,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val orderedPartitionSpec = new util.LinkedHashMap[String, String]() getTable(db, table).partitionColumnNames.foreach { colName => - orderedPartitionSpec.put(colName, partition(colName)) + // Lowercase the partition column names before passing the partition spec to Hive client, as + // Hive metastore is not case preserving. 
+ orderedPartitionSpec.put(colName.toLowerCase, partition(colName)) } client.loadDynamicPartitions( @@ -751,7 +797,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat db: String, table: String, predicates: Seq[Expression]): Seq[CatalogTablePartition] = withClient { - val rawTable = client.getTable(db, table) + val rawTable = getRawTable(db, table) val catalogTable = restoreTableMetadata(rawTable) val partitionColumnNames = catalogTable.partitionColumnNames.toSet val nonPartitionPruningPredicates = predicates.filterNot { diff --git a/sql/hive/src/test/resources/golden/input1-2-d3aa54d5436b7b59ff5c7091b7ca6145 b/sql/hive/src/test/resources/golden/input1-2-d3aa54d5436b7b59ff5c7091b7ca6145 index d3ffb995aff4..93ba96ec8c15 100644 --- a/sql/hive/src/test/resources/golden/input1-2-d3aa54d5436b7b59ff5c7091b7ca6145 +++ b/sql/hive/src/test/resources/golden/input1-2-d3aa54d5436b7b59ff5c7091b7ca6145 @@ -1,2 +1,2 @@ -a int -b double +A int +B double diff --git a/sql/hive/src/test/resources/golden/input2-1-e0efeda558cd0194f4764a5735147b16 b/sql/hive/src/test/resources/golden/input2-1-e0efeda558cd0194f4764a5735147b16 index d3ffb995aff4..93ba96ec8c15 100644 --- a/sql/hive/src/test/resources/golden/input2-1-e0efeda558cd0194f4764a5735147b16 +++ b/sql/hive/src/test/resources/golden/input2-1-e0efeda558cd0194f4764a5735147b16 @@ -1,2 +1,2 @@ -a int -b double +A int +B double diff --git a/sql/hive/src/test/resources/golden/input2-2-aa9ab0598e0cb7a12c719f9b3d98dbfd b/sql/hive/src/test/resources/golden/input2-2-aa9ab0598e0cb7a12c719f9b3d98dbfd index d3ffb995aff4..93ba96ec8c15 100644 --- a/sql/hive/src/test/resources/golden/input2-2-aa9ab0598e0cb7a12c719f9b3d98dbfd +++ b/sql/hive/src/test/resources/golden/input2-2-aa9ab0598e0cb7a12c719f9b3d98dbfd @@ -1,2 +1,2 @@ -a int -b double +A int +B double diff --git a/sql/hive/src/test/resources/golden/input2-4-235f92683416fab031e6e7490487b15b b/sql/hive/src/test/resources/golden/input2-4-235f92683416fab031e6e7490487b15b index 77eaef91c9c3..d52fcf0ebbdb 100644 --- a/sql/hive/src/test/resources/golden/input2-4-235f92683416fab031e6e7490487b15b +++ b/sql/hive/src/test/resources/golden/input2-4-235f92683416fab031e6e7490487b15b @@ -1,3 +1,3 @@ -a array -b double -c map +A array +B double +C map diff --git a/sql/hive/src/test/resources/golden/show_columns-2-b74990316ec4245fd8a7011e684b39da b/sql/hive/src/test/resources/golden/show_columns-2-b74990316ec4245fd8a7011e684b39da index 70c14c3ef34a..2f7168cba930 100644 --- a/sql/hive/src/test/resources/golden/show_columns-2-b74990316ec4245fd8a7011e684b39da +++ b/sql/hive/src/test/resources/golden/show_columns-2-b74990316ec4245fd8a7011e684b39da @@ -1,3 +1,3 @@ -key -value -ds +KEY +VALUE +ds From 7b6fb133145cdba3fcf4d13add9452a993ce14a8 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 3 Nov 2016 22:40:19 +0800 Subject: [PATCH 2/7] follow up --- .../org/apache/spark/sql/hive/HiveExternalCatalog.scala | 7 ++++++- .../apache/spark/sql/hive/execution/SQLQuerySuite.scala | 4 ++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 6867686353bf..604518c52055 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -205,6 +205,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // it to 
Hive. If it fails, treat it as not hive compatible and go back to 2.1 val tableProperties = tableMetaToTableProps(tableDefinition) + // Ideally we should not create a managed table with location, but Hive serde table can + // specify location for managed table. And in [[CreateDataSourceTableAsSelectCommand]] we have + // to create the table directory and write out data before we create this table, to avoid + // exposing a partial written table. val needDefaultTableLocation = tableDefinition.tableType == MANAGED && tableDefinition.storage.locationUri.isEmpty val tableLocation = if (needDefaultTableLocation) { @@ -611,7 +615,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // path option in storage properties, to avoid exposing this concept externally. val storageWithLocation = { val tableLocation = getLocationFromStorageProps(table) - updateLocationInStorageProps(table, None).copy(locationUri = tableLocation) + // We pass None as `newPath` here, to remove the path option in storage properties. + updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation) } table.copy( provider = Some(provider), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index ad70835d06d9..fb905739d490 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -521,7 +521,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { val catalogTable = sessionState.catalog.getTableMetadata(TableIdentifier(tableName)) relation match { - case LogicalRelation(r: HadoopFsRelation, _, Some(table)) => + case LogicalRelation(r: HadoopFsRelation, _, _) => if (!isDataSourceTable) { fail( s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " + @@ -529,7 +529,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } userSpecifiedLocation match { case Some(location) => - assert(table.storage.locationUri.get === location) + assert(r.options("path") === location) case None => // OK. } assert(catalogTable.provider.get === format) From 886a5af382d1da4ef6ad07b30258b51631c0fd8a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 3 Nov 2016 22:45:21 +0800 Subject: [PATCH 3/7] minor --- .../scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 604518c52055..88069be73d9b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -386,8 +386,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } private def saveTableIntoHive(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = { - // If this is an external table... - if (tableDefinition.tableType == EXTERNAL && + // If this is an external data source table... + if (tableDefinition.provider.get != "hive" && tableDefinition.tableType == EXTERNAL && // ... 
that is not persisted as Hive compatible format (external tables in Hive compatible // format always set `locationUri` to the actual data location and should NOT be hacked as // following.) From afa5d85eb2d76a6ef699cc45ae53eb8225916efd Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 4 Nov 2016 15:41:41 +0800 Subject: [PATCH 4/7] address comments --- .../spark/sql/hive/HiveExternalCatalog.scala | 93 ++++++++++++++----- 1 file changed, 68 insertions(+), 25 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 4499c7ca37d5..ac1ea7a356e8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -96,7 +96,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } /** - * Get the raw table metadata from hive metastore directly. + * Get the raw table metadata from hive metastore directly. The raw table metadata may contains + * special data source properties and should not be exposed outside of `HiveExternalCatalog`. We + * should interpret these special data source properties and restore the original table metadata + * before returning it. */ private def getRawTable(db: String, table: String): CatalogTable = withClient { client.getTable(db, table) @@ -193,12 +196,19 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat if (tableDefinition.tableType == VIEW) { client.createTable(tableDefinition, ignoreIfExists) + } else if (tableDefinition.provider.get == "hive") { + // Here we follow data source tables and put table metadata like provider, schema, etc. in + // table properties, so that we can work around the Hive metastore issue about not case + // preserving and make Hive serde table support mixed-case column names. + val tableWithDataSourceProps = tableDefinition.copy( + properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition)) + client.createTable(tableWithDataSourceProps, ignoreIfExists) } else { // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type // support, no column nullability, etc., we should do some extra works before saving table // metadata into Hive metastore: // 1. Put table metadata like provider, schema, etc. in table properties. - // 2. Check if this table is hive compatible(Hive serde table is obviously Hive compatible) + // 2. Check if this table is hive compatible. // 2.1 If it's not hive compatible, set location URI, schema, partition columns and bucket // spec to empty and save table metadata to Hive. // 2.2 If it's hive compatible, set serde information in table metadata and try to save @@ -267,12 +277,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat .getOrElse("skipHiveMetadata", "false").toBoolean val (hiveCompatibleTable, logMessage) = maybeSerde match { - case _ if tableDefinition.provider.get == "hive" => - val message = s"Persisting Hive serde table $qualifiedTableName into Hive metastore." 
- val tableWithDataSourceProps = tableDefinition.copy( - properties = tableDefinition.properties ++ tableProperties) - (Some(tableWithDataSourceProps), message) - case _ if skipHiveMetadata => val message = s"Persisting data source table $qualifiedTableName into Hive metastore in" + @@ -326,6 +330,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } /** + * Data source tables may be non Hive compatible and we need to store table metadata in table + * properties to workaround some Hive metastore limitations. * This method puts table provider, partition provider, schema, partition column names, bucket * specification into a map, which can be used as table properties later. */ @@ -386,8 +392,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } private def saveTableIntoHive(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = { + assert(DDLUtils.isDatasourceTable(tableDefinition), + "saveTableIntoHive only takes data source table.") // If this is an external data source table... - if (tableDefinition.provider.get != "hive" && tableDefinition.tableType == EXTERNAL && + if (tableDefinition.tableType == EXTERNAL && // ... that is not persisted as Hive compatible format (external tables in Hive compatible // format always set `locationUri` to the actual data location and should NOT be hacked as // following.) @@ -431,6 +439,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def renameTable(db: String, oldName: String, newName: String): Unit = withClient { val rawTable = getRawTable(db, oldName) + // Note that Hive serde tables don't use path option in storage properties to store the value + // of table location, but use `locationUri` field to store it directly. And `locationUri` field + // will be updated automatically in Hive metastore by the `alterTable` call at the end of this + // method. Here we only update the path option if the path option already exists in storage + // properties, to avoid adding a unnecessary path option for Hive serde tables. val hasPathOption = new CaseInsensitiveMap(rawTable.storage.properties).contains("path") val storageWithNewPath = if (rawTable.tableType == MANAGED && hasPathOption) { // If it's a managed table with path option and we are renaming it, then the path option @@ -498,18 +511,42 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val newStorage = if (tableDefinition.provider.get == "hive") { tableDefinition.storage } else { + // We can't alter the table storage of data source table directly for 2 reasons: + // 1. internally we use path option in storage properties to store the value of table + // location, but the given `tableDefinition` is from outside and doesn't have the path + // option, we need to add it manually. + // 2. this data source table may be created on a file, not a directory, then we can't set + // the `locationUri` field and save it to Hive metastore, because Hive only allows + // directory as table location. + // + // For example, an external data source table is created with a single file '/path/to/file'. + // Internally, we will add a path option with value '/path/to/file' to storage properties, + // and set the `locationUri` to a special value due to SPARK-15269(please see + // `saveTableIntoHive` for more details). 
When users try to get the table metadata back, we + // will restore the `locationUri` field from the path option and remove the path option from + // storage properties. When users try to alter the table storage, the given + // `tableDefinition` will have `locationUri` field with value `/path/to/file` and the path + // option is not set. + // + // Here we need 2 extra steps: + // 1. add path option to storage properties, to match the internal format, i.e. using path + // option to store the value of table location. + // 2. set the `locationUri` field back to the old one from the existing table metadata, + // if users don't want to alter the table location. This step is necessary as the + // `locationUri` is not always same with the path option, e.g. in the above example + // `locationUri` is a special value and we should respect it. Note that, if users + // want to alter the table location to a file path, we will fail. This should be fixed + // in the future. + val newLocation = tableDefinition.storage.locationUri val storageWithPathOption = tableDefinition.storage.copy( properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _)) - // For data source tables, only update the `locationUri` field if the location is really - // changed, because this table may be not Hive-compatible and can not set the `locationUri` - // field. We should respect the old `locationUri` even it's None. val oldLocation = getLocationFromStorageProps(oldTableDef) if (oldLocation == newLocation) { storageWithPathOption.copy(locationUri = oldTableDef.storage.locationUri) } else { - storageWithPathOption.copy(locationUri = newLocation) + storageWithPathOption } } @@ -558,14 +595,16 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val tableWithSchema = if (table.tableType == VIEW) { table } else { - getProviderFromTableProperties(table).map { provider => - if (provider == "hive") { - restoreHiveSerdeTable(table) - } else { - restoreDataSourceTable(table, provider) - } - } getOrElse { - table.copy(provider = Some("hive"), tracksPartitionsInCatalog = true) + getProviderFromTableProperties(table) match { + // No provider in table properties, which means this table is created by Spark prior to 2.1, + // or is created at Hive side. + case None => table.copy(provider = Some("hive"), tracksPartitionsInCatalog = true) + + // This is a Hive serde table created by Spark 2.1 or higher versions. + case Some("hive") => restoreHiveSerdeTable(table) + + // This is a regular data source table. + case Some(provider) => restoreDataSourceTable(table, provider) } } @@ -669,8 +708,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val orderedPartitionSpec = new util.LinkedHashMap[String, String]() getTable(db, table).partitionColumnNames.foreach { colName => - // Lowercase the partition column names before passing the partition spec to Hive client, as - // Hive metastore is not case preserving. + // Hive metastore is not case preserving and keeps partition columns with lower cased names, + // and Hive will validate the column names in partition spec to make sure they are partition + // columns. Here we Lowercase the column names before passing the partition spec to Hive + // client, to satisfy Hive. 
orderedPartitionSpec.put(colName.toLowerCase, partition(colName)) } @@ -696,8 +737,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val orderedPartitionSpec = new util.LinkedHashMap[String, String]() getTable(db, table).partitionColumnNames.foreach { colName => - // Lowercase the partition column names before passing the partition spec to Hive client, as - // Hive metastore is not case preserving. + // Hive metastore is not case preserving and keeps partition columns with lower cased names, + // and Hive will validate the column names in partition spec to make sure they are partition + // columns. Here we Lowercase the column names before passing the partition spec to Hive + // client, to satisfy Hive. orderedPartitionSpec.put(colName.toLowerCase, partition(colName)) } From b02cf0d4b1b1c782374328789bf534c17153b937 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 4 Nov 2016 22:25:51 +0800 Subject: [PATCH 5/7] enable a test --- .../spark/sql/hive/PartitionedTablePerfStatsSuite.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala index d8e31c4e39a5..b41bc862e9bc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala @@ -105,12 +105,9 @@ class PartitionedTablePerfStatsSuite assert(df4.count() == 0) assert(df4.inputFiles.length == 0) - // TODO(ekl) enable for hive tables as well once SPARK-17983 is fixed - if (spec.isDatasourceTable) { - val df5 = spark.sql("select * from test where fieldOne = 4") - assert(df5.count() == 1) - assert(df5.inputFiles.length == 5) - } + val df5 = spark.sql("select * from test where fieldOne = 4") + assert(df5.count() == 1) + assert(df5.inputFiles.length == 5) } } } From c057d0c027199b43a24df853c26b755aeaf07c94 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 5 Nov 2016 11:22:56 +0800 Subject: [PATCH 6/7] address comments --- .../apache/spark/sql/DataFrameWriter.scala | 10 +++++----- .../spark/sql/execution/SparkSqlParser.scala | 4 ++-- .../spark/sql/execution/SparkStrategies.scala | 6 ++++-- .../spark/sql/execution/command/ddl.scala | 4 +++- .../sql/execution/datasources/rules.scala | 5 +++-- .../spark/sql/hive/HiveExternalCatalog.scala | 19 ++++++++++++++----- 6 files changed, 31 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index f95362e29228..e0c89811ddbf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -24,10 +24,10 @@ import scala.collection.JavaConverters._ import org.apache.spark.annotation.InterfaceStability import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions, Union} -import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand -import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation} +import 
org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions} +import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, DDLUtils} +import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation} import org.apache.spark.sql.types.StructType /** @@ -359,7 +359,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } private def saveAsTable(tableIdent: TableIdentifier): Unit = { - if (source.toLowerCase == "hive") { + if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) { throw new AnalysisException("Cannot create hive serde table with saveAsTable API") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 634ffde3543c..b8be3d17ba44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -331,7 +331,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty) val provider = ctx.tableProvider.qualifiedName.getText - if (provider.toLowerCase == "hive") { + if (provider.toLowerCase == DDLUtils.HIVE_PROVIDER) { throw new AnalysisException("Cannot create hive serde table with CREATE TABLE USING") } val schema = Option(ctx.colTypeList()).map(createSchema) @@ -1034,7 +1034,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { tableType = tableType, storage = storage, schema = schema, - provider = Some("hive"), + provider = Some(DDLUtils.HIVE_PROVIDER), partitionColumnNames = partitionCols.map(_.name), properties = properties, comment = comment) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 5412aca95dcf..190fdd84343e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -415,7 +415,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object DDLStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == "hive" => + case CreateTable(tableDesc, mode, None) + if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER => val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore) ExecutedCommandExec(cmd) :: Nil @@ -427,7 +428,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // CREATE TABLE ... AS SELECT ... 
for hive serde table is handled in hive module, by rule // `CreateTables` - case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get != "hive" => + case CreateTable(tableDesc, mode, Some(query)) + if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER => val cmd = CreateDataSourceTableAsSelectCommand( tableDesc, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index b4d3ca1f3707..8500ab460a1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -687,8 +687,10 @@ case class AlterTableSetLocationCommand( object DDLUtils { + val HIVE_PROVIDER = "hive" + def isDatasourceTable(table: CatalogTable): Boolean = { - table.provider.isDefined && table.provider.get != "hive" + table.provider.isDefined && table.provider.get != HIVE_PROVIDER } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 4647b11af4df..5ba44ff9f5d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrd import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation} import org.apache.spark.sql.types.{AtomicType, StructType} @@ -127,7 +128,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl checkDuplication(normalizedPartitionCols, "partition") if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) { - if (tableDesc.provider.get == "hive") { + if (tableDesc.provider.get == DDLUtils.HIVE_PROVIDER) { // When we hit this branch, it means users didn't specify schema for the table to be // created, as we always include partition columns in table schema for hive serde tables. 
// The real schema will be inferred at hive metastore by hive serde, plus the given @@ -292,7 +293,7 @@ object HiveOnlyCheck extends (LogicalPlan => Unit) { def apply(plan: LogicalPlan): Unit = { plan.foreach { case CreateTable(tableDesc, _, Some(_)) - if tableDesc.provider.get == "hive" => + if tableDesc.provider.get == DDLUtils.HIVE_PROVIDER => throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT") case _ => // OK diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index ac1ea7a356e8..d6c0baaea0d8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -196,7 +196,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat if (tableDefinition.tableType == VIEW) { client.createTable(tableDefinition, ignoreIfExists) - } else if (tableDefinition.provider.get == "hive") { + } else if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) { // Here we follow data source tables and put table metadata like provider, schema, etc. in // table properties, so that we can work around the Hive metastore issue about not case // preserving and make Hive serde table support mixed-case column names. @@ -468,6 +468,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat private def updateLocationInStorageProps( table: CatalogTable, newPath: Option[String]): CatalogStorageFormat = { + // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable, + // while `CatalogTable` should be serializable. val propsWithoutPath = table.storage.properties.filter { case (k, v) => k.toLowerCase != "path" } @@ -508,7 +510,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } else { val oldTableDef = getRawTable(db, withStatsProps.identifier.table) - val newStorage = if (tableDefinition.provider.get == "hive") { + val newStorage = if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) { tableDefinition.storage } else { // We can't alter the table storage of data source table directly for 2 reasons: @@ -598,10 +600,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat getProviderFromTableProperties(table) match { // No provider in table properties, which means this table is created by Spark prior to 2.1, // or is created at Hive side. - case None => table.copy(provider = Some("hive"), tracksPartitionsInCatalog = true) + case None => + table.copy(provider = Some(DDLUtils.HIVE_PROVIDER), tracksPartitionsInCatalog = true) // This is a Hive serde table created by Spark 2.1 or higher versions. - case Some("hive") => restoreHiveSerdeTable(table) + case Some(DDLUtils.HIVE_PROVIDER) => restoreHiveSerdeTable(table) // This is a regular data source table. 
case Some(provider) => restoreDataSourceTable(table, provider) @@ -631,7 +634,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = { - val hiveTable = table.copy(provider = Some("hive"), tracksPartitionsInCatalog = true) + val hiveTable = table.copy( + provider = Some(DDLUtils.HIVE_PROVIDER), + tracksPartitionsInCatalog = true) val schemaFromTableProps = getSchemaFromTableProperties(table) if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, table.schema)) { @@ -644,6 +649,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // schema we read back is different(ignore case and nullability) from the one in table // properties which was written when creating table, we should respect the table schema // from hive. + logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " + + "different from the schema when this table was created by Spark SQL" + + s"(${schemaFromTableProps.simpleString}). We have to trust the table schema from Hive " + + "metastore which is not case preserving.") hiveTable } } From 3bd9362062b41ad64146cd24936501f4dfd65b68 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 5 Nov 2016 12:53:41 +0800 Subject: [PATCH 7/7] minor --- .../scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index d6c0baaea0d8..b537061d0d22 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -651,8 +651,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // from hive. logWarning(s"The table schema given by Hive metastore(${table.schema.simpleString}) is " + "different from the schema when this table was created by Spark SQL" + - s"(${schemaFromTableProps.simpleString}). We have to trust the table schema from Hive " + - "metastore which is not case preserving.") + s"(${schemaFromTableProps.simpleString}). We have to fall back to the table schema from " + + "Hive metastore which is not case preserving.") hiveTable } }
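
A minimal standalone sketch of the comparison semantics introduced by DataType.equalsIgnoreCaseAndNullability and relied on by restoreHiveSerdeTable in the patches above. The names SchemaCompatSketch and looselyEqual are illustrative only; looselyEqual re-implements the same recursion so the snippet compiles against spark-catalyst without touching the private[sql] helper, and is not part of the patch itself.

import org.apache.spark.sql.types._

object SchemaCompatSketch {
  // Re-implementation of the recursion added to DataType in this patch:
  // ignore nullability of ArrayType/MapType/StructType and the case of struct field names.
  def looselyEqual(from: DataType, to: DataType): Boolean = (from, to) match {
    case (ArrayType(fromElement, _), ArrayType(toElement, _)) =>
      looselyEqual(fromElement, toElement)
    case (MapType(fromKey, fromValue, _), MapType(toKey, toValue, _)) =>
      looselyEqual(fromKey, toKey) && looselyEqual(fromValue, toValue)
    case (StructType(fromFields), StructType(toFields)) =>
      fromFields.length == toFields.length &&
        fromFields.zip(toFields).forall { case (l, r) =>
          l.name.equalsIgnoreCase(r.name) && looselyEqual(l.dataType, r.dataType)
        }
    case (fromDataType, toDataType) => fromDataType == toDataType
  }

  def main(args: Array[String]): Unit = {
    // Schema as written into table properties by Spark: case-preserving, nullability retained.
    val fromTableProps = new StructType()
      .add("HelLo", IntegerType, nullable = false)
      .add("WoRLd", IntegerType, nullable = true)
    // Schema as handed back by the Hive metastore: lower-cased names, all columns nullable.
    val fromHiveMetastore = new StructType()
      .add("hello", IntegerType, nullable = true)
      .add("world", IntegerType, nullable = true)

    // true: only case and nullability differ, so restoreHiveSerdeTable keeps the schema
    // stored in table properties (mixed-case names, original nullability).
    println(looselyEqual(fromTableProps, fromHiveMetastore))

    // false: a genuine difference (e.g. Hive serde schema inference produced another type),
    // so restoreHiveSerdeTable falls back to the schema reported by Hive and logs a warning.
    val inferredByHive = new StructType().add("hello", StringType, nullable = true)
    println(looselyEqual(fromTableProps, inferredByHive))
  }
}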
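
A second sketch, using plain Map[String, String] storage properties instead of Spark's CatalogStorageFormat, of how the patch handles the data source "path" option: the table location is kept as a case-insensitive "path" storage property when persisted, surfaced as the table location and stripped from the properties when metadata is read back, and rewritten on rename of a managed table (compare updateLocationInStorageProps, getLocationFromStorageProps, restoreDataSourceTable and renameTable above). The helper names below are illustrative, not Spark APIs.

object PathOptionSketch {
  // Drop any existing "path" entry case-insensitively (mirroring the CaseInsensitiveMap and
  // k.toLowerCase != "path" checks in the patch) and optionally record a new one.
  def updatePath(props: Map[String, String], newPath: Option[String]): Map[String, String] = {
    val withoutPath = props.filter { case (k, _) => k.toLowerCase != "path" }
    withoutPath ++ newPath.map("path" -> _)
  }

  def locationFrom(props: Map[String, String]): Option[String] =
    props.collectFirst { case (k, v) if k.toLowerCase == "path" => v }

  def main(args: Array[String]): Unit = {
    // As persisted for a data source table: the location lives in the storage properties.
    val persisted = Map("serialization.format" -> "1", "Path" -> "/user/hive/warehouse/tbl")

    // What getTable exposes: location populated from the path option, path option removed.
    val locationUri = locationFrom(persisted)                 // Some(/user/hive/warehouse/tbl)
    val exposedProps = updatePath(persisted, newPath = None)  // path option stripped

    // What renameTable does for a managed table that already has a path option:
    // point it at the new default table location.
    val renamed = updatePath(persisted, Some("/user/hive/warehouse/tbl2"))

    println(locationUri)
    println(exposedProps)
    println(renamed)
  }
}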