@@ -2357,18 +2357,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}.getMessage
assert(e.contains("Found duplicate column(s)"))
} else {
-        if (isUsingHiveMetastore) {
-          // hive catalog will still complains that c1 is duplicate column name because hive
-          // identifiers are case insensitive.
-          val e = intercept[AnalysisException] {
-            sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
-          }.getMessage
-          assert(e.contains("HiveException"))
-        } else {
-          sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
-          assert(spark.table("t1").schema
-            .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
-        }
+        sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+        assert(spark.table("t1").schema ==
+          new StructType().add("c1", IntegerType).add("C1", StringType))
}
}
}
@@ -114,7 +114,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* should interpret these special data source properties and restore the original table metadata
* before returning it.
*/
-  private def getRawTable(db: String, table: String): CatalogTable = withClient {
+  private[hive] def getRawTable(db: String, table: String): CatalogTable = withClient {
client.getTable(db, table)
}
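As a side note, here is a minimal, hedged sketch of why widening getRawTable to private[hive] is useful to callers inside org.apache.spark.sql.hive (for example, tests): the raw table still carries the Spark-specific bookkeeping properties that getTable would interpret and fold back into the schema, provider, and partitioning. The "spark.sql.sources." prefix below is assumed for illustration.

// Sketch only; assumes `catalog` is a HiveExternalCatalog instance and `t1` exists.
val raw = catalog.getRawTable("default", "t1")      // table as stored in the metastore
val restored = catalog.getTable("default", "t1")    // Spark-specific properties interpreted
raw.properties.filter { case (k, _) => k.startsWith("spark.sql.sources.") }.foreach(println)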

@@ -386,6 +386,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* can be used as table properties later.
*/
private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = {
tableMetaToTableProps(table, table.schema)
}

private def tableMetaToTableProps(
table: CatalogTable,
schema: StructType): mutable.Map[String, String] = {
val partitionColumns = table.partitionColumnNames
val bucketSpec = table.bucketSpec

@@ -397,7 +403,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// property. In this case, we split the JSON string and store each part as a separate table
// property.
val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
-    val schemaJsonString = table.schema.json
+    val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
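To make the splitting described above concrete, here is a small standalone sketch. The literal property keys and the 4000-character threshold are illustrative stand-ins for the DATASOURCE_SCHEMA_NUMPARTS constant and the SCHEMA_STRING_LENGTH_THRESHOLD config used in the real code, not a claim about their exact values.

import org.apache.spark.sql.types._

// Chunk the schema JSON so no single Hive table property exceeds the threshold.
val schemaJson = new StructType().add("c1", IntegerType).add("c2", StringType).json
val threshold = 4000  // stand-in for conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
val parts = schemaJson.grouped(threshold).toSeq
val props = Map("spark.sql.sources.schema.numParts" -> parts.size.toString) ++
  parts.zipWithIndex.map { case (part, i) => s"spark.sql.sources.schema.part.$i" -> part }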
@@ -615,20 +621,29 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
requireTableExists(db, table)
val rawTable = getRawTable(db, table)
-    val withNewSchema = rawTable.copy(schema = schema)
-    verifyColumnNames(withNewSchema)
-    // Add table metadata such as table schema, partition columns, etc. to table properties.
-    val updatedTable = withNewSchema.copy(
-      properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))
-    try {
-      client.alterTable(updatedTable)
-    } catch {
-      case NonFatal(e) =>
-        val warningMessage =
-          s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
-            "compatible way. Updating Hive metastore in Spark SQL specific format."
-        logWarning(warningMessage, e)
-        client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema))
+    val updatedProperties = rawTable.properties ++ tableMetaToTableProps(rawTable, schema)
+    val withNewSchema = rawTable.copy(properties = updatedProperties, schema = schema)
+    verifyColumnNames(withNewSchema)
+
+    if (isDatasourceTable(rawTable)) {
+      // For data source tables, first try to write it with the schema set; if that does not work,
+      // try again with updated properties and the partition schema. This is a simplified version of
+      // what createDataSourceTable() does, and may leave the table in a state unreadable by Hive
+      // (for example, the schema does not match the data source schema, or does not match the
+      // storage descriptor).
+      try {
+        client.alterTable(withNewSchema)
+      } catch {
+        case NonFatal(e) =>
+          val warningMessage =
+            s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
+              "compatible way. Updating Hive metastore in Spark SQL specific format."
+          logWarning(warningMessage, e)
+          client.alterTable(withNewSchema.copy(schema = rawTable.partitionSchema))
+      }
+    } else {
+      client.alterTable(withNewSchema)
+    }
}
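For orientation, here is a hedged caller-side sketch of roughly how alterTableSchema is reached by ALTER TABLE ... ADD COLUMNS from code inside Spark's own packages. The table and column names are made up and this is not the exact command implementation: the new columns are appended to the existing data schema, partition columns stay at the end, and the merged schema is handed to the external catalog, which runs the logic above.

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Rough sketch, assuming access to Spark internals (e.g. from a test in org.apache.spark.sql).
val externalCatalog = spark.sessionState.catalog.externalCatalog
val oldTable = externalCatalog.getTable("default", "t1")
val newSchema = StructType(
  oldTable.dataSchema.fields ++
    Seq(StructField("c2", IntegerType)) ++    // the column being added
    oldTable.partitionSchema.fields)
externalCatalog.alterTableSchema("default", "t1", newSchema)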

@@ -1351,4 +1366,14 @@ object HiveExternalCatalog {
getColumnNamesByType(metadata.properties, "sort", "sorting columns"))
}
}

/**
* Detects a data source table. This checks both the table provider and the table properties,
* unlike DDLUtils which just checks the former.
*/
private[spark] def isDatasourceTable(table: CatalogTable): Boolean = {
val provider = table.provider.orElse(table.properties.get(DATASOURCE_PROVIDER))
provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)
}

}
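A hedged sketch of why the new helper looks at table properties in addition to the provider field: a raw CatalogTable read back from the metastore may have provider unset while still carrying the provider under a Spark-internal property key (the key name below is assumed for illustration). Since the helper is private[spark], a call like this only compiles from Spark's own packages, e.g. a test.

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.StructType

// provider is absent on the raw table, but the property still marks it as a data source table.
val rawTable = CatalogTable(
  identifier = TableIdentifier("t1", Some("default")),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat.empty,
  schema = new StructType(),
  provider = None,
  properties = Map("spark.sql.sources.provider" -> "parquet"))
assert(HiveExternalCatalog.isDatasourceTable(rawTable))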
@@ -50,6 +50,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.HiveClientImpl._
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -883,7 +884,7 @@ private[hive] object HiveClientImpl {
}
// after SPARK-19279, it is not allowed to create a hive table with an empty schema,
// so here we should not add a default col schema
Review comment from @viirya (Member), Aug 9, 2017: This comment looks like it needs to move accordingly?
-    if (schema.isEmpty && DDLUtils.isDatasourceTable(table)) {
+    if (schema.isEmpty && HiveExternalCatalog.isDatasourceTable(table)) {
// This is a hack to preserve existing behavior. Before Spark 2.0, we do not
// set a default serde here (this was done in Hive), and so if the user provides
// an empty schema Hive would automatically populate the schema with a single
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.execution

import scala.language.existentials

import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types._
import org.apache.spark.tags.ExtendedHiveTest
import org.apache.spark.util.Utils

/**
* A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently
* from the built-in ones.
*/
@ExtendedHiveTest
class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with BeforeAndAfterEach
with BeforeAndAfterAll {

// Create a custom HiveExternalCatalog instance with the desired configuration. We cannot
// use SparkSession here since there's already an active one managed by the TestHive object.
private var catalog = {
val warehouse = Utils.createTempDir()
val metastore = Utils.createTempDir()
metastore.delete()
val sparkConf = new SparkConf()
.set(SparkLauncher.SPARK_MASTER, "local")
.set(WAREHOUSE_PATH.key, warehouse.toURI().toString())
.set(CATALOG_IMPLEMENTATION.key, "hive")
.set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1")
.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")

val hadoopConf = new Configuration()
hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString())
hadoopConf.set("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true")
// These options are needed since the defaults in Hive 2.1 cause exceptions with an
// empty metastore db.
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")

new HiveExternalCatalog(sparkConf, hadoopConf)
}

override def afterEach: Unit = {
catalog.listTables("default").foreach { t =>
catalog.dropTable("default", t, true, false)
}
spark.sessionState.catalog.reset()
}

override def afterAll(): Unit = {
catalog = null
}

test("SPARK-21617: ALTER TABLE for non-compatible DataSource tables") {
testAlterTable(
"t1",
"CREATE TABLE t1 (c1 int) USING json",
StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))),
hiveCompatible = false)
}

test("SPARK-21617: ALTER TABLE for Hive-compatible DataSource tables") {
testAlterTable(
"t1",
"CREATE TABLE t1 (c1 int) USING parquet",
StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))))
}

test("SPARK-21617: ALTER TABLE for Hive tables") {
testAlterTable(
"t1",
"CREATE TABLE t1 (c1 int) STORED AS parquet",
StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))))
}

test("SPARK-21617: ALTER TABLE with incompatible schema on Hive-compatible table") {
val exception = intercept[AnalysisException] {
testAlterTable(
"t1",
"CREATE TABLE t1 (c1 string) USING parquet",
StructType(Array(StructField("c2", IntegerType))))
}
assert(exception.getMessage().contains("types incompatible with the existing columns"))
}

private def testAlterTable(
tableName: String,
createTableStmt: String,
updatedSchema: StructType,
hiveCompatible: Boolean = true): Unit = {
spark.sql(createTableStmt)
val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", tableName)
catalog.createTable(oldTable, true)
catalog.alterTableSchema("default", tableName, updatedSchema)

val updatedTable = catalog.getTable("default", tableName)
assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames)
}

}