diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 9d3c55060dfb6..cabfa2e277e71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -17,24 +17,10 @@ package org.apache.spark.sql.execution.command -import scala.collection.{GenMap, GenSeq} -import scala.collection.parallel.ForkJoinTaskSupport -import scala.concurrent.forkjoin.ForkJoinPool -import scala.util.control.NonFatal - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs._ -import org.apache.hadoop.mapred.{FileInputFormat, JobConf} - import org.apache.spark.sql.{AnalysisException, Row, SparkSession} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver} import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.types._ -import org.apache.spark.util.SerializableConfiguration // Note: The definition of these commands are based on the ones described in // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL @@ -167,599 +153,6 @@ case class DescribeDatabaseCommand( } } -/** - * Drops a table/view from the metastore and removes it if it is cached. - * - * The syntax of this command is: - * {{{ - * DROP TABLE [IF EXISTS] table_name; - * DROP VIEW [IF EXISTS] [db_name.]view_name; - * }}} - */ -case class DropTableCommand( - tableName: TableIdentifier, - ifExists: Boolean, - isView: Boolean, - purge: Boolean) extends RunnableCommand { - - override def run(sparkSession: SparkSession): Seq[Row] = { - val catalog = sparkSession.sessionState.catalog - - if (!catalog.isTemporaryTable(tableName) && catalog.tableExists(tableName)) { - // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view - // issue an exception. - catalog.getTableMetadata(tableName).tableType match { - case CatalogTableType.VIEW if !isView => - throw new AnalysisException( - "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead") - case o if o != CatalogTableType.VIEW && isView => - throw new AnalysisException( - s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead") - case _ => - } - } - try { - sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName)) - } catch { - case _: NoSuchTableException if ifExists => - case NonFatal(e) => log.warn(e.toString, e) - } - catalog.refreshTable(tableName) - catalog.dropTable(tableName, ifExists, purge) - Seq.empty[Row] - } -} - -/** - * A command that sets table/view properties. 
- * - * The syntax of this command is: - * {{{ - * ALTER TABLE table1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...); - * ALTER VIEW view1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...); - * }}} - */ -case class AlterTableSetPropertiesCommand( - tableName: TableIdentifier, - properties: Map[String, String], - isView: Boolean) - extends RunnableCommand { - - override def run(sparkSession: SparkSession): Seq[Row] = { - val catalog = sparkSession.sessionState.catalog - val table = catalog.getTableMetadata(tableName) - DDLUtils.verifyAlterTableType(catalog, table, isView) - // This overrides old properties - val newTable = table.copy(properties = table.properties ++ properties) - catalog.alterTable(newTable) - Seq.empty[Row] - } - -} - -/** - * A command that unsets table/view properties. - * - * The syntax of this command is: - * {{{ - * ALTER TABLE table1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...); - * ALTER VIEW view1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...); - * }}} - */ -case class AlterTableUnsetPropertiesCommand( - tableName: TableIdentifier, - propKeys: Seq[String], - ifExists: Boolean, - isView: Boolean) - extends RunnableCommand { - - override def run(sparkSession: SparkSession): Seq[Row] = { - val catalog = sparkSession.sessionState.catalog - val table = catalog.getTableMetadata(tableName) - DDLUtils.verifyAlterTableType(catalog, table, isView) - if (!ifExists) { - propKeys.foreach { k => - if (!table.properties.contains(k)) { - throw new AnalysisException( - s"Attempted to unset non-existent property '$k' in table '${table.identifier}'") - } - } - } - val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) } - val newTable = table.copy(properties = newProperties) - catalog.alterTable(newTable) - Seq.empty[Row] - } - -} - - -/** - * A command to change the column for a table, only support changing the comment of a non-partition - * column for now. - * - * The syntax of using this command in SQL is: - * {{{ - * ALTER TABLE table_identifier - * CHANGE [COLUMN] column_old_name column_new_name column_dataType [COMMENT column_comment] - * [FIRST | AFTER column_name]; - * }}} - */ -case class AlterTableChangeColumnCommand( - tableName: TableIdentifier, - columnName: String, - newColumn: StructField) extends RunnableCommand { - - // TODO: support change column name/dataType/metadata/position. - override def run(sparkSession: SparkSession): Seq[Row] = { - val catalog = sparkSession.sessionState.catalog - val table = catalog.getTableMetadata(tableName) - val resolver = sparkSession.sessionState.conf.resolver - DDLUtils.verifyAlterTableType(catalog, table, isView = false) - - // Find the origin column from schema by column name. - val originColumn = findColumnByName(table.schema, columnName, resolver) - // Throw an AnalysisException if the column name/dataType is changed. - if (!columnEqual(originColumn, newColumn, resolver)) { - throw new AnalysisException( - "ALTER TABLE CHANGE COLUMN is not supported for changing column " + - s"'${originColumn.name}' with type '${originColumn.dataType}' to " + - s"'${newColumn.name}' with type '${newColumn.dataType}'") - } - - val newSchema = table.schema.fields.map { field => - if (field.name == originColumn.name) { - // Create a new column from the origin column with the new comment. 
- addComment(field, newColumn.getComment) - } else { - field - } - } - val newTable = table.copy(schema = StructType(newSchema)) - catalog.alterTable(newTable) - - Seq.empty[Row] - } - - // Find the origin column from schema by column name, throw an AnalysisException if the column - // reference is invalid. - private def findColumnByName( - schema: StructType, name: String, resolver: Resolver): StructField = { - schema.fields.collectFirst { - case field if resolver(field.name, name) => field - }.getOrElse(throw new AnalysisException( - s"Invalid column reference '$name', table schema is '${schema}'")) - } - - // Add the comment to a column, if comment is empty, return the original column. - private def addComment(column: StructField, comment: Option[String]): StructField = { - comment.map(column.withComment(_)).getOrElse(column) - } - - // Compare a [[StructField]] to another, return true if they have the same column - // name(by resolver) and dataType. - private def columnEqual( - field: StructField, other: StructField, resolver: Resolver): Boolean = { - resolver(field.name, other.name) && field.dataType == other.dataType - } -} - -/** - * A command that sets the serde class and/or serde properties of a table/view. - * - * The syntax of this command is: - * {{{ - * ALTER TABLE table [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props]; - * ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties; - * }}} - */ -case class AlterTableSerDePropertiesCommand( - tableName: TableIdentifier, - serdeClassName: Option[String], - serdeProperties: Option[Map[String, String]], - partSpec: Option[TablePartitionSpec]) - extends RunnableCommand { - - // should never happen if we parsed things correctly - require(serdeClassName.isDefined || serdeProperties.isDefined, - "ALTER TABLE attempted to set neither serde class name nor serde properties") - - override def run(sparkSession: SparkSession): Seq[Row] = { - val catalog = sparkSession.sessionState.catalog - val table = catalog.getTableMetadata(tableName) - DDLUtils.verifyAlterTableType(catalog, table, isView = false) - // For datasource tables, disallow setting serde or specifying partition - if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) { - throw new AnalysisException("Operation not allowed: ALTER TABLE SET " + - "[SERDE | SERDEPROPERTIES] for a specific partition is not supported " + - "for tables created with the datasource API") - } - if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) { - throw new AnalysisException("Operation not allowed: ALTER TABLE SET SERDE is " + - "not supported for tables created with the datasource API") - } - if (partSpec.isEmpty) { - val newTable = table.withNewStorage( - serde = serdeClassName.orElse(table.storage.serde), - properties = table.storage.properties ++ serdeProperties.getOrElse(Map())) - catalog.alterTable(newTable) - } else { - val spec = partSpec.get - val part = catalog.getPartition(table.identifier, spec) - val newPart = part.copy(storage = part.storage.copy( - serde = serdeClassName.orElse(part.storage.serde), - properties = part.storage.properties ++ serdeProperties.getOrElse(Map()))) - catalog.alterPartitions(table.identifier, Seq(newPart)) - } - Seq.empty[Row] - } - -} - -/** - * Add Partition in ALTER TABLE: add the table partitions. - * - * An error message will be issued if the partition exists, unless 'ifNotExists' is true. 
- * - * The syntax of this command is: - * {{{ - * ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec1 [LOCATION 'loc1'] - * PARTITION spec2 [LOCATION 'loc2'] - * }}} - */ -case class AlterTableAddPartitionCommand( - tableName: TableIdentifier, - partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])], - ifNotExists: Boolean) - extends RunnableCommand { - - override def run(sparkSession: SparkSession): Seq[Row] = { - val catalog = sparkSession.sessionState.catalog - val table = catalog.getTableMetadata(tableName) - DDLUtils.verifyAlterTableType(catalog, table, isView = false) - DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE ADD PARTITION") - val parts = partitionSpecsAndLocs.map { case (spec, location) => - val normalizedSpec = PartitioningUtils.normalizePartitionSpec( - spec, - table.partitionColumnNames, - table.identifier.quotedString, - sparkSession.sessionState.conf.resolver) - // inherit table storage format (possibly except for location) - CatalogTablePartition(normalizedSpec, table.storage.copy( - locationUri = location.map(CatalogUtils.stringToURI(_)))) - } - catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists) - Seq.empty[Row] - } - -} - -/** - * Alter a table partition's spec. - * - * The syntax of this command is: - * {{{ - * ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2; - * }}} - */ -case class AlterTableRenamePartitionCommand( - tableName: TableIdentifier, - oldPartition: TablePartitionSpec, - newPartition: TablePartitionSpec) - extends RunnableCommand { - - override def run(sparkSession: SparkSession): Seq[Row] = { - val catalog = sparkSession.sessionState.catalog - val table = catalog.getTableMetadata(tableName) - DDLUtils.verifyAlterTableType(catalog, table, isView = false) - DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE RENAME PARTITION") - - val normalizedOldPartition = PartitioningUtils.normalizePartitionSpec( - oldPartition, - table.partitionColumnNames, - table.identifier.quotedString, - sparkSession.sessionState.conf.resolver) - - val normalizedNewPartition = PartitioningUtils.normalizePartitionSpec( - newPartition, - table.partitionColumnNames, - table.identifier.quotedString, - sparkSession.sessionState.conf.resolver) - - catalog.renamePartitions( - tableName, Seq(normalizedOldPartition), Seq(normalizedNewPartition)) - Seq.empty[Row] - } - -} - -/** - * Drop Partition in ALTER TABLE: to drop a particular partition for a table. - * - * This removes the data and metadata for this partition. - * The data is actually moved to the .Trash/Current directory if Trash is configured, - * unless 'purge' is true, but the metadata is completely lost. - * An error message will be issued if the partition does not exist, unless 'ifExists' is true. - * Note: purge is always false when the target is a view. - * - * The syntax of this command is: - * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] 
[PURGE]; - * }}} - */ -case class AlterTableDropPartitionCommand( - tableName: TableIdentifier, - specs: Seq[TablePartitionSpec], - ifExists: Boolean, - purge: Boolean, - retainData: Boolean) - extends RunnableCommand { - - override def run(sparkSession: SparkSession): Seq[Row] = { - val catalog = sparkSession.sessionState.catalog - val table = catalog.getTableMetadata(tableName) - DDLUtils.verifyAlterTableType(catalog, table, isView = false) - DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") - - val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( - spec, - table.partitionColumnNames, - table.identifier.quotedString, - sparkSession.sessionState.conf.resolver) - } - - catalog.dropPartitions( - table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, - retainData = retainData) - Seq.empty[Row] - } - -} - - -case class PartitionStatistics(numFiles: Int, totalSize: Long) - -/** - * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and - * update the catalog. - * - * The syntax of this command is: - * {{{ - * ALTER TABLE table RECOVER PARTITIONS; - * MSCK REPAIR TABLE table; - * }}} - */ -case class AlterTableRecoverPartitionsCommand( - tableName: TableIdentifier, - cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand { - - // These are list of statistics that can be collected quickly without requiring a scan of the data - // see https://github.com/apache/hive/blob/master/ - // common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java - val NUM_FILES = "numFiles" - val TOTAL_SIZE = "totalSize" - val DDL_TIME = "transient_lastDdlTime" - - private def getPathFilter(hadoopConf: Configuration): PathFilter = { - // Dummy jobconf to get to the pathFilter defined in configuration - // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow) - val jobConf = new JobConf(hadoopConf, this.getClass) - val pathFilter = FileInputFormat.getInputPathFilter(jobConf) - new PathFilter { - override def accept(path: Path): Boolean = { - val name = path.getName - if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) { - pathFilter == null || pathFilter.accept(path) - } else { - false - } - } - } - } - - override def run(spark: SparkSession): Seq[Row] = { - val catalog = spark.sessionState.catalog - val table = catalog.getTableMetadata(tableName) - val tableIdentWithDB = table.identifier.quotedString - DDLUtils.verifyAlterTableType(catalog, table, isView = false) - if (table.partitionColumnNames.isEmpty) { - throw new AnalysisException( - s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB") - } - - if (table.storage.locationUri.isEmpty) { - throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " + - s"location provided: $tableIdentWithDB") - } - - val root = new Path(table.location) - logInfo(s"Recover all the partitions in $root") - val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration) - - val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt - val hadoopConf = spark.sparkContext.hadoopConfiguration - val pathFilter = getPathFilter(hadoopConf) - val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(), - table.partitionColumnNames, threshold, spark.sessionState.conf.resolver) - val total = partitionSpecsAndLocs.length - logInfo(s"Found $total partitions in $root") - - val partitionStats = if 
(spark.sqlContext.conf.gatherFastStats) { - gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold) - } else { - GenMap.empty[String, PartitionStatistics] - } - logInfo(s"Finished to gather the fast stats for all $total partitions.") - - addPartitions(spark, table, partitionSpecsAndLocs, partitionStats) - // Updates the table to indicate that its partition metadata is stored in the Hive metastore. - // This is always the case for Hive format tables, but is not true for Datasource tables created - // before Spark 2.1 unless they are converted via `msck repair table`. - spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true)) - catalog.refreshTable(tableName) - logInfo(s"Recovered all partitions ($total).") - Seq.empty[Row] - } - - @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8)) - - private def scanPartitions( - spark: SparkSession, - fs: FileSystem, - filter: PathFilter, - path: Path, - spec: TablePartitionSpec, - partitionNames: Seq[String], - threshold: Int, - resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = { - if (partitionNames.isEmpty) { - return Seq(spec -> path) - } - - val statuses = fs.listStatus(path, filter) - val statusPar: GenSeq[FileStatus] = - if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) { - // parallelize the list of partitions here, then we can have better parallelism later. - val parArray = statuses.par - parArray.tasksupport = evalTaskSupport - parArray - } else { - statuses - } - statusPar.flatMap { st => - val name = st.getPath.getName - if (st.isDirectory && name.contains("=")) { - val ps = name.split("=", 2) - val columnName = ExternalCatalogUtils.unescapePathName(ps(0)) - // TODO: Validate the value - val value = ExternalCatalogUtils.unescapePathName(ps(1)) - if (resolver(columnName, partitionNames.head)) { - scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value), - partitionNames.drop(1), threshold, resolver) - } else { - logWarning( - s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it") - Seq() - } - } else { - logWarning(s"ignore ${new Path(path, name)}") - Seq() - } - } - } - - private def gatherPartitionStats( - spark: SparkSession, - partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)], - fs: FileSystem, - pathFilter: PathFilter, - threshold: Int): GenMap[String, PartitionStatistics] = { - if (partitionSpecsAndLocs.length > threshold) { - val hadoopConf = spark.sparkContext.hadoopConfiguration - val serializableConfiguration = new SerializableConfiguration(hadoopConf) - val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray - - // Set the number of parallelism to prevent following file listing from generating many tasks - // in case of large #defaultParallelism. - val numParallelism = Math.min(serializedPaths.length, - Math.min(spark.sparkContext.defaultParallelism, 10000)) - // gather the fast stats for all the partitions otherwise Hive metastore will list all the - // files for all the new partitions in sequential way, which is super slow. 
- logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.") - spark.sparkContext.parallelize(serializedPaths, numParallelism) - .mapPartitions { paths => - val pathFilter = getPathFilter(serializableConfiguration.value) - paths.map(new Path(_)).map{ path => - val fs = path.getFileSystem(serializableConfiguration.value) - val statuses = fs.listStatus(path, pathFilter) - (path.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum)) - } - }.collectAsMap() - } else { - partitionSpecsAndLocs.map { case (_, location) => - val statuses = fs.listStatus(location, pathFilter) - (location.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum)) - }.toMap - } - } - - private def addPartitions( - spark: SparkSession, - table: CatalogTable, - partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)], - partitionStats: GenMap[String, PartitionStatistics]): Unit = { - val total = partitionSpecsAndLocs.length - var done = 0L - // Hive metastore may not have enough memory to handle millions of partitions in single RPC, - // we should split them into smaller batches. Since Hive client is not thread safe, we cannot - // do this in parallel. - val batchSize = 100 - partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch => - val now = System.currentTimeMillis() / 1000 - val parts = batch.map { case (spec, location) => - val params = partitionStats.get(location.toString).map { - case PartitionStatistics(numFiles, totalSize) => - // This two fast stat could prevent Hive metastore to list the files again. - Map(NUM_FILES -> numFiles.toString, - TOTAL_SIZE -> totalSize.toString, - // Workaround a bug in HiveMetastore that try to mutate a read-only parameters. - // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java - DDL_TIME -> now.toString) - }.getOrElse(Map.empty) - // inherit table storage format (possibly except for location) - CatalogTablePartition( - spec, - table.storage.copy(locationUri = Some(location.toUri)), - params) - } - spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true) - done += parts.length - logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)") - } - } -} - - -/** - * A command that sets the location of a table or a partition. - * - * For normal tables, this just sets the location URI in the table/partition's storage format. - * For datasource tables, this sets a "path" parameter in the table/partition's serde properties. - * - * The syntax of this command is: - * {{{ - * ALTER TABLE table_name [PARTITION partition_spec] SET LOCATION "loc"; - * }}} - */ -case class AlterTableSetLocationCommand( - tableName: TableIdentifier, - partitionSpec: Option[TablePartitionSpec], - location: String) - extends RunnableCommand { - - override def run(sparkSession: SparkSession): Seq[Row] = { - val catalog = sparkSession.sessionState.catalog - val table = catalog.getTableMetadata(tableName) - val locUri = CatalogUtils.stringToURI(location) - DDLUtils.verifyAlterTableType(catalog, table, isView = false) - partitionSpec match { - case Some(spec) => - DDLUtils.verifyPartitionProviderIsHive( - sparkSession, table, "ALTER TABLE ... 
SET LOCATION") - // Partition spec is specified, so we set the location only for this partition - val part = catalog.getPartition(table.identifier, spec) - val newPart = part.copy(storage = part.storage.copy(locationUri = Some(locUri))) - catalog.alterPartitions(table.identifier, Seq(newPart)) - case None => - // No partition spec is specified, so we set the location for the table itself - catalog.alterTable(table.withNewStorage(locationUri = Some(locUri))) - } - Seq.empty[Row] - } -} - - object DDLUtils { val HIVE_PROVIDER = "hive" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 86394ff23e379..51609207f469f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -22,16 +22,20 @@ import java.net.URI import java.nio.file.FileSystems import java.util.Date +import scala.collection.{GenMap, GenSeq} import scala.collection.mutable.ArrayBuffer +import scala.collection.parallel.ForkJoinTaskSupport +import scala.concurrent.forkjoin.ForkJoinPool import scala.util.control.NonFatal import scala.util.Try -import org.apache.commons.lang3.StringEscapeUtils -import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} +import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException +import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec @@ -39,7 +43,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.types._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{SerializableConfiguration, Utils} /** * A command to create a table with the same definition of the given existing table. @@ -91,9 +95,6 @@ case class CreateTableLikeCommand( } } - -// TODO: move the rest of the table commands from ddl.scala to this file - /** * A command to create a table. * @@ -998,3 +999,596 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman builder.toString() } } + + +/** + * Drops a table/view from the metastore and removes it if it is cached. + * + * The syntax of this command is: + * {{{ + * DROP TABLE [IF EXISTS] table_name; + * DROP VIEW [IF EXISTS] [db_name.]view_name; + * }}} + */ +case class DropTableCommand( + tableName: TableIdentifier, + ifExists: Boolean, + isView: Boolean, + purge: Boolean) extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val catalog = sparkSession.sessionState.catalog + + if (!catalog.isTemporaryTable(tableName) && catalog.tableExists(tableName)) { + // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view + // issue an exception. + catalog.getTableMetadata(tableName).tableType match { + case CatalogTableType.VIEW if !isView => + throw new AnalysisException( + "Cannot drop a view with DROP TABLE. 
Please use DROP VIEW instead")
+        case o if o != CatalogTableType.VIEW && isView =>
+          throw new AnalysisException(
+            s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
+        case _ =>
+      }
+    }
+    try {
+      sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
+    } catch {
+      case _: NoSuchTableException if ifExists =>
+      case NonFatal(e) => log.warn(e.toString, e)
+    }
+    catalog.refreshTable(tableName)
+    catalog.dropTable(tableName, ifExists, purge)
+    Seq.empty[Row]
+  }
+}
+
+/**
+ * A command that sets table/view properties.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
+ *   ALTER VIEW view1 SET TBLPROPERTIES ('key1' = 'val1', 'key2' = 'val2', ...);
+ * }}}
+ */
+case class AlterTableSetPropertiesCommand(
+    tableName: TableIdentifier,
+    properties: Map[String, String],
+    isView: Boolean)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    DDLUtils.verifyAlterTableType(catalog, table, isView)
+    // This overrides the old properties.
+    val newTable = table.copy(properties = table.properties ++ properties)
+    catalog.alterTable(newTable)
+    Seq.empty[Row]
+  }
+
+}
+
+/**
+ * A command that unsets table/view properties.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
+ *   ALTER VIEW view1 UNSET TBLPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
+ * }}}
+ */
+case class AlterTableUnsetPropertiesCommand(
+    tableName: TableIdentifier,
+    propKeys: Seq[String],
+    ifExists: Boolean,
+    isView: Boolean)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    DDLUtils.verifyAlterTableType(catalog, table, isView)
+    if (!ifExists) {
+      propKeys.foreach { k =>
+        if (!table.properties.contains(k)) {
+          throw new AnalysisException(
+            s"Attempted to unset non-existent property '$k' in table '${table.identifier}'")
+        }
+      }
+    }
+    val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
+    val newTable = table.copy(properties = newProperties)
+    catalog.alterTable(newTable)
+    Seq.empty[Row]
+  }
+
+}
+
+
+/**
+ * A command to change the definition of a column in a table. For now it only supports changing
+ * the comment of a non-partition column.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table_identifier
+ *   CHANGE [COLUMN] column_old_name column_new_name column_dataType [COMMENT column_comment]
+ *   [FIRST | AFTER column_name];
+ * }}}
+ */
+case class AlterTableChangeColumnCommand(
+    tableName: TableIdentifier,
+    columnName: String,
+    newColumn: StructField) extends RunnableCommand {
+
+  // TODO: support changing the column name/dataType/metadata/position.
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
+    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+
+    // Find the origin column in the schema by column name.
+    val originColumn = findColumnByName(table.schema, columnName, resolver)
+    // Throw an AnalysisException if the column name/dataType is changed.
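+    // For example (illustrative): for a column `c` of type INT, a comment-only change such as
+    // `ALTER TABLE t CHANGE COLUMN c c INT COMMENT 'new comment'` passes this check, while
+    // `ALTER TABLE t CHANGE COLUMN c c2 LONG` fails it and throws below.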
+    if (!columnEqual(originColumn, newColumn, resolver)) {
+      throw new AnalysisException(
+        "ALTER TABLE CHANGE COLUMN is not supported for changing column " +
+          s"'${originColumn.name}' with type '${originColumn.dataType}' to " +
+          s"'${newColumn.name}' with type '${newColumn.dataType}'")
+    }
+
+    val newSchema = table.schema.fields.map { field =>
+      if (field.name == originColumn.name) {
+        // Create a new column from the origin column with the new comment.
+        addComment(field, newColumn.getComment)
+      } else {
+        field
+      }
+    }
+    val newTable = table.copy(schema = StructType(newSchema))
+    catalog.alterTable(newTable)
+
+    Seq.empty[Row]
+  }
+
+  // Find the origin column in the schema by column name; throw an AnalysisException if the
+  // column reference is invalid.
+  private def findColumnByName(
+      schema: StructType, name: String, resolver: Resolver): StructField = {
+    schema.fields.collectFirst {
+      case field if resolver(field.name, name) => field
+    }.getOrElse(throw new AnalysisException(
+      s"Invalid column reference '$name', table schema is '${schema}'"))
+  }
+
+  // Add a comment to a column; if the comment is empty, return the original column.
+  private def addComment(column: StructField, comment: Option[String]): StructField = {
+    comment.map(column.withComment(_)).getOrElse(column)
+  }
+
+  // Compare two [[StructField]]s and return true if they have the same column name
+  // (by resolver) and dataType.
+  private def columnEqual(
+      field: StructField, other: StructField, resolver: Resolver): Boolean = {
+    resolver(field.name, other.name) && field.dataType == other.dataType
+  }
+}
+
+/**
+ * A command that sets the serde class and/or serde properties of a table/view.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table [PARTITION spec] SET SERDE serde_name [WITH SERDEPROPERTIES props];
+ *   ALTER TABLE table [PARTITION spec] SET SERDEPROPERTIES serde_properties;
+ * }}}
+ */
+case class AlterTableSerDePropertiesCommand(
+    tableName: TableIdentifier,
+    serdeClassName: Option[String],
+    serdeProperties: Option[Map[String, String]],
+    partSpec: Option[TablePartitionSpec])
+  extends RunnableCommand {
+
+  // This should never happen if we parsed things correctly.
+  require(serdeClassName.isDefined || serdeProperties.isDefined,
+    "ALTER TABLE attempted to set neither serde class name nor serde properties")
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+    // For datasource tables, disallow setting the serde or specifying a partition.
+    if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException("Operation not allowed: ALTER TABLE SET " +
+        "[SERDE | SERDEPROPERTIES] for a specific partition is not supported " +
+        "for tables created with the datasource API")
+    }
+    if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException("Operation not allowed: ALTER TABLE SET SERDE is " +
+        "not supported for tables created with the datasource API")
+    }
+    if (partSpec.isEmpty) {
+      val newTable = table.withNewStorage(
+        serde = serdeClassName.orElse(table.storage.serde),
+        properties = table.storage.properties ++ serdeProperties.getOrElse(Map()))
+      catalog.alterTable(newTable)
+    } else {
+      val spec = partSpec.get
+      val part = catalog.getPartition(table.identifier, spec)
+      val newPart = part.copy(storage = part.storage.copy(
+        serde = serdeClassName.orElse(part.storage.serde),
+        properties = part.storage.properties ++ serdeProperties.getOrElse(Map())))
+      catalog.alterPartitions(table.identifier, Seq(newPart))
+    }
+    Seq.empty[Row]
+  }
+
+}
+
+/**
+ * Add Partition in ALTER TABLE: adds the given partitions to the table.
+ *
+ * An error message will be issued if a partition already exists, unless 'ifNotExists' is true.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec1 [LOCATION 'loc1']
+ *                                         PARTITION spec2 [LOCATION 'loc2']
+ * }}}
+ */
+case class AlterTableAddPartitionCommand(
+    tableName: TableIdentifier,
+    partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
+    ifNotExists: Boolean)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE ADD PARTITION")
+    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+      val normalizedSpec = PartitioningUtils.normalizePartitionSpec(
+        spec,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+      // Inherit the table storage format (except possibly for location).
+      CatalogTablePartition(normalizedSpec, table.storage.copy(
+        locationUri = location.map(CatalogUtils.stringToURI(_))))
+    }
+    catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
+    Seq.empty[Row]
+  }
+
+}
+
+/**
+ * Rename a table partition's spec.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table PARTITION spec1 RENAME TO PARTITION spec2;
+ * }}}
+ */
+case class AlterTableRenamePartitionCommand(
+    tableName: TableIdentifier,
+    oldPartition: TablePartitionSpec,
+    newPartition: TablePartitionSpec)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE RENAME PARTITION")
+
+    val normalizedOldPartition = PartitioningUtils.normalizePartitionSpec(
+      oldPartition,
+      table.partitionColumnNames,
+      table.identifier.quotedString,
+      sparkSession.sessionState.conf.resolver)
+
+    val normalizedNewPartition = PartitioningUtils.normalizePartitionSpec(
+      newPartition,
+      table.partitionColumnNames,
+      table.identifier.quotedString,
+      sparkSession.sessionState.conf.resolver)
+
+    catalog.renamePartitions(
+      tableName, Seq(normalizedOldPartition), Seq(normalizedNewPartition))
+    Seq.empty[Row]
+  }
+
+}
+
+/**
+ * Drop Partition in ALTER TABLE: drops the given partitions from a table.
+ *
+ * This removes the data and metadata for each partition.
+ * Unless 'purge' is true, the data is moved to the .Trash/Current directory if Trash is
+ * configured; the metadata, however, is completely lost.
+ * An error message will be issued if a partition does not exist, unless 'ifExists' is true.
+ * Note: purge is always false when the target is a view.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
+ * }}}
+ */
+case class AlterTableDropPartitionCommand(
+    tableName: TableIdentifier,
+    specs: Seq[TablePartitionSpec],
+    ifExists: Boolean,
+    purge: Boolean,
+    retainData: Boolean)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
+
+    val normalizedSpecs = specs.map { spec =>
+      PartitioningUtils.normalizePartitionSpec(
+        spec,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+    }
+
+    catalog.dropPartitions(
+      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+      retainData = retainData)
+    Seq.empty[Row]
+  }
+
+}
+
+
+case class PartitionStatistics(numFiles: Int, totalSize: Long)
+
+/**
+ * Recover Partitions in ALTER TABLE: recovers all the partitions in the directory of a table
+ * and updates the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+    tableName: TableIdentifier,
+    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
+
+  // These are statistics that can be collected quickly, without requiring a scan of the data;
+  // see https://github.com/apache/hive/blob/master/
+  //   common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
+  val NUM_FILES = "numFiles"
+  val TOTAL_SIZE = "totalSize"
+  val DDL_TIME = "transient_lastDdlTime"
+
+  private def getPathFilter(hadoopConf: Configuration): PathFilter = {
+    // Dummy JobConf used to get to the pathFilter defined in the configuration.
+    // It's very expensive to create a JobConf (ClassUtil.findContainingJar() is slow).
+    val jobConf = new JobConf(hadoopConf, this.getClass)
+    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+    new PathFilter {
+      override def accept(path: Path): Boolean = {
+        val name = path.getName
+        if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
+          pathFilter == null || pathFilter.accept(path)
+        } else {
+          false
+        }
+      }
+    }
+  }
+
+  override def run(spark: SparkSession): Seq[Row] = {
+    val catalog = spark.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    val tableIdentWithDB = table.identifier.quotedString
+    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+    if (table.partitionColumnNames.isEmpty) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
+    }
+
+    if (table.storage.locationUri.isEmpty) {
+      throw new AnalysisException(s"Operation not allowed: $cmd only works on tables with " +
+        s"a location provided: $tableIdentWithDB")
+    }
+
+    val root = new Path(table.location)
+    logInfo(s"Recovering all the partitions in $root")
+    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+
+    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    val pathFilter = getPathFilter(hadoopConf)
+    val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(),
+      table.partitionColumnNames, threshold, spark.sessionState.conf.resolver)
+    val total = partitionSpecsAndLocs.length
+    logInfo(s"Found $total partitions in $root")
+
+    val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
+      gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
+    } else {
+      GenMap.empty[String, PartitionStatistics]
+    }
+    logInfo(s"Finished gathering the fast stats for all $total partitions.")
+
+    addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
+    // Update the table to indicate that its partition metadata is stored in the Hive metastore.
+    // This is always the case for Hive format tables, but is not true for Datasource tables
+    // created before Spark 2.1 unless they are converted via `msck repair table`.
+    spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true))
+    catalog.refreshTable(tableName)
+    logInfo(s"Recovered all partitions ($total).")
+    Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+      spark: SparkSession,
+      fs: FileSystem,
+      filter: PathFilter,
+      path: Path,
+      spec: TablePartitionSpec,
+      partitionNames: Seq[String],
+      threshold: Int,
+      resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = {
+    if (partitionNames.isEmpty) {
+      return Seq(spec -> path)
+    }
+
+    val statuses = fs.listStatus(path, filter)
+    val statusPar: GenSeq[FileStatus] =
+      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
+        // Parallelize the listing of partitions here so that we get better parallelism later.
+        val parArray = statuses.par
+        parArray.tasksupport = evalTaskSupport
+        parArray
+      } else {
+        statuses
+      }
+    statusPar.flatMap { st =>
+      val name = st.getPath.getName
+      if (st.isDirectory && name.contains("=")) {
+        val ps = name.split("=", 2)
+        val columnName = ExternalCatalogUtils.unescapePathName(ps(0))
+        // TODO: Validate the value
+        val value = ExternalCatalogUtils.unescapePathName(ps(1))
+        if (resolver(columnName, partitionNames.head)) {
+          scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
+            partitionNames.drop(1), threshold, resolver)
+        } else {
+          logWarning(
+            s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
+          Seq()
+        }
+      } else {
+        logWarning(s"ignore ${new Path(path, name)}")
+        Seq()
+      }
+    }
+  }
+
+  private def gatherPartitionStats(
+      spark: SparkSession,
+      partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
+      fs: FileSystem,
+      pathFilter: PathFilter,
+      threshold: Int): GenMap[String, PartitionStatistics] = {
+    if (partitionSpecsAndLocs.length > threshold) {
+      val hadoopConf = spark.sparkContext.hadoopConfiguration
+      val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+      val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray
+
+      // Cap the parallelism to prevent the file listing below from generating too many tasks
+      // when defaultParallelism is large.
+      val numParallelism = Math.min(serializedPaths.length,
+        Math.min(spark.sparkContext.defaultParallelism, 10000))
+      // Gather the fast stats for all the partitions; otherwise the Hive metastore will list
+      // all the files for all the new partitions sequentially, which is super slow.
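+      // For example (illustrative numbers): with 1,000,000 partition directories and
+      // defaultParallelism = 200, the listing below runs as min(1000000, min(200, 10000)) = 200
+      // parallel tasks.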
+      logInfo(s"Gathering the fast stats in parallel using $numParallelism tasks.")
+      spark.sparkContext.parallelize(serializedPaths, numParallelism)
+        .mapPartitions { paths =>
+          val pathFilter = getPathFilter(serializableConfiguration.value)
+          paths.map(new Path(_)).map { path =>
+            val fs = path.getFileSystem(serializableConfiguration.value)
+            val statuses = fs.listStatus(path, pathFilter)
+            (path.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
+          }
+        }.collectAsMap()
+    } else {
+      partitionSpecsAndLocs.map { case (_, location) =>
+        val statuses = fs.listStatus(location, pathFilter)
+        (location.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
+      }.toMap
+    }
+  }
+
+  private def addPartitions(
+      spark: SparkSession,
+      table: CatalogTable,
+      partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
+      partitionStats: GenMap[String, PartitionStatistics]): Unit = {
+    val total = partitionSpecsAndLocs.length
+    var done = 0L
+    // The Hive metastore may not have enough memory to handle millions of partitions in a
+    // single RPC, so we split them into smaller batches. Since the Hive client is not
+    // thread-safe, we cannot do this in parallel.
+    val batchSize = 100
+    partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch =>
+      val now = System.currentTimeMillis() / 1000
+      val parts = batch.map { case (spec, location) =>
+        val params = partitionStats.get(location.toString).map {
+          case PartitionStatistics(numFiles, totalSize) =>
+            // These two fast stats prevent the Hive metastore from listing the files again.
+            Map(NUM_FILES -> numFiles.toString,
+              TOTAL_SIZE -> totalSize.toString,
+              // Work around a bug in HiveMetastore that tries to mutate read-only parameters.
+              // See metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+              DDL_TIME -> now.toString)
+        }.getOrElse(Map.empty)
+        // Inherit the table storage format (except possibly for location).
+        CatalogTablePartition(
+          spec,
+          table.storage.copy(locationUri = Some(location.toUri)),
+          params)
+      }
+      spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
+      done += parts.length
+      logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
+    }
+  }
+}
+
+
+/**
+ * A command that sets the location of a table or a partition.
+ *
+ * For normal tables, this just sets the location URI in the table/partition's storage format.
+ * For datasource tables, this sets a "path" parameter in the table/partition's serde properties.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table_name [PARTITION partition_spec] SET LOCATION "loc";
+ * }}}
+ */
+case class AlterTableSetLocationCommand(
+    tableName: TableIdentifier,
+    partitionSpec: Option[TablePartitionSpec],
+    location: String)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    val locUri = CatalogUtils.stringToURI(location)
+    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
+    partitionSpec match {
+      case Some(spec) =>
+        DDLUtils.verifyPartitionProviderIsHive(
+          sparkSession, table, "ALTER TABLE ... SET LOCATION")
+        // A partition spec is specified, so we set the location only for this partition.
+        val part = catalog.getPartition(table.identifier, spec)
+        val newPart = part.copy(storage = part.storage.copy(locationUri = Some(locUri)))
+        catalog.alterPartitions(table.identifier, Seq(newPart))
+      case None =>
+        // No partition spec is specified, so we set the location for the table itself.
+        catalog.alterTable(table.withNewStorage(locationUri = Some(locUri)))
+    }
+    Seq.empty[Row]
+  }
+}