DDLSuite.scala
@@ -2356,18 +2356,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}.getMessage
assert(e.contains("Found duplicate column(s)"))
} else {
if (isUsingHiveMetastore) {
// hive catalog will still complains that c1 is duplicate column name because hive
// identifiers are case insensitive.
val e = intercept[AnalysisException] {
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
}.getMessage
assert(e.contains("HiveException"))
} else {
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
assert(spark.table("t1").schema
.equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
}
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
assert(spark.table("t1").schema
.equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
}
}
}
HiveExternalCatalog.scala
@@ -238,9 +238,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
}

if (DDLUtils.isDatasourceTable(tableDefinition)) {
createDataSourceTable(
tableDefinition.withNewStorage(locationUri = tableLocation),
ignoreIfExists)
saveDataSourceTable(tableDefinition.withNewStorage(locationUri = tableLocation)) { table =>
saveTableIntoHive(table, ignoreIfExists)
}
} else {
val tableWithDataSourceProps = tableDefinition.copy(
// We can't leave `locationUri` empty and count on Hive metastore to set a default table
@@ -257,7 +257,7 @@
}
}

private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
private def saveDataSourceTable(table: CatalogTable)(saveFn: CatalogTable => Unit): Unit = {
// data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
val provider = table.provider.get

@@ -363,19 +363,19 @@
// specific way.
try {
logInfo(message)
saveTableIntoHive(table, ignoreIfExists)
saveFn(table)
} catch {
case NonFatal(e) =>
val warningMessage =
s"Could not persist ${table.identifier.quotedString} in a Hive " +
"compatible way. Persisting it into Hive metastore in Spark SQL specific format."
logWarning(warningMessage, e)
saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
saveFn(newSparkSQLSpecificMetastoreTable())
}

case (None, message) =>
logWarning(message)
saveTableIntoHive(newSparkSQLSpecificMetastoreTable(), ignoreIfExists)
saveFn(newSparkSQLSpecificMetastoreTable())
}
}

@@ -610,21 +610,14 @@

override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
requireTableExists(db, table)
val rawTable = getRawTable(db, table)
val withNewSchema = rawTable.copy(schema = schema)
verifyColumnNames(withNewSchema)
// Add table metadata such as table schema, partition columns, etc. to table properties.
val updatedTable = withNewSchema.copy(
properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))
try {
val updatedTable = getTable(db, table).copy(schema = schema)
verifyColumnNames(updatedTable)
if (DDLUtils.isDatasourceTable(updatedTable)) {
saveDataSourceTable(updatedTable) { table =>
client.alterTable(table)
}
} else {
client.alterTable(updatedTable)
} catch {
case NonFatal(e) =>
val warningMessage =
s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
"compatible way. Updating Hive metastore in Spark SQL specific format."
logWarning(warningMessage, e)
client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema))
}
}
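The refactor above turns `createDataSourceTable(table, ignoreIfExists)` into `saveDataSourceTable(table)(saveFn)`: the Hive-compatible-versus-Spark-SQL-specific fallback decision stays in one place, while the caller supplies how the metadata is actually written (`createTable` passes `saveTableIntoHive`, `alterTableSchema` passes `client.alterTable`). A minimal sketch of that callback pattern is below; the names and the `TableMeta` type are simplified placeholders, not the real Spark internals.

```scala
// Sketch of the higher-order "save with fallback" pattern; all names here are
// illustrative placeholders, not Spark's actual types.
object SaveWithFallback {
  case class TableMeta(name: String, hiveCompatible: Boolean)

  // Try to persist in a Hive-compatible way; on failure, fall back to a
  // Spark-SQL-specific representation. The caller decides HOW the metadata is
  // written (create vs. alter) via `saveFn`.
  def saveDataSourceTable(table: TableMeta)(saveFn: TableMeta => Unit): Unit = {
    if (table.hiveCompatible) {
      try {
        saveFn(table)
      } catch {
        case scala.util.control.NonFatal(e) =>
          println(s"Could not persist ${table.name} in a Hive compatible way: ${e.getMessage}")
          saveFn(table.copy(hiveCompatible = false))
      }
    } else {
      saveFn(table.copy(hiveCompatible = false))
    }
  }

  def main(args: Array[String]): Unit = {
    // "Create" path: the callback performs a create-style write.
    saveDataSourceTable(TableMeta("t1", hiveCompatible = true)) { t =>
      println(s"create ${t.name} (hiveCompatible=${t.hiveCompatible})")
    }
    // "Alter" path: the same fallback logic, but the callback performs an alter.
    saveDataSourceTable(TableMeta("t1", hiveCompatible = true)) { t =>
      println(s"alter ${t.name} (hiveCompatible=${t.hiveCompatible})")
    }
  }
}
```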

HiveClientImpl.scala
@@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.HiveClientImpl._
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -413,7 +414,7 @@ private[hive] class HiveClientImpl(
unsupportedFeatures += "partitioned view"
}

val properties = Option(h.getParameters).map(_.asScala.toMap).orNull
val properties = Option(h.getParameters).map(_.asScala.toMap).getOrElse(Map())

// Hive-generated Statistics are also recorded in ignoredProperties
val ignoredProperties = scala.collection.mutable.Map.empty[String, String]
@@ -528,7 +529,7 @@ private[hive] class HiveClientImpl(
table.copy(properties = table.ignoredProperties ++ table.properties), Some(userName))
// Do not use `table.qualifiedName` here because this may be a rename
val qualifiedTableName = s"${table.database}.$tableName"
shim.alterTable(client, qualifiedTableName, hiveTable)
shim.alterTable(client, qualifiedTableName, hiveTable, table.storage.locationUri.isDefined)
}

override def createPartitions(
@@ -607,8 +608,10 @@
db: String,
table: String,
newParts: Seq[CatalogTablePartition]): Unit = withHiveState {
val hiveTable = toHiveTable(getTable(db, table), Some(userName))
shim.alterPartitions(client, table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava)
val sparkTable = getTable(db, table)
val hiveTable = toHiveTable(sparkTable, Some(userName))
shim.alterPartitions(client, table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava,
sparkTable.storage.locationUri.isDefined)
}

/**
HiveShim.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.client
import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
import java.lang.reflect.{InvocationTargetException, Method, Modifier}
import java.net.URI
import java.util.{ArrayList => JArrayList, List => JList, Locale, Map => JMap, Set => JSet}
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Locale, Map => JMap, Set => JSet}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
@@ -86,9 +86,13 @@ private[client] sealed abstract class Shim {

def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long

def alterTable(hive: Hive, tableName: String, table: Table): Unit
def alterTable(hive: Hive, tableName: String, table: Table, allowGatherStats: Boolean): Unit

def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit
def alterPartitions(
hive: Hive,
tableName: String,
newParts: JList[Partition],
allowGatherStats: Boolean): Unit

def createPartitions(
hive: Hive,
@@ -397,11 +401,19 @@ private[client] class Shim_v0_12 extends Shim with Logging {
hive.dropTable(dbName, tableName, deleteData, ignoreIfNotExists)
}

override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
override def alterTable(
hive: Hive,
tableName: String,
table: Table,
allowGatherStats: Boolean): Unit = {
alterTableMethod.invoke(hive, tableName, table)
}

override def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit = {
override def alterPartitions(
hive: Hive,
tableName: String,
newParts: JList[Partition],
allowGatherStats: Boolean): Unit = {
alterPartitionsMethod.invoke(hive, tableName, newParts)
}

@@ -1008,9 +1020,6 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {

// true if there is any following stats task
protected lazy val hasFollowingStatsTask = JBoolean.FALSE
// TODO: Now, always set environmentContext to null. In the future, we should avoid setting
// hive-generated stats to -1 when altering tables by using environmentContext. See Hive-12730
protected lazy val environmentContextInAlterTable = null

private lazy val loadPartitionMethod =
findMethod(
@@ -1102,11 +1111,35 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID)
}

override def alterTable(hive: Hive, tableName: String, table: Table): Unit = {
alterTableMethod.invoke(hive, tableName, table, environmentContextInAlterTable)
override def alterTable(
hive: Hive,
tableName: String,
table: Table,
allowGatherStats: Boolean): Unit = {
alterTableMethod.invoke(hive, tableName, table, createEnvironmentContext(allowGatherStats))
}

override def alterPartitions(
hive: Hive,
tableName: String,
newParts: JList[Partition],
allowGatherStats: Boolean): Unit = {
alterPartitionsMethod.invoke(hive, tableName, newParts,
createEnvironmentContext(allowGatherStats))
}

override def alterPartitions(hive: Hive, tableName: String, newParts: JList[Partition]): Unit = {
alterPartitionsMethod.invoke(hive, tableName, newParts, environmentContextInAlterTable)
// TODO: In the future, we should avoid setting hive-generated stats to -1 when altering tables.
// See HIVE-12730.
private def createEnvironmentContext(allowGatherStats: Boolean): EnvironmentContext = {
if (!allowGatherStats) {
val properties = new JHashMap[String, String]()
properties.put("DO_NOT_UPDATE_STATS", "true")

val ctx = new EnvironmentContext()
ctx.setProperties(properties)
ctx
} else {
null
}
}
}
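In the new shim code above, `allowGatherStats` (derived from `table.storage.locationUri.isDefined` in `HiveClientImpl`) controls whether an `EnvironmentContext` carrying `DO_NOT_UPDATE_STATS=true` is passed to Hive 2.1's alter calls, so that Hive does not refresh basic statistics during the alter. The sketch below shows the context being built and consulted; `statsUpdateAllowed` is a stand-in for the check Hive performs internally, not actual Hive code, and the object name is illustrative.

```scala
import java.util.{HashMap => JHashMap}

import org.apache.hadoop.hive.metastore.api.EnvironmentContext

object EnvironmentContextSketch {
  // Mirrors the shim helper above: only build a context when stats must not be touched.
  def createEnvironmentContext(allowGatherStats: Boolean): EnvironmentContext = {
    if (!allowGatherStats) {
      val props = new JHashMap[String, String]()
      props.put("DO_NOT_UPDATE_STATS", "true")
      val ctx = new EnvironmentContext()
      ctx.setProperties(props)
      ctx
    } else {
      null
    }
  }

  // Stand-in for the decision Hive makes before refreshing basic statistics;
  // illustrative only, not Hive's actual implementation.
  def statsUpdateAllowed(ctx: EnvironmentContext): Boolean = {
    ctx == null || ctx.getProperties == null ||
      !"true".equalsIgnoreCase(ctx.getProperties.get("DO_NOT_UPDATE_STATS"))
  }

  def main(args: Array[String]): Unit = {
    println(statsUpdateAllowed(createEnvironmentContext(allowGatherStats = true)))  // true
    println(statsUpdateAllowed(createEnvironmentContext(allowGatherStats = false))) // false
  }
}
```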
HiveDDLSuite.scala
@@ -22,22 +22,25 @@ import java.net.URI

import scala.language.existentials

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.tags.ExtendedHiveTest
import org.apache.spark.util.Utils

// TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite
@@ -1998,3 +2001,61 @@ class HiveDDLSuite
}
}
}

/**
* A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently
* from the built-in ones.
*/
@ExtendedHiveTest
class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with BeforeAndAfterEach
with BeforeAndAfterAll {

// Create a custom HiveExternalCatalog instance with the desired configuration. We cannot
// use SparkSession here since there's already an active one managed by the TestHive object.
private var catalog = {
val warehouse = Utils.createTempDir()
val metastore = Utils.createTempDir()
metastore.delete()
val sparkConf = new SparkConf()
.set(SparkLauncher.SPARK_MASTER, "local")
.set(WAREHOUSE_PATH.key, warehouse.toURI().toString())
.set(CATALOG_IMPLEMENTATION.key, "hive")
.set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1")
.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")

val hadoopConf = new Configuration()
hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString())
hadoopConf.set("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true")
// These options are needed since the defaults in Hive 2.1 cause exceptions with an
// empty metastore db.
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")

new HiveExternalCatalog(sparkConf, hadoopConf)
}

override def afterEach(): Unit = {
catalog.listTables("default").foreach { t =>
catalog.dropTable("default", t, true, false)
}
spark.sessionState.catalog.reset()
}

override def afterAll(): Unit = {
catalog = null
}

test("SPARK-21617: ALTER TABLE..ADD COLUMNS for DataSource tables") {
spark.sql("CREATE TABLE t1 (c1 int) USING json")
val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", "t1")
catalog.createTable(oldTable, true)

val newSchema = StructType(oldTable.schema.fields ++ Array(StructField("c2", IntegerType)))
catalog.alterTableSchema("default", "t1", newSchema)

val updatedTable = catalog.getTable("default", "t1")
assert(updatedTable.schema.fieldNames === Array("c1", "c2"))
}

}
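For reference, the user-level flow that the new suite exercises looks roughly like the following sketch. It assumes a Hive-enabled `SparkSession` configured against a Hive 2.1 metastore (as the custom catalog above sets up); the object and app names are illustrative and the snippet is not taken from the test.

```scala
import org.apache.spark.sql.SparkSession

object AddColumnsExample {
  def main(args: Array[String]): Unit = {
    // Assumes the session resolves to a Hive 2.1 metastore, e.g. via
    // spark.sql.hive.metastore.version=2.1 and metastore jars on the classpath.
    val spark = SparkSession.builder()
      .appName("SPARK-21617 example")
      .enableHiveSupport()
      .getOrCreate()

    // A data source (non-Hive-serde) table.
    spark.sql("CREATE TABLE t1 (c1 INT) USING json")

    // With this change, ALTER TABLE ... ADD COLUMNS no longer trips over
    // Hive 2.1's stats handling when the altered metadata is written back.
    spark.sql("ALTER TABLE t1 ADD COLUMNS (c2 INT)")

    spark.table("t1").printSchema()  // c1: int, c2: int
    spark.stop()
  }
}
```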