From aae3abd673adc7ff939d842e49d566fa722403a3 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 2 Aug 2017 14:47:34 -0700 Subject: [PATCH 01/12] [SPARK-21617][SQL] Store correct metadata in Hive for altered DS table. This change fixes two issues: - when loading table metadata from Hive, restore the "provider" field of CatalogTable so DS tables can be identified. - when altering a DS table in the Hive metastore, make sure to not alter the table's schema, since the DS table's schema is stored as a table property in those cases. Also added a new unit test for this issue which fails without this change. --- .../sql/execution/command/DDLSuite.scala | 15 ++---- .../spark/sql/hive/HiveExternalCatalog.scala | 13 ++++- .../sql/hive/client/HiveClientImpl.scala | 7 ++- .../sql/hive/execution/HiveDDLSuite.scala | 54 +++++++++++++++++-- 4 files changed, 69 insertions(+), 20 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 5c0a6aa724bf0..2cab552dd6963 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2356,18 +2356,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { }.getMessage assert(e.contains("Found duplicate column(s)")) } else { - if (isUsingHiveMetastore) { - // hive catalog will still complains that c1 is duplicate column name because hive - // identifiers are case insensitive. - val e = intercept[AnalysisException] { - sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") - }.getMessage - assert(e.contains("HiveException")) - } else { - sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") - assert(spark.table("t1").schema - .equals(new StructType().add("c1", IntegerType).add("C1", StringType))) - } + sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") + assert(spark.table("t1").schema + .equals(new StructType().add("c1", IntegerType).add("C1", StringType))) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 70d7dd23d908a..766841a0c3d06 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -616,15 +616,24 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // Add table metadata such as table schema, partition columns, etc. to table properties. val updatedTable = withNewSchema.copy( properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema)) + + // If it's a data source table, make sure the original schema is left unchanged; the + // actual schema is recorded as a table property. + val tableToStore = if (DDLUtils.isDatasourceTable(updatedTable)) { + updatedTable.copy(schema = rawTable.schema) + } else { + updatedTable + } + try { - client.alterTable(updatedTable) + client.alterTable(tableToStore) } catch { case NonFatal(e) => val warningMessage = s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " + "compatible way. Updating Hive metastore in Spark SQL specific format." 
logWarning(warningMessage, e) - client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema)) + client.alterTable(updatedTable.copy(schema = tableToStore.partitionSchema)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index bde9a81c65a4e..c71b9f46eb3e1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.client.HiveClientImpl._ import org.apache.spark.sql.types._ import org.apache.spark.util.{CircularBuffer, Utils} @@ -413,7 +414,10 @@ private[hive] class HiveClientImpl( unsupportedFeatures += "partitioned view" } - val properties = Option(h.getParameters).map(_.asScala.toMap).orNull + val properties = Option(h.getParameters).map(_.asScala.toMap).getOrElse(Map()) + + val provider = properties.get(HiveExternalCatalog.DATASOURCE_PROVIDER) + .orElse(Some(DDLUtils.HIVE_PROVIDER)) // Hive-generated Statistics are also recorded in ignoredProperties val ignoredProperties = scala.collection.mutable.Map.empty[String, String] @@ -468,6 +472,7 @@ private[hive] class HiveClientImpl( throw new AnalysisException("Hive index table is not supported.") }, schema = schema, + provider = provider, partitionColumnNames = partCols.map(_.name), // If the table is written by Spark, we will put bucketing information in table properties, // and will always overwrite the bucket spec in hive metastore by the bucketing information diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 5b62e37311d88..2cef3abfa6021 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -23,19 +23,20 @@ import java.net.URI import scala.language.existentials import org.apache.hadoop.fs.Path -import org.scalatest.BeforeAndAfterEach +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} +import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} -import org.apache.spark.sql.hive.HiveExternalCatalog +import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} import org.apache.spark.sql.hive.orc.OrcFileOperator import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} -import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION +import org.apache.spark.sql.internal.StaticSQLConf._ import 
org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -1998,3 +1999,46 @@ class HiveDDLSuite } } } + +/** + * A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently + * from the built-in ones. + */ +class HiveDDLSuite_2_1 extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll { + + private val spark = { + val warehouse = Utils.createTempDir() + val metastore = Utils.createTempDir() + metastore.delete() + SparkSession.builder() + .config(SparkLauncher.SPARK_MASTER, "local") + .config(WAREHOUSE_PATH.key, warehouse.toURI().toString()) + .config(CATALOG_IMPLEMENTATION.key, "hive") + .config(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1") + .config(HiveUtils.HIVE_METASTORE_JARS.key, "maven") + .config("spark.hadoop.javax.jdo.option.ConnectionURL", + s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true") + // These options are needed since the defaults in Hive 2.1 cause exceptions with an + // empty metastore db. + .config("spark.hadoop.datanucleus.schema.autoCreateAll", "true") + .config("spark.hadoop.hive.metastore.schema.verification", "false") + .getOrCreate() + } + + override def afterEach: Unit = { + spark.sessionState.catalog.reset() + } + + override def afterAll(): Unit = { + spark.close() + } + + test("SPARK-21617: ALTER TABLE..ADD COLUMNS for DataSource tables") { + spark.sql("CREATE TABLE t1 (c1 int) USING json") + spark.sql("ALTER TABLE t1 ADD COLUMNS (c2 int)") + + val df = spark.table("t1") + assert(df.schema.fieldNames === Array("c1", "c2")) + } + +} From 2350b105a599dde849e44bde50aa6d13812e4f83 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 4 Aug 2017 15:49:31 -0700 Subject: [PATCH 02/12] Fix 2.1 DDL suite to not use SparkSession. --- .../sql/hive/execution/HiveDDLSuite.scala | 57 ++++++++++++------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 2cef3abfa6021..688fda0f0c318 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -22,10 +22,11 @@ import java.net.URI import scala.language.existentials +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.launcher.SparkLauncher import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException} @@ -39,6 +40,7 @@ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ +import org.apache.spark.tags.ExtendedHiveTest import org.apache.spark.util.Utils // TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite @@ -2004,41 +2006,56 @@ class HiveDDLSuite * A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently * from the built-in ones. 
*/ -class HiveDDLSuite_2_1 extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll { +@ExtendedHiveTest +class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with BeforeAndAfterEach + with BeforeAndAfterAll { - private val spark = { + // Create a custom HiveExternalCatalog instance with the desired configuration. We cannot + // use SparkSession here since there's already an active on managed by the TestHive object. + private var catalog = { val warehouse = Utils.createTempDir() val metastore = Utils.createTempDir() metastore.delete() - SparkSession.builder() - .config(SparkLauncher.SPARK_MASTER, "local") - .config(WAREHOUSE_PATH.key, warehouse.toURI().toString()) - .config(CATALOG_IMPLEMENTATION.key, "hive") - .config(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1") - .config(HiveUtils.HIVE_METASTORE_JARS.key, "maven") - .config("spark.hadoop.javax.jdo.option.ConnectionURL", - s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true") - // These options are needed since the defaults in Hive 2.1 cause exceptions with an - // empty metastore db. - .config("spark.hadoop.datanucleus.schema.autoCreateAll", "true") - .config("spark.hadoop.hive.metastore.schema.verification", "false") - .getOrCreate() + val sparkConf = new SparkConf() + .set(SparkLauncher.SPARK_MASTER, "local") + .set(WAREHOUSE_PATH.key, warehouse.toURI().toString()) + .set(CATALOG_IMPLEMENTATION.key, "hive") + .set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1") + .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven") + + val hadoopConf = new Configuration() + hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString()) + hadoopConf.set("javax.jdo.option.ConnectionURL", + s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true") + // These options are needed since the defaults in Hive 2.1 cause exceptions with an + // empty metastore db. + hadoopConf.set("datanucleus.schema.autoCreateAll", "true") + hadoopConf.set("hive.metastore.schema.verification", "false") + + new HiveExternalCatalog(sparkConf, hadoopConf) } override def afterEach: Unit = { + catalog.listTables("default").foreach { t => + catalog.dropTable("default", t, true, false) + } spark.sessionState.catalog.reset() } override def afterAll(): Unit = { - spark.close() + catalog = null } test("SPARK-21617: ALTER TABLE..ADD COLUMNS for DataSource tables") { spark.sql("CREATE TABLE t1 (c1 int) USING json") - spark.sql("ALTER TABLE t1 ADD COLUMNS (c2 int)") + val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", "t1") + catalog.createTable(oldTable, true) + + val newSchema = StructType(oldTable.schema.fields ++ Array(StructField("c2", IntegerType))) + catalog.alterTableSchema("default", "t1", newSchema) - val df = spark.table("t1") - assert(df.schema.fieldNames === Array("c1", "c2")) + val updatedTable = catalog.getTable("default", "t1") + assert(updatedTable.schema.fieldNames === Array("c1", "c2")) } } From 7ccf4743024a8a447a4b05369f6ebf237cf88c4f Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 4 Aug 2017 15:57:44 -0700 Subject: [PATCH 03/12] Proper fix. HiveExternalCatalog.alterTableSchema takes a shortcut by modifying the raw Hive table metadata instead of the full Spark view; that means it needs to be aware of whether the table is Hive-compatible or not. For compatible tables, the current "replace the schema" code is the correct path, except that an exception in that path should result in an error, and not in retrying in a different way. 
For non-compatible tables, Spark should just update the table properties, and leave the schema stored in the raw table untouched. Because Spark doesn't explicitly store metadata about whether a table is Hive-compatible or not, a new property was added just to make that explicit. The code tries to detect old DS tables that don't have the property and do the right thing. These changes also uncovered a problem with the way case-sensitive DS tables were being saved to the Hive metastore; the metastore is case-insensitive, and the code was treating these tables as Hive-compatible if the data source had a Hive counterpart (e.g. for parquet). In this scenario, the schema could be corrupted when being updated from Spark if conflicting columns existed ignoring case. The change fixes this by making case-sensitive DS-tables not Hive-compatible. --- .../spark/sql/hive/HiveExternalCatalog.scala | 87 ++++++++++++++----- .../sql/hive/client/HiveClientImpl.scala | 14 +-- 2 files changed, 71 insertions(+), 30 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 766841a0c3d06..e8f3963054406 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -32,7 +32,8 @@ import org.apache.thrift.TException import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging -import org.apache.spark.sql.AnalysisException +import org.apache.spark.internal.config.ConfigEntry +import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog._ @@ -43,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal.HiveSerDe +import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.types.{DataType, StructType} @@ -257,6 +258,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } } + /** + * Retrieve a configuration value for the current active session, if any. + */ + private def currentSessionConf[T](entry: ConfigEntry[T]): T = { + SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession).map { session => + session.conf.get(entry) + }.getOrElse { + // If there's no active session, try to read from the SparkConf object instead. Normally + // there should be an active session, but unit tests invoke methods on the catalog directly, + // so that might not be true in some cases. + conf.get(entry) + } + } + private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = { // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`. val provider = table.provider.get @@ -288,6 +303,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // bucket specification to empty. Note that partition columns are retained, so that we can // call partition-related Hive API later. 
def newSparkSQLSpecificMetastoreTable(): CatalogTable = { + val hiveCompatible = Map(DATASOURCE_HIVE_COMPATIBLE -> "false") table.copy( // Hive only allows directory paths as location URIs while Spark SQL data source tables // also allow file paths. For non-hive-compatible format, we should not set location URI @@ -297,11 +313,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat properties = storagePropsWithLocation), schema = table.partitionSchema, bucketSpec = None, - properties = table.properties ++ tableProperties) + properties = table.properties ++ tableProperties ++ hiveCompatible) } // converts the table metadata to Hive compatible format, i.e. set the serde information. def newHiveCompatibleMetastoreTable(serde: HiveSerDe): CatalogTable = { + val hiveCompatible = Map(DATASOURCE_HIVE_COMPATIBLE -> "true") val location = if (table.tableType == EXTERNAL) { // When we hit this branch, we are saving an external data source table with hive // compatible format, which means the data source is file-based and must have a `path`. @@ -320,7 +337,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat serde = serde.serde, properties = storagePropsWithLocation ), - properties = table.properties ++ tableProperties) + properties = table.properties ++ tableProperties ++ hiveCompatible) } val qualifiedTableName = table.identifier.quotedString @@ -342,6 +359,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " (None, message) + case _ if currentSessionConf(SQLConf.CASE_SENSITIVE) => + val message = + s"Persisting case sensitive data source table $qualifiedTableName into " + + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " + (None, message) + case Some(serde) => val message = s"Persisting file based data source table $qualifiedTableName into " + @@ -386,6 +409,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat * can be used as table properties later. */ private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = { + tableMetaToTableProps(table, table.schema) + } + + private def tableMetaToTableProps( + table: CatalogTable, + schema: StructType): mutable.Map[String, String] = { val partitionColumns = table.partitionColumnNames val bucketSpec = table.bucketSpec @@ -394,7 +423,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // property. In this case, we split the JSON string and store each part as a separate table // property. val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD) - val schemaJsonString = table.schema.json + val schemaJsonString = schema.json // Split the JSON string. val parts = schemaJsonString.grouped(threshold).toSeq properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString) @@ -611,30 +640,39 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient { requireTableExists(db, table) val rawTable = getRawTable(db, table) - val withNewSchema = rawTable.copy(schema = schema) - verifyColumnNames(withNewSchema) // Add table metadata such as table schema, partition columns, etc. to table properties. 
- val updatedTable = withNewSchema.copy( - properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema)) - - // If it's a data source table, make sure the original schema is left unchanged; the - // actual schema is recorded as a table property. - val tableToStore = if (DDLUtils.isDatasourceTable(updatedTable)) { - updatedTable.copy(schema = rawTable.schema) + val updatedProperties = rawTable.properties ++ tableMetaToTableProps(rawTable, schema) + + // Detect whether this is a Hive-compatible table. + val provider = rawTable.properties.get(DATASOURCE_PROVIDER) + val isHiveCompatible = if (provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)) { + rawTable.properties.get(DATASOURCE_HIVE_COMPATIBLE) match { + case Some(value) => + value.toBoolean + case _ => + // If the property is not set, the table may have been created by an old version + // of Spark. Those versions set a "path" property in the table's storage descriptor + // for non-Hive-compatible tables, so use that to detect compatibility. + rawTable.storage.properties.get("path").isDefined + } } else { - updatedTable + // All non-DS tables are treated as regular Hive tables. + true } - try { - client.alterTable(tableToStore) - } catch { - case NonFatal(e) => - val warningMessage = - s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " + - "compatible way. Updating Hive metastore in Spark SQL specific format." - logWarning(warningMessage, e) - client.alterTable(updatedTable.copy(schema = tableToStore.partitionSchema)) + val updatedTable = if (isHiveCompatible) { + val _updated = rawTable.copy(properties = updatedProperties, schema = schema) + verifyColumnNames(_updated) + _updated + } else { + // If the table is not Hive-compatible, the schema of the table should not be overwritten with + // the updated schema. The previous value stored in the metastore should be preserved; that + // will be either the table's original partition schema, or a placeholder schema inserted by + // the Hive client wrapper if the partition schema was empty. + rawTable.copy(properties = updatedProperties) } + + client.alterTable(updatedTable) } override def alterTableStats( @@ -1202,6 +1240,7 @@ object HiveExternalCatalog { val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol." val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol." val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol." + val DATASOURCE_HIVE_COMPATIBLE = SPARK_SQL_PREFIX + "hive.compatibility" val STATISTICS_PREFIX = SPARK_SQL_PREFIX + "statistics." 
val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize" diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index c71b9f46eb3e1..dfddb3b112257 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -414,10 +414,7 @@ private[hive] class HiveClientImpl( unsupportedFeatures += "partitioned view" } - val properties = Option(h.getParameters).map(_.asScala.toMap).getOrElse(Map()) - - val provider = properties.get(HiveExternalCatalog.DATASOURCE_PROVIDER) - .orElse(Some(DDLUtils.HIVE_PROVIDER)) + val properties = Option(h.getParameters).map(_.asScala.toMap).orNull // Hive-generated Statistics are also recorded in ignoredProperties val ignoredProperties = scala.collection.mutable.Map.empty[String, String] @@ -472,7 +469,6 @@ private[hive] class HiveClientImpl( throw new AnalysisException("Hive index table is not supported.") }, schema = schema, - provider = provider, partitionColumnNames = partCols.map(_.name), // If the table is written by Spark, we will put bucketing information in table properties, // and will always overwrite the bucket spec in hive metastore by the bucketing information @@ -913,7 +909,13 @@ private[hive] object HiveClientImpl { } // after SPARK-19279, it is not allowed to create a hive table with an empty schema, // so here we should not add a default col schema - if (schema.isEmpty && DDLUtils.isDatasourceTable(table)) { + // + // Because HiveExternalCatalog sometimes writes back "raw" tables that have not been + // completely translated to Spark's view, the provider information needs to be looked + // up in two places. + val provider = table.provider.orElse( + table.properties.get(HiveExternalCatalog.DATASOURCE_PROVIDER)) + if (schema.isEmpty && provider != Some(DDLUtils.HIVE_PROVIDER)) { // This is a hack to preserve existing behavior. Before Spark 2.0, we do not // set a default serde here (this was done in Hive), and so if the user provides // an empty schema Hive would automatically populate the schema with a single From 40ebc966ff8e9ad742f2cdbf631a289b8388560a Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 7 Aug 2017 11:31:11 -0700 Subject: [PATCH 04/12] Check for compatibility using serde, not path. --- .../org/apache/spark/sql/hive/HiveExternalCatalog.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index e8f3963054406..aa241ad55b1a2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -651,9 +651,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat value.toBoolean case _ => // If the property is not set, the table may have been created by an old version - // of Spark. Those versions set a "path" property in the table's storage descriptor - // for non-Hive-compatible tables, so use that to detect compatibility. - rawTable.storage.properties.get("path").isDefined + // of Spark. Detect Hive compatibility by comparing the table's serde with the + // serde for the table's data source. If they match, the table is Hive-compatible. 
+ // If they don't, they're not, because of some other table property that made it + // not initially Hive-compatible. + HiveSerDe.sourceToSerDe(provider.get) == rawTable.storage.serde } } else { // All non-DS tables are treated as regular Hive tables. From 2f57a3c1db2b4f8e58456b48bbc62ef01fa14633 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 8 Aug 2017 11:00:13 -0700 Subject: [PATCH 05/12] equals -> == --- .../org/apache/spark/sql/execution/command/DDLSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 2cab552dd6963..b890ad0e350bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2357,8 +2357,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { assert(e.contains("Found duplicate column(s)")) } else { sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") - assert(spark.table("t1").schema - .equals(new StructType().add("c1", IntegerType).add("C1", StringType))) + assert(spark.table("t1").schema == + new StructType().add("c1", IntegerType).add("C1", StringType)) } } } From 0b272094c8a490e16066e85fc48ad59c4ccf0468 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 8 Aug 2017 13:27:56 -0700 Subject: [PATCH 06/12] Break out and expand Hive_2_1_DDLSuite. --- .../spark/sql/hive/HiveExternalCatalog.scala | 2 +- .../sql/hive/execution/HiveDDLSuite.scala | 69 +-------- .../hive/execution/Hive_2_1_DDLSuite.scala | 131 ++++++++++++++++++ 3 files changed, 136 insertions(+), 66 deletions(-) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index aa241ad55b1a2..39afa1af590b1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -115,7 +115,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat * should interpret these special data source properties and restore the original table metadata * before returning it. 
*/ - private def getRawTable(db: String, table: String): CatalogTable = withClient { + private[hive] def getRawTable(db: String, table: String): CatalogTable = withClient { client.getTable(db, table) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 688fda0f0c318..4474f834d4e14 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -22,25 +22,22 @@ import java.net.URI import scala.language.existentials -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.BeforeAndAfterEach -import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} -import org.apache.spark.launcher.SparkLauncher -import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode, SparkSession} +import org.apache.spark.SparkException +import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} -import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} +import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.orc.OrcFileOperator import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ -import org.apache.spark.tags.ExtendedHiveTest import org.apache.spark.util.Utils // TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite @@ -2001,61 +1998,3 @@ class HiveDDLSuite } } } - -/** - * A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently - * from the built-in ones. - */ -@ExtendedHiveTest -class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with BeforeAndAfterEach - with BeforeAndAfterAll { - - // Create a custom HiveExternalCatalog instance with the desired configuration. We cannot - // use SparkSession here since there's already an active on managed by the TestHive object. - private var catalog = { - val warehouse = Utils.createTempDir() - val metastore = Utils.createTempDir() - metastore.delete() - val sparkConf = new SparkConf() - .set(SparkLauncher.SPARK_MASTER, "local") - .set(WAREHOUSE_PATH.key, warehouse.toURI().toString()) - .set(CATALOG_IMPLEMENTATION.key, "hive") - .set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1") - .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven") - - val hadoopConf = new Configuration() - hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString()) - hadoopConf.set("javax.jdo.option.ConnectionURL", - s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true") - // These options are needed since the defaults in Hive 2.1 cause exceptions with an - // empty metastore db. 
- hadoopConf.set("datanucleus.schema.autoCreateAll", "true") - hadoopConf.set("hive.metastore.schema.verification", "false") - - new HiveExternalCatalog(sparkConf, hadoopConf) - } - - override def afterEach: Unit = { - catalog.listTables("default").foreach { t => - catalog.dropTable("default", t, true, false) - } - spark.sessionState.catalog.reset() - } - - override def afterAll(): Unit = { - catalog = null - } - - test("SPARK-21617: ALTER TABLE..ADD COLUMNS for DataSource tables") { - spark.sql("CREATE TABLE t1 (c1 int) USING json") - val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", "t1") - catalog.createTable(oldTable, true) - - val newSchema = StructType(oldTable.schema.fields ++ Array(StructField("c2", IntegerType))) - catalog.alterTableSchema("default", "t1", newSchema) - - val updatedTable = catalog.getTable("default", "t1") - assert(updatedTable.schema.fieldNames === Array("c1", "c2")) - } - -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala new file mode 100644 index 0000000000000..177b6dcec3bc9 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import scala.language.existentials + +import org.apache.hadoop.conf.Configuration +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.sql.types._ +import org.apache.spark.tags.ExtendedHiveTest +import org.apache.spark.util.Utils + +/** + * A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently + * from the built-in ones. + */ +@ExtendedHiveTest +class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with BeforeAndAfterEach + with BeforeAndAfterAll { + + // Create a custom HiveExternalCatalog instance with the desired configuration. We cannot + // use SparkSession here since there's already an active on managed by the TestHive object. 
+ private var catalog = { + val warehouse = Utils.createTempDir() + val metastore = Utils.createTempDir() + metastore.delete() + val sparkConf = new SparkConf() + .set(SparkLauncher.SPARK_MASTER, "local") + .set(WAREHOUSE_PATH.key, warehouse.toURI().toString()) + .set(CATALOG_IMPLEMENTATION.key, "hive") + .set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1") + .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven") + + val hadoopConf = new Configuration() + hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString()) + hadoopConf.set("javax.jdo.option.ConnectionURL", + s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true") + // These options are needed since the defaults in Hive 2.1 cause exceptions with an + // empty metastore db. + hadoopConf.set("datanucleus.schema.autoCreateAll", "true") + hadoopConf.set("hive.metastore.schema.verification", "false") + + new HiveExternalCatalog(sparkConf, hadoopConf) + } + + override def afterEach: Unit = { + catalog.listTables("default").foreach { t => + catalog.dropTable("default", t, true, false) + } + spark.sessionState.catalog.reset() + } + + override def afterAll(): Unit = { + catalog = null + } + + test("SPARK-21617: ALTER TABLE for non-compatible DataSource tables") { + testAlterTable( + "t1", + "CREATE TABLE t1 (c1 int) USING json", + StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))), + hiveCompatible = false) + } + + test("SPARK-21617: ALTER TABLE for Hive-compatible DataSource tables") { + testAlterTable( + "t1", + "CREATE TABLE t1 (c1 int) USING parquet", + StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType)))) + } + + test("SPARK-21617: ALTER TABLE for Hive tables") { + testAlterTable( + "t1", + "CREATE TABLE t1 (c1 int) STORED AS parquet", + StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType)))) + } + + test("SPARK-21617: ALTER TABLE with incompatible schema on Hive-compatible table") { + val exception = intercept[AnalysisException] { + testAlterTable( + "t1", + "CREATE TABLE t1 (c1 string) USING parquet", + StructType(Array(StructField("c2", IntegerType)))) + } + assert(exception.getMessage().contains("types incompatible with the existing columns")) + } + + private def testAlterTable( + tableName: String, + createTableStmt: String, + updatedSchema: StructType, + hiveCompatible: Boolean = true): Unit = { + spark.sql(createTableStmt) + val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", tableName) + catalog.createTable(oldTable, true) + catalog.alterTableSchema("default", tableName, updatedSchema) + + val updatedTable = catalog.getTable("default", tableName) + assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames) + + val rawTable = catalog.getRawTable("default", tableName) + val compatibility = rawTable.properties.get(HiveExternalCatalog.DATASOURCE_HIVE_COMPATIBLE) + .map(_.toBoolean).getOrElse(true) + assert(hiveCompatible === compatibility) + } + +} From 6824e358d231b7c56031c946d983cde0b89fd574 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 8 Aug 2017 16:43:36 -0700 Subject: [PATCH 07/12] Feedback. 
--- .../scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 2 +- .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 39afa1af590b1..0ae39344d02d8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -1242,7 +1242,7 @@ object HiveExternalCatalog { val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol." val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol." val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol." - val DATASOURCE_HIVE_COMPATIBLE = SPARK_SQL_PREFIX + "hive.compatibility" + val DATASOURCE_HIVE_COMPATIBLE = DATASOURCE_PREFIX + "hive.compatibility" val STATISTICS_PREFIX = SPARK_SQL_PREFIX + "statistics." val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 4474f834d4e14..5b62e37311d88 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.orc.OrcFileOperator import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} -import org.apache.spark.sql.internal.StaticSQLConf._ +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ import org.apache.spark.util.Utils From 7b777edccddf91f2dfb99b30855265284188e00b Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 8 Aug 2017 18:25:57 -0700 Subject: [PATCH 08/12] Move `isHiveCompatible` to its own method. --- .../spark/sql/hive/HiveExternalCatalog.scala | 42 ++++++++++--------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 0ae39344d02d8..24a6ff85f069c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -643,26 +643,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // Add table metadata such as table schema, partition columns, etc. to table properties. val updatedProperties = rawTable.properties ++ tableMetaToTableProps(rawTable, schema) - // Detect whether this is a Hive-compatible table. - val provider = rawTable.properties.get(DATASOURCE_PROVIDER) - val isHiveCompatible = if (provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)) { - rawTable.properties.get(DATASOURCE_HIVE_COMPATIBLE) match { - case Some(value) => - value.toBoolean - case _ => - // If the property is not set, the table may have been created by an old version - // of Spark. Detect Hive compatibility by comparing the table's serde with the - // serde for the table's data source. If they match, the table is Hive-compatible. 
- // If they don't, they're not, because of some other table property that made it - // not initially Hive-compatible. - HiveSerDe.sourceToSerDe(provider.get) == rawTable.storage.serde - } - } else { - // All non-DS tables are treated as regular Hive tables. - true - } - - val updatedTable = if (isHiveCompatible) { + val updatedTable = if (isHiveCompatible(rawTable)) { val _updated = rawTable.copy(properties = updatedProperties, schema = schema) verifyColumnNames(_updated) _updated @@ -1224,6 +1205,27 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.listFunctions(db, pattern) } + /** Detect whether a table is stored with Hive-compatible metadata. */ + private def isHiveCompatible(table: CatalogTable): Boolean = { + val provider = table.provider.orElse(table.properties.get(DATASOURCE_PROVIDER)) + if (provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)) { + table.properties.get(DATASOURCE_HIVE_COMPATIBLE) match { + case Some(value) => + value.toBoolean + case _ => + // If the property is not set, the table may have been created by an old version + // of Spark. Detect Hive compatibility by comparing the table's serde with the + // serde for the table's data source. If they match, the table is Hive-compatible. + // If they don't, they're not, because of some other table property that made it + // not initially Hive-compatible. + HiveSerDe.sourceToSerDe(provider.get) == table.storage.serde + } + } else { + // All non-DS tables are treated as regular Hive tables. + true + } + } + } object HiveExternalCatalog { From abd6cf119c0b400fda9c96fcb6432b090c2505bf Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 9 Aug 2017 10:35:46 -0700 Subject: [PATCH 09/12] Move comment. --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index dfddb3b112257..c06680658b39d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -907,14 +907,13 @@ private[hive] object HiveClientImpl { val (partCols, schema) = table.schema.map(toHiveColumn).partition { c => table.partitionColumnNames.contains(c.getName) } - // after SPARK-19279, it is not allowed to create a hive table with an empty schema, - // so here we should not add a default col schema - // // Because HiveExternalCatalog sometimes writes back "raw" tables that have not been // completely translated to Spark's view, the provider information needs to be looked // up in two places. val provider = table.provider.orElse( table.properties.get(HiveExternalCatalog.DATASOURCE_PROVIDER)) + // after SPARK-19279, it is not allowed to create a hive table with an empty schema, + // so here we should not add a default col schema if (schema.isEmpty && provider != Some(DDLUtils.HIVE_PROVIDER)) { // This is a hack to preserve existing behavior. Before Spark 2.0, we do not // set a default serde here (this was done in Hive), and so if the user provides From 4a05b55b5c23755cba384cf85a0af2b802b8a9bd Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 15 Aug 2017 09:49:00 -0700 Subject: [PATCH 10/12] Remove special handling of case-sensitive tables. 
--- .../sql/execution/command/DDLSuite.scala | 15 ++++++++--- .../spark/sql/hive/HiveExternalCatalog.scala | 25 ++----------------- 2 files changed, 14 insertions(+), 26 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index b890ad0e350bd..5c0a6aa724bf0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2356,9 +2356,18 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { }.getMessage assert(e.contains("Found duplicate column(s)")) } else { - sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") - assert(spark.table("t1").schema == - new StructType().add("c1", IntegerType).add("C1", StringType)) + if (isUsingHiveMetastore) { + // hive catalog will still complains that c1 is duplicate column name because hive + // identifiers are case insensitive. + val e = intercept[AnalysisException] { + sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") + }.getMessage + assert(e.contains("HiveException")) + } else { + sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") + assert(spark.table("t1").schema + .equals(new StructType().add("c1", IntegerType).add("C1", StringType))) + } } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 24a6ff85f069c..30b98576b483e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -32,8 +32,7 @@ import org.apache.thrift.TException import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.ConfigEntry -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog._ @@ -44,7 +43,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} +import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.types.{DataType, StructType} @@ -258,20 +257,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } } - /** - * Retrieve a configuration value for the current active session, if any. - */ - private def currentSessionConf[T](entry: ConfigEntry[T]): T = { - SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession).map { session => - session.conf.get(entry) - }.getOrElse { - // If there's no active session, try to read from the SparkConf object instead. Normally - // there should be an active session, but unit tests invoke methods on the catalog directly, - // so that might not be true in some cases. - conf.get(entry) - } - } - private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = { // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`. 
val provider = table.provider.get @@ -359,12 +344,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " (None, message) - case _ if currentSessionConf(SQLConf.CASE_SENSITIVE) => - val message = - s"Persisting case sensitive data source table $qualifiedTableName into " + - "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " - (None, message) - case Some(serde) => val message = s"Persisting file based data source table $qualifiedTableName into " + From cef66ac783a096cc5c623fa52ac9276321e554f9 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 21 Aug 2017 11:23:17 -0700 Subject: [PATCH 11/12] Only retry writing to metastore for data source tables. This avoids corrupting Hive tables, but allows data source tables to become non-Hive-compatible depending on what the user does. --- .../sql/execution/command/DDLSuite.scala | 15 +--- .../spark/sql/hive/HiveExternalCatalog.scala | 70 +++++++++---------- .../sql/hive/client/HiveClientImpl.scala | 7 +- .../hive/execution/Hive_2_1_DDLSuite.scala | 5 -- 4 files changed, 36 insertions(+), 61 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 9332f773430e7..0c7715678ed6b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2357,18 +2357,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { }.getMessage assert(e.contains("Found duplicate column(s)")) } else { - if (isUsingHiveMetastore) { - // hive catalog will still complains that c1 is duplicate column name because hive - // identifiers are case insensitive. - val e = intercept[AnalysisException] { - sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") - }.getMessage - assert(e.contains("HiveException")) - } else { - sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") - assert(spark.table("t1").schema - .equals(new StructType().add("c1", IntegerType).add("C1", StringType))) - } + sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") + assert(spark.table("t1").schema + .equals(new StructType().add("c1", IntegerType).add("C1", StringType))) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 80bc278bed819..bdbb8bccbc5cd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -288,7 +288,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // bucket specification to empty. Note that partition columns are retained, so that we can // call partition-related Hive API later. def newSparkSQLSpecificMetastoreTable(): CatalogTable = { - val hiveCompatible = Map(DATASOURCE_HIVE_COMPATIBLE -> "false") table.copy( // Hive only allows directory paths as location URIs while Spark SQL data source tables // also allow file paths. 
For non-hive-compatible format, we should not set location URI @@ -298,12 +297,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat properties = storagePropsWithLocation), schema = table.partitionSchema, bucketSpec = None, - properties = table.properties ++ tableProperties ++ hiveCompatible) + properties = table.properties ++ tableProperties) } // converts the table metadata to Hive compatible format, i.e. set the serde information. def newHiveCompatibleMetastoreTable(serde: HiveSerDe): CatalogTable = { - val hiveCompatible = Map(DATASOURCE_HIVE_COMPATIBLE -> "true") val location = if (table.tableType == EXTERNAL) { // When we hit this branch, we are saving an external data source table with hive // compatible format, which means the data source is file-based and must have a `path`. @@ -322,7 +320,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat serde = serde.serde, properties = storagePropsWithLocation ), - properties = table.properties ++ tableProperties ++ hiveCompatible) + properties = table.properties ++ tableProperties) } val qualifiedTableName = table.identifier.quotedString @@ -625,20 +623,28 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val rawTable = getRawTable(db, table) // Add table metadata such as table schema, partition columns, etc. to table properties. val updatedProperties = rawTable.properties ++ tableMetaToTableProps(rawTable, schema) - - val updatedTable = if (isHiveCompatible(rawTable)) { - val _updated = rawTable.copy(properties = updatedProperties, schema = schema) - verifyColumnNames(_updated) - _updated + val withNewSchema = rawTable.copy(properties = updatedProperties, schema = schema) + verifyColumnNames(withNewSchema) + + if (isDatasourceTable(rawTable)) { + // For data source tables, first try to write it with the schema set; if that does not work, + // try again with updated properties and the partition schema. This is a simplified version of + // what createDataSourceTable() does, and may leave the table in a state unreadable by Hive + // (for example, the schema does not match the data source schema, or does not match the + // storage descriptor). + try { + client.alterTable(withNewSchema) + } catch { + case NonFatal(e) => + val warningMessage = + s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " + + "compatible way. Updating Hive metastore in Spark SQL specific format." + logWarning(warningMessage, e) + client.alterTable(withNewSchema.copy(schema = rawTable.partitionSchema)) + } } else { - // If the table is not Hive-compatible, the schema of the table should not be overwritten with - // the updated schema. The previous value stored in the metastore should be preserved; that - // will be either the table's original partition schema, or a placeholder schema inserted by - // the Hive client wrapper if the partition schema was empty. - rawTable.copy(properties = updatedProperties) + client.alterTable(withNewSchema) } - - client.alterTable(updatedTable) } override def alterTableStats( @@ -1260,27 +1266,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.listFunctions(db, pattern) } - /** Detect whether a table is stored with Hive-compatible metadata. 
*/ - private def isHiveCompatible(table: CatalogTable): Boolean = { - val provider = table.provider.orElse(table.properties.get(DATASOURCE_PROVIDER)) - if (provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)) { - table.properties.get(DATASOURCE_HIVE_COMPATIBLE) match { - case Some(value) => - value.toBoolean - case _ => - // If the property is not set, the table may have been created by an old version - // of Spark. Detect Hive compatibility by comparing the table's serde with the - // serde for the table's data source. If they match, the table is Hive-compatible. - // If they don't, they're not, because of some other table property that made it - // not initially Hive-compatible. - HiveSerDe.sourceToSerDe(provider.get) == table.storage.serde - } - } else { - // All non-DS tables are treated as regular Hive tables. - true - } - } - } object HiveExternalCatalog { @@ -1299,7 +1284,6 @@ object HiveExternalCatalog { val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol." val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol." val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol." - val DATASOURCE_HIVE_COMPATIBLE = DATASOURCE_PREFIX + "hive.compatibility" val STATISTICS_PREFIX = SPARK_SQL_PREFIX + "statistics." val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize" @@ -1382,4 +1366,14 @@ object HiveExternalCatalog { getColumnNamesByType(metadata.properties, "sort", "sorting columns")) } } + + /** + * Detects a data source table. This checks both the table provider and the table properties, + * unlike DDLUtils which just checks the former. + */ + private[spark] def isDatasourceTable(table: CatalogTable): Boolean = { + val provider = table.provider.orElse(table.properties.get(DATASOURCE_PROVIDER)) + provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER) + } + } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 64687017cfaa2..7c0b9bf19bf30 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -882,14 +882,9 @@ private[hive] object HiveClientImpl { val (partCols, schema) = table.schema.map(toHiveColumn).partition { c => table.partitionColumnNames.contains(c.getName) } - // Because HiveExternalCatalog sometimes writes back "raw" tables that have not been - // completely translated to Spark's view, the provider information needs to be looked - // up in two places. - val provider = table.provider.orElse( - table.properties.get(HiveExternalCatalog.DATASOURCE_PROVIDER)) // after SPARK-19279, it is not allowed to create a hive table with an empty schema, // so here we should not add a default col schema - if (schema.isEmpty && provider != Some(DDLUtils.HIVE_PROVIDER)) { + if (schema.isEmpty && HiveExternalCatalog.isDatasourceTable(table)) { // This is a hack to preserve existing behavior. 
Before Spark 2.0, we do not // set a default serde here (this was done in Hive), and so if the user provides // an empty schema Hive would automatically populate the schema with a single diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala index 177b6dcec3bc9..5c248b9acd04f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala @@ -121,11 +121,6 @@ class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with Before val updatedTable = catalog.getTable("default", tableName) assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames) - - val rawTable = catalog.getRawTable("default", tableName) - val compatibility = rawTable.properties.get(HiveExternalCatalog.DATASOURCE_HIVE_COMPATIBLE) - .map(_.toBoolean).getOrElse(true) - assert(hiveCompatible === compatibility) } } From c41683a907cfd9dad9b16d462e7d8f6b5f78c200 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 21 Aug 2017 11:28:33 -0700 Subject: [PATCH 12/12] Redo style change. --- .../org/apache/spark/sql/execution/command/DDLSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 0c7715678ed6b..ad6fc20df1f02 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2358,8 +2358,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { assert(e.contains("Found duplicate column(s)")) } else { sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") - assert(spark.table("t1").schema - .equals(new StructType().add("c1", IntegerType).add("C1", StringType))) + assert(spark.table("t1").schema == + new StructType().add("c1", IntegerType).add("C1", StringType)) } } }
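
For reference, a minimal sketch of the end-to-end scenario this series fixes, mirroring the
ALTER TABLE..ADD COLUMNS test added in PATCH 01. It assumes a Spark build with Hive support on
the classpath and a local metastore; in spark-shell the builder boilerplate can be dropped and
the shell-provided `spark` used directly:

    import org.apache.spark.sql.SparkSession

    // Hive-backed session; enableHiveSupport() requires the Hive classes on the classpath.
    val spark = SparkSession.builder()
      .master("local")
      .enableHiveSupport()
      .getOrCreate()

    // A metastore-backed data source table; its real schema is kept in table properties.
    spark.sql("CREATE TABLE t1 (c1 INT) USING json")
    spark.sql("ALTER TABLE t1 ADD COLUMNS (c2 INT)")

    // With this series applied, the added column is visible when the table is read back,
    // because alterTableSchema no longer clobbers the schema stored for the raw Hive table.
    assert(spark.table("t1").schema.fieldNames.sameElements(Array("c1", "c2")))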