diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 5c0a6aa724bf0..2cab552dd6963 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2356,18 +2356,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
         }.getMessage
         assert(e.contains("Found duplicate column(s)"))
       } else {
-        if (isUsingHiveMetastore) {
-          // hive catalog will still complains that c1 is duplicate column name because hive
-          // identifiers are case insensitive.
-          val e = intercept[AnalysisException] {
-            sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
-          }.getMessage
-          assert(e.contains("HiveException"))
-        } else {
-          sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
-          assert(spark.table("t1").schema
-            .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
-        }
+        sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+        assert(spark.table("t1").schema
+          .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
       }
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 70d7dd23d908a..964e1c175a653 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -238,9 +238,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
 
     if (DDLUtils.isDatasourceTable(tableDefinition)) {
-      createDataSourceTable(
-        tableDefinition.withNewStorage(locationUri = tableLocation),
-        ignoreIfExists)
+      saveDataSourceTable(tableDefinition.withNewStorage(locationUri = tableLocation)) { table =>
+        saveTableIntoHive(table, ignoreIfExists)
+      }
     } else {
       val tableWithDataSourceProps = tableDefinition.copy(
         // We can't leave `locationUri` empty and count on Hive metastore to set a default table
@@ -257,7 +257,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
-  private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
+  private def saveDataSourceTable(table: CatalogTable)(saveFn: CatalogTable => Unit): Unit = {
     // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
     val provider = table.provider.get
 
@@ -363,19 +363,19 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         // specific way.
         try {
           logInfo(message)
-          saveTableIntoHive(table, ignoreIfExists)
+          saveFn(table)
         } catch {
           case NonFatal(e) =>
             val warningMessage =
               s"Could not persist ${table.identifier.quotedString} in a Hive " +
                 "compatible way. Persisting it into Hive metastore in Spark SQL specific format."
             logWarning(warningMessage, e)
-            saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
+            saveFn(newSparkSQLSpecificMetastoreTable())
         }
 
       case (None, message) =>
         logWarning(message)
-        saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
+        saveFn(newSparkSQLSpecificMetastoreTable())
     }
   }
 
@@ -610,21 +610,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
   override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
     requireTableExists(db, table)
-    val rawTable = getRawTable(db, table)
-    val withNewSchema = rawTable.copy(schema = schema)
-    verifyColumnNames(withNewSchema)
-    // Add table metadata such as table schema, partition columns, etc. to table properties.
-    val updatedTable = withNewSchema.copy(
-      properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))
-    try {
+    val updatedTable = getTable(db, table).copy(schema = schema)
+    verifyColumnNames(updatedTable)
+    if (DDLUtils.isDatasourceTable(updatedTable)) {
+      saveDataSourceTable(updatedTable) { table =>
+        client.alterTable(table)
+      }
+    } else {
       client.alterTable(updatedTable)
-    } catch {
-      case NonFatal(e) =>
-        val warningMessage =
-          s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
-            "compatible way. Updating Hive metastore in Spark SQL specific format."
-        logWarning(warningMessage, e)
-        client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema))
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bde9a81c65a4e..871d6965a9008 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.client.HiveClientImpl._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -413,7 +414,7 @@ private[hive] class HiveClientImpl(
         unsupportedFeatures += "partitioned view"
       }
 
-      val properties = Option(h.getParameters).map(_.asScala.toMap).orNull
+      val properties = Option(h.getParameters).map(_.asScala.toMap).getOrElse(Map())
 
       // Hive-generated Statistics are also recorded in ignoredProperties
       val ignoredProperties = scala.collection.mutable.Map.empty[String, String]
@@ -528,7 +529,7 @@ private[hive] class HiveClientImpl(
       table.copy(properties = table.ignoredProperties ++ table.properties), Some(userName))
     // Do not use `table.qualifiedName` here because this may be a rename
     val qualifiedTableName = s"${table.database}.$tableName"
-    shim.alterTable(client, qualifiedTableName, hiveTable)
+    shim.alterTable(client, qualifiedTableName, hiveTable, table.storage.locationUri.isDefined)
   }
 
   override def createPartitions(
@@ -607,8 +608,10 @@ private[hive] class HiveClientImpl(
       db: String,
       table: String,
       newParts: Seq[CatalogTablePartition]): Unit = withHiveState {
-    val hiveTable = toHiveTable(getTable(db, table), Some(userName))
-    shim.alterPartitions(client, table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava)
+    val sparkTable = getTable(db, table)
+    val hiveTable = toHiveTable(sparkTable, Some(userName))
+    shim.alterPartitions(client, table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava,
+      sparkTable.storage.locationUri.isDefined)
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 449a303b59eed..d04a2f732709e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.client
 import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
 import java.lang.reflect.{InvocationTargetException, Method, Modifier}
 import java.net.URI
-import java.util.{ArrayList => JArrayList, List => JList, Locale, Map => JMap, Set => JSet}
+import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Locale, Map => JMap, Set => JSet}
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
@@ -86,9 +86,13 @@ private[client] sealed abstract class Shim {
 
   def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long
 
-  def alterTable(hive: Hive, tableName: String, table: Table): Unit
+  def alterTable(hive: Hive, tableName: String, table: Table, allowGatherStats: Boolean): Unit
 
-  def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit
+  def alterPartitions(
+      hive: Hive,
+      tableName: String,
+      newParts: JList[Partition],
+      allowGatherStats: Boolean): Unit
 
   def createPartitions(
       hive: Hive,
@@ -397,11 +401,19 @@ private[client] class Shim_v0_12 extends Shim with Logging {
     hive.dropTable(dbName, tableName, deleteData, ignoreIfNotExists)
   }
 
-  override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
+  override def alterTable(
+      hive: Hive,
+      tableName: String,
+      table: Table,
+      allowGatherStats: Boolean): Unit = {
     alterTableMethod.invoke(hive, tableName, table)
   }
 
-  override def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit = {
+  override def alterPartitions(
+      hive: Hive,
+      tableName: String,
+      newParts: JList[Partition],
+      allowGatherStats: Boolean): Unit = {
     alterPartitionsMethod.invoke(hive, tableName, newParts)
   }
 
@@ -1008,9 +1020,6 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
   // true if there is any following stats task
   protected lazy val hasFollowingStatsTask = JBoolean.FALSE
 
-  // TODO: Now, always set environmentContext to null. In the future, we should avoid setting
-  // hive-generated stats to -1 when altering tables by using environmentContext. See Hive-12730
-  protected lazy val environmentContextInAlterTable = null
 
   private lazy val loadPartitionMethod =
     findMethod(
@@ -1102,11 +1111,35 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
       hasFollowingStatsTask,
       AcidUtils.Operation.NOT_ACID)
   }
-  override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
-    alterTableMethod.invoke(hive, tableName, table, environmentContextInAlterTable)
+  override def alterTable(
+      hive: Hive,
+      tableName: String,
+      table: Table,
+      allowGatherStats: Boolean): Unit = {
+    alterTableMethod.invoke(hive, tableName, table, createEnvironmentContext(allowGatherStats))
+  }
+
+  override def alterPartitions(
+      hive: Hive,
+      tableName: String,
+      newParts: JList[Partition],
+      allowGatherStats: Boolean): Unit = {
+    alterPartitionsMethod.invoke(hive, tableName, newParts,
+      createEnvironmentContext(allowGatherStats))
   }
 
-  override def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit = {
-    alterPartitionsMethod.invoke(hive, tableName, newParts, environmentContextInAlterTable)
+  // TODO: In the future, we should avoid setting hive-generated stats to -1 when altering tables.
+  // See HIVE-12730.
+  private def createEnvironmentContext(allowGatherStats: Boolean): EnvironmentContext = {
+    if (!allowGatherStats) {
+      val properties = new JHashMap[String, String]()
+      properties.put("DO_NOT_UPDATE_STATS", "true")
+
+      val ctx = new EnvironmentContext()
+      ctx.setProperties(properties)
+      ctx
+    } else {
+      null
+    }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 5b62e37311d88..688fda0f0c318 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -22,22 +22,25 @@ import java.net.URI
 
 import scala.language.existentials
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.scalatest.BeforeAndAfterEach
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
-import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.tags.ExtendedHiveTest
 import org.apache.spark.util.Utils
 
 // TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite
@@ -1998,3 +2001,61 @@ class HiveDDLSuite
     }
   }
 }
+
+/**
+ * A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently
+ * from the built-in ones.
+ */
+@ExtendedHiveTest
+class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with BeforeAndAfterEach
+  with BeforeAndAfterAll {
+
+  // Create a custom HiveExternalCatalog instance with the desired configuration. We cannot
+  // use SparkSession here since there's already an active one managed by the TestHive object.
+  private var catalog = {
+    val warehouse = Utils.createTempDir()
+    val metastore = Utils.createTempDir()
+    metastore.delete()
+    val sparkConf = new SparkConf()
+      .set(SparkLauncher.SPARK_MASTER, "local")
+      .set(WAREHOUSE_PATH.key, warehouse.toURI().toString())
+      .set(CATALOG_IMPLEMENTATION.key, "hive")
+      .set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1")
+      .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
+
+    val hadoopConf = new Configuration()
+    hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString())
+    hadoopConf.set("javax.jdo.option.ConnectionURL",
+      s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true")
+    // These options are needed since the defaults in Hive 2.1 cause exceptions with an
+    // empty metastore db.
+    hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
+    hadoopConf.set("hive.metastore.schema.verification", "false")
+
+    new HiveExternalCatalog(sparkConf, hadoopConf)
+  }
+
+  override def afterEach(): Unit = {
+    catalog.listTables("default").foreach { t =>
+      catalog.dropTable("default", t, true, false)
+    }
+    spark.sessionState.catalog.reset()
+  }
+
+  override def afterAll(): Unit = {
+    catalog = null
+  }
+
+  test("SPARK-21617: ALTER TABLE..ADD COLUMNS for DataSource tables") {
+    spark.sql("CREATE TABLE t1 (c1 int) USING json")
+    val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", "t1")
+    catalog.createTable(oldTable, true)
+
+    val newSchema = StructType(oldTable.schema.fields ++ Array(StructField("c2", IntegerType)))
+    catalog.alterTableSchema("default", "t1", newSchema)
+
+    val updatedTable = catalog.getTable("default", "t1")
+    assert(updatedTable.schema.fieldNames === Array("c1", "c2"))
+  }
+
+}
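
Reviewer note (not part of the patch): the crux of the Hive 2.1 shim change above is the EnvironmentContext that Shim_v2_1 now passes to Hive's alterTable/alterPartitions. The standalone sketch below mirrors the patch's createEnvironmentContext so the mechanism can be tried outside Spark; it is illustrative only, it assumes a hive-metastore jar on the classpath, and the object name AlterStatsSketch and method name envContextForAlter are made up for the example.

// Illustrative sketch, assuming hive-metastore is available. Mirrors what the patch builds:
// when stats gathering is not allowed, the DO_NOT_UPDATE_STATS flag asks Hive 2.1 to leave
// the existing table/partition statistics untouched instead of resetting them on alter.
import java.util.{HashMap => JHashMap}

import org.apache.hadoop.hive.metastore.api.EnvironmentContext

object AlterStatsSketch {
  def envContextForAlter(allowGatherStats: Boolean): EnvironmentContext = {
    if (allowGatherStats) {
      // Passing null keeps Hive's default behaviour, which may recompute basic stats on alter.
      null
    } else {
      val props = new JHashMap[String, String]()
      props.put("DO_NOT_UPDATE_STATS", "true")
      val ctx = new EnvironmentContext()
      ctx.setProperties(props)
      ctx
    }
  }
}

In the patch itself, allowGatherStats is derived from table.storage.locationUri.isDefined in HiveClientImpl, and the pre-2.1 shims simply ignore the flag, so the shared Shim signature works unchanged across Hive versions.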