From aae3abd673adc7ff939d842e49d566fa722403a3 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 2 Aug 2017 14:47:34 -0700 Subject: [PATCH 1/2] [SPARK-21617][SQL] Store correct metadata in Hive for altered DS table. This change fixes two issues: - when loading table metadata from Hive, restore the "provider" field of CatalogTable so DS tables can be identified. - when altering a DS table in the Hive metastore, make sure to not alter the table's schema, since the DS table's schema is stored as a table property in those cases. Also added a new unit test for this issue which fails without this change. --- .../sql/execution/command/DDLSuite.scala | 15 ++---- .../spark/sql/hive/HiveExternalCatalog.scala | 13 ++++- .../sql/hive/client/HiveClientImpl.scala | 7 ++- .../sql/hive/execution/HiveDDLSuite.scala | 54 +++++++++++++++++-- 4 files changed, 69 insertions(+), 20 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 5c0a6aa724bf0..2cab552dd6963 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2356,18 +2356,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { }.getMessage assert(e.contains("Found duplicate column(s)")) } else { - if (isUsingHiveMetastore) { - // hive catalog will still complains that c1 is duplicate column name because hive - // identifiers are case insensitive. - val e = intercept[AnalysisException] { - sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") - }.getMessage - assert(e.contains("HiveException")) - } else { - sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") - assert(spark.table("t1").schema - .equals(new StructType().add("c1", IntegerType).add("C1", StringType))) - } + sql("ALTER TABLE t1 ADD COLUMNS (C1 string)") + assert(spark.table("t1").schema + .equals(new StructType().add("c1", IntegerType).add("C1", StringType))) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 70d7dd23d908a..766841a0c3d06 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -616,15 +616,24 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // Add table metadata such as table schema, partition columns, etc. to table properties. val updatedTable = withNewSchema.copy( properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema)) + + // If it's a data source table, make sure the original schema is left unchanged; the + // actual schema is recorded as a table property. + val tableToStore = if (DDLUtils.isDatasourceTable(updatedTable)) { + updatedTable.copy(schema = rawTable.schema) + } else { + updatedTable + } + try { - client.alterTable(updatedTable) + client.alterTable(tableToStore) } catch { case NonFatal(e) => val warningMessage = s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " + "compatible way. Updating Hive metastore in Spark SQL specific format." 
logWarning(warningMessage, e) - client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema)) + client.alterTable(updatedTable.copy(schema = tableToStore.partitionSchema)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index bde9a81c65a4e..c71b9f46eb3e1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.client.HiveClientImpl._ import org.apache.spark.sql.types._ import org.apache.spark.util.{CircularBuffer, Utils} @@ -413,7 +414,10 @@ private[hive] class HiveClientImpl( unsupportedFeatures += "partitioned view" } - val properties = Option(h.getParameters).map(_.asScala.toMap).orNull + val properties = Option(h.getParameters).map(_.asScala.toMap).getOrElse(Map()) + + val provider = properties.get(HiveExternalCatalog.DATASOURCE_PROVIDER) + .orElse(Some(DDLUtils.HIVE_PROVIDER)) // Hive-generated Statistics are also recorded in ignoredProperties val ignoredProperties = scala.collection.mutable.Map.empty[String, String] @@ -468,6 +472,7 @@ private[hive] class HiveClientImpl( throw new AnalysisException("Hive index table is not supported.") }, schema = schema, + provider = provider, partitionColumnNames = partCols.map(_.name), // If the table is written by Spark, we will put bucketing information in table properties, // and will always overwrite the bucket spec in hive metastore by the bucketing information diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 5b62e37311d88..2cef3abfa6021 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -23,19 +23,20 @@ import java.net.URI import scala.language.existentials import org.apache.hadoop.fs.Path -import org.scalatest.BeforeAndAfterEach +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} +import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} -import org.apache.spark.sql.hive.HiveExternalCatalog +import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} import org.apache.spark.sql.hive.orc.OrcFileOperator import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} -import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION +import org.apache.spark.sql.internal.StaticSQLConf._ import 
org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -1998,3 +1999,46 @@ class HiveDDLSuite } } } + +/** + * A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently + * from the built-in ones. + */ +class HiveDDLSuite_2_1 extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll { + + private val spark = { + val warehouse = Utils.createTempDir() + val metastore = Utils.createTempDir() + metastore.delete() + SparkSession.builder() + .config(SparkLauncher.SPARK_MASTER, "local") + .config(WAREHOUSE_PATH.key, warehouse.toURI().toString()) + .config(CATALOG_IMPLEMENTATION.key, "hive") + .config(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1") + .config(HiveUtils.HIVE_METASTORE_JARS.key, "maven") + .config("spark.hadoop.javax.jdo.option.ConnectionURL", + s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true") + // These options are needed since the defaults in Hive 2.1 cause exceptions with an + // empty metastore db. + .config("spark.hadoop.datanucleus.schema.autoCreateAll", "true") + .config("spark.hadoop.hive.metastore.schema.verification", "false") + .getOrCreate() + } + + override def afterEach: Unit = { + spark.sessionState.catalog.reset() + } + + override def afterAll(): Unit = { + spark.close() + } + + test("SPARK-21617: ALTER TABLE..ADD COLUMNS for DataSource tables") { + spark.sql("CREATE TABLE t1 (c1 int) USING json") + spark.sql("ALTER TABLE t1 ADD COLUMNS (c2 int)") + + val df = spark.table("t1") + assert(df.schema.fieldNames === Array("c1", "c2")) + } + +} From cc7cd955e262e4b45730a6a909129b0c059b15ac Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 3 Aug 2017 11:35:09 -0700 Subject: [PATCH 2/2] Rework the patch. - Use the same code to translate between Spark and Hive tables when creating or altering the table. - Fix the test so that it doesn't try to create a new SparkSession, which conflicts with TestHiveSingleton. - Use 2.1's EnvironmentContext to disable auto updating of stats for DS tables. 
--- .../spark/sql/hive/HiveExternalCatalog.scala | 44 +++++--------- .../sql/hive/client/HiveClientImpl.scala | 12 ++-- .../spark/sql/hive/client/HiveShim.scala | 57 +++++++++++++++---- .../sql/hive/execution/HiveDDLSuite.scala | 57 ++++++++++++------- 4 files changed, 101 insertions(+), 69 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 766841a0c3d06..964e1c175a653 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -238,9 +238,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } if (DDLUtils.isDatasourceTable(tableDefinition)) { - createDataSourceTable( - tableDefinition.withNewStorage(locationUri = tableLocation), - ignoreIfExists) + saveDataSourceTable(tableDefinition.withNewStorage(locationUri = tableLocation)) { table => + saveTableIntoHive(table, ignoreIfExists) + } } else { val tableWithDataSourceProps = tableDefinition.copy( // We can't leave `locationUri` empty and count on Hive metastore to set a default table @@ -257,7 +257,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } } - private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = { + private def saveDataSourceTable(table: CatalogTable)(saveFn: CatalogTable => Unit): Unit = { // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`. val provider = table.provider.get @@ -363,19 +363,19 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // specific way. try { logInfo(message) - saveTableIntoHive(table, ignoreIfExists) + saveFn(table) } catch { case NonFatal(e) => val warningMessage = s"Could not persist ${table.identifier.quotedString} in a Hive " + "compatible way. Persisting it into Hive metastore in Spark SQL specific format." logWarning(warningMessage, e) - saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists) + saveFn(newSparkSQLSpecificMetastoreTable()) } case (None, message) => logWarning(message) - saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists) + saveFn(newSparkSQLSpecificMetastoreTable()) } } @@ -610,30 +610,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient { requireTableExists(db, table) - val rawTable = getRawTable(db, table) - val withNewSchema = rawTable.copy(schema = schema) - verifyColumnNames(withNewSchema) - // Add table metadata such as table schema, partition columns, etc. to table properties. - val updatedTable = withNewSchema.copy( - properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema)) - - // If it's a data source table, make sure the original schema is left unchanged; the - // actual schema is recorded as a table property. 
- val tableToStore = if (DDLUtils.isDatasourceTable(updatedTable)) { - updatedTable.copy(schema = rawTable.schema) + val updatedTable = getTable(db, table).copy(schema = schema) + verifyColumnNames(updatedTable) + if (DDLUtils.isDatasourceTable(updatedTable)) { + saveDataSourceTable(updatedTable) { table => + client.alterTable(table) + } } else { - updatedTable - } - - try { - client.alterTable(tableToStore) - } catch { - case NonFatal(e) => - val warningMessage = - s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " + - "compatible way. Updating Hive metastore in Spark SQL specific format." - logWarning(warningMessage, e) - client.alterTable(updatedTable.copy(schema = tableToStore.partitionSchema)) + client.alterTable(updatedTable) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index c71b9f46eb3e1..871d6965a9008 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -416,9 +416,6 @@ private[hive] class HiveClientImpl( val properties = Option(h.getParameters).map(_.asScala.toMap).getOrElse(Map()) - val provider = properties.get(HiveExternalCatalog.DATASOURCE_PROVIDER) - .orElse(Some(DDLUtils.HIVE_PROVIDER)) - // Hive-generated Statistics are also recorded in ignoredProperties val ignoredProperties = scala.collection.mutable.Map.empty[String, String] for (key <- HiveStatisticsProperties; value <- properties.get(key)) { @@ -472,7 +469,6 @@ private[hive] class HiveClientImpl( throw new AnalysisException("Hive index table is not supported.") }, schema = schema, - provider = provider, partitionColumnNames = partCols.map(_.name), // If the table is written by Spark, we will put bucketing information in table properties, // and will always overwrite the bucket spec in hive metastore by the bucketing information @@ -533,7 +529,7 @@ private[hive] class HiveClientImpl( table.copy(properties = table.ignoredProperties ++ table.properties), Some(userName)) // Do not use `table.qualifiedName` here because this may be a rename val qualifiedTableName = s"${table.database}.$tableName" - shim.alterTable(client, qualifiedTableName, hiveTable) + shim.alterTable(client, qualifiedTableName, hiveTable, table.storage.locationUri.isDefined) } override def createPartitions( @@ -612,8 +608,10 @@ private[hive] class HiveClientImpl( db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit = withHiveState { - val hiveTable = toHiveTable(getTable(db, table), Some(userName)) - shim.alterPartitions(client, table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava) + val sparkTable = getTable(db, table) + val hiveTable = toHiveTable(sparkTable, Some(userName)) + shim.alterPartitions(client, table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava, + sparkTable.storage.locationUri.isDefined) } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 449a303b59eed..d04a2f732709e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.client import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong} import 
java.lang.reflect.{InvocationTargetException, Method, Modifier} import java.net.URI -import java.util.{ArrayList => JArrayList, List => JList, Locale, Map => JMap, Set => JSet} +import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Locale, Map => JMap, Set => JSet} import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -86,9 +86,13 @@ private[client] sealed abstract class Shim { def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long - def alterTable(hive: Hive, tableName: String, table: Table): Unit + def alterTable(hive: Hive, tableName: String, table: Table, allowGatherStats: Boolean): Unit - def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit + def alterPartitions( + hive: Hive, + tableName: String, + newParts: JList[Partition], + allowGatherStats: Boolean): Unit def createPartitions( hive: Hive, @@ -397,11 +401,19 @@ private[client] class Shim_v0_12 extends Shim with Logging { hive.dropTable(dbName, tableName, deleteData, ignoreIfNotExists) } - override def alterTable(hive: Hive, tableName: String, table: Table): Unit = { + override def alterTable( + hive: Hive, + tableName: String, + table: Table, + allowGatherStats: Boolean): Unit = { alterTableMethod.invoke(hive, tableName, table) } - override def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit = { + override def alterPartitions( + hive: Hive, + tableName: String, + newParts: JList[Partition], + allowGatherStats: Boolean): Unit = { alterPartitionsMethod.invoke(hive, tableName, newParts) } @@ -1008,9 +1020,6 @@ private[client] class Shim_v2_1 extends Shim_v2_0 { // true if there is any following stats task protected lazy val hasFollowingStatsTask = JBoolean.FALSE - // TODO: Now, always set environmentContext to null. In the future, we should avoid setting - // hive-generated stats to -1 when altering tables by using environmentContext. See Hive-12730 - protected lazy val environmentContextInAlterTable = null private lazy val loadPartitionMethod = findMethod( @@ -1102,11 +1111,35 @@ private[client] class Shim_v2_1 extends Shim_v2_0 { hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID) } - override def alterTable(hive: Hive, tableName: String, table: Table): Unit = { - alterTableMethod.invoke(hive, tableName, table, environmentContextInAlterTable) + override def alterTable( + hive: Hive, + tableName: String, + table: Table, + allowGatherStats: Boolean): Unit = { + alterTableMethod.invoke(hive, tableName, table, createEnvironmentContext(allowGatherStats)) + } + + override def alterPartitions( + hive: Hive, + tableName: String, + newParts: JList[Partition], + allowGatherStats: Boolean): Unit = { + alterPartitionsMethod.invoke(hive, tableName, newParts, + createEnvironmentContext(allowGatherStats)) } - override def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit = { - alterPartitionsMethod.invoke(hive, tableName, newParts, environmentContextInAlterTable) + // TODO: In the future, we should avoid setting hive-generated stats to -1 when altering tables. + // See HIVE-12730. 
+ private def createEnvironmentContext(allowGatherStats: Boolean): EnvironmentContext = { + if (!allowGatherStats) { + val properties = new JHashMap[String, String]() + properties.put("DO_NOT_UPDATE_STATS", "true") + + val ctx = new EnvironmentContext() + ctx.setProperties(properties) + ctx + } else { + null + } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 2cef3abfa6021..688fda0f0c318 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -22,10 +22,11 @@ import java.net.URI import scala.language.existentials +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} -import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.launcher.SparkLauncher import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException} @@ -39,6 +40,7 @@ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ +import org.apache.spark.tags.ExtendedHiveTest import org.apache.spark.util.Utils // TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite @@ -2004,41 +2006,56 @@ class HiveDDLSuite * A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently * from the built-in ones. */ -class HiveDDLSuite_2_1 extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll { +@ExtendedHiveTest +class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with BeforeAndAfterEach + with BeforeAndAfterAll { - private val spark = { + // Create a custom HiveExternalCatalog instance with the desired configuration. We cannot + // use SparkSession here since there's already an active one managed by the TestHive object. + private var catalog = { val warehouse = Utils.createTempDir() val metastore = Utils.createTempDir() metastore.delete() - SparkSession.builder() - .config(SparkLauncher.SPARK_MASTER, "local") - .config(WAREHOUSE_PATH.key, warehouse.toURI().toString()) - .config(CATALOG_IMPLEMENTATION.key, "hive") - .config(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1") - .config(HiveUtils.HIVE_METASTORE_JARS.key, "maven") - .config("spark.hadoop.javax.jdo.option.ConnectionURL", - s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true") - // These options are needed since the defaults in Hive 2.1 cause exceptions with an - // empty metastore db.
- .config("spark.hadoop.datanucleus.schema.autoCreateAll", "true") - .config("spark.hadoop.hive.metastore.schema.verification", "false") - .getOrCreate() + val sparkConf = new SparkConf() + .set(SparkLauncher.SPARK_MASTER, "local") + .set(WAREHOUSE_PATH.key, warehouse.toURI().toString()) + .set(CATALOG_IMPLEMENTATION.key, "hive") + .set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1") + .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven") + + val hadoopConf = new Configuration() + hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString()) + hadoopConf.set("javax.jdo.option.ConnectionURL", + s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true") + // These options are needed since the defaults in Hive 2.1 cause exceptions with an + // empty metastore db. + hadoopConf.set("datanucleus.schema.autoCreateAll", "true") + hadoopConf.set("hive.metastore.schema.verification", "false") + + new HiveExternalCatalog(sparkConf, hadoopConf) } override def afterEach: Unit = { + catalog.listTables("default").foreach { t => + catalog.dropTable("default", t, true, false) + } spark.sessionState.catalog.reset() } override def afterAll(): Unit = { - spark.close() + catalog = null } test("SPARK-21617: ALTER TABLE..ADD COLUMNS for DataSource tables") { spark.sql("CREATE TABLE t1 (c1 int) USING json") - spark.sql("ALTER TABLE t1 ADD COLUMNS (c2 int)") + val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", "t1") + catalog.createTable(oldTable, true) + + val newSchema = StructType(oldTable.schema.fields ++ Array(StructField("c2", IntegerType))) + catalog.alterTableSchema("default", "t1", newSchema) - val df = spark.table("t1") - assert(df.schema.fieldNames === Array("c1", "c2")) + val updatedTable = catalog.getTable("default", "t1") + assert(updatedTable.schema.fieldNames === Array("c1", "c2")) } }