@@ -84,6 +84,7 @@ statement
| ALTER VIEW tableIdentifier
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* #dropTablePartitions
| ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation
| ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions
| DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable
| DROP VIEW (IF EXISTS)? tableIdentifier #dropTable
| CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier
@@ -121,6 +122,7 @@ statement
| LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
tableIdentifier partitionSpec? #loadData
| TRUNCATE TABLE tableIdentifier partitionSpec? #truncateTable
| MSCK REPAIR TABLE tableIdentifier #repairTable
| op=(ADD | LIST) identifier .*? #manageResource
| SET ROLE .*? #failNativeCommand
| SET .*? #setConfiguration
@@ -154,7 +156,6 @@ unsupportedHiveNativeCommands
| kw1=UNLOCK kw2=DATABASE
| kw1=CREATE kw2=TEMPORARY kw3=MACRO
| kw1=DROP kw2=TEMPORARY kw3=MACRO
| kw1=MSCK kw2=REPAIR kw3=TABLE
| kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=CLUSTERED
| kw1=ALTER kw2=TABLE tableIdentifier kw3=CLUSTERED kw4=BY
| kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=SORTED
@@ -653,7 +654,7 @@ nonReserved
| CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT
| DBPROPERTIES | DFS | TRUNCATE | COMPUTE | LIST
| STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER
| REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
| REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | RECOVER | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
| ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION | LOCAL | INPATH
| ASC | DESC | LIMIT | RENAME | SETS
| AT | NULLS | OVERWRITE | ALL | ALTER | AS | BETWEEN | BY | CREATE | DELETE
@@ -866,6 +867,7 @@ LOCK: 'LOCK';
UNLOCK: 'UNLOCK';
MSCK: 'MSCK';
REPAIR: 'REPAIR';
RECOVER: 'RECOVER';
EXPORT: 'EXPORT';
IMPORT: 'IMPORT';
LOAD: 'LOAD';
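As a quick illustration of the syntax added above, here is a minimal usage sketch. It assumes a hypothetical external, partitioned table named `logs` whose location already contains partition directories; both statements map to the same AlterTableRecoverPartitionsCommand introduced below.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: `logs` is an assumed external, partitioned table whose
// location already contains directories such as ds=2016-08-01/.
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Both statements parse to AlterTableRecoverPartitionsCommand and register
// any partition directories found under the table location.
spark.sql("ALTER TABLE logs RECOVER PARTITIONS")
spark.sql("MSCK REPAIR TABLE logs")
```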
@@ -414,6 +414,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
}

/**
* Create an [[AlterTableRecoverPartitionsCommand]] command.
*
* For example:
* {{{
* MSCK REPAIR TABLE tablename
* }}}
*/
override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableRecoverPartitionsCommand(
visitTableIdentifier(ctx.tableIdentifier),
"MSCK REPAIR TABLE")
}

/**
* Convert a table property list into a key-value map.
* This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
@@ -784,6 +798,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx.PURGE != null)
}

/**
* Create an [[AlterTableRecoverPartitionsCommand]] command
*
* For example:
* {{{
* ALTER TABLE table RECOVER PARTITIONS;
* }}}
*/
override def visitRecoverPartitions(
ctx: RecoverPartitionsContext): LogicalPlan = withOrigin(ctx) {
AlterTableRecoverPartitionsCommand(visitTableIdentifier(ctx.tableIdentifier))
}

/**
* Create an [[AlterTableSetLocationCommand]] command
*
@@ -17,18 +17,23 @@

package org.apache.spark.sql.execution.command

import scala.collection.GenSeq
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes._
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._


// Note: The definition of these commands are based on the ones described in
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

@@ -425,6 +430,111 @@ case class AlterTableDropPartitionCommand(

}

/**
* Recover Partitions in ALTER TABLE: recover all the partitions in the directory of a table and
* update the catalog.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table RECOVER PARTITIONS;
* MSCK REPAIR TABLE table;
* }}}
*/
case class AlterTableRecoverPartitionsCommand(
tableName: TableIdentifier,
cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
if (!catalog.tableExists(tableName)) {
throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
}
val table = catalog.getTableMetadata(tableName)
if (catalog.isTemporaryTable(tableName)) {
throw new AnalysisException(
s"Operation not allowed: $cmd on temporary tables: $tableName")
}
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
s"Operation not allowed: $cmd on datasource tables: $tableName")
}
if (table.tableType != CatalogTableType.EXTERNAL) {
throw new AnalysisException(
s"Operation not allowed: $cmd only works on external tables: $tableName")
}
if (!DDLUtils.isTablePartitioned(table)) {
throw new AnalysisException(
s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
}
if (table.storage.locationUri.isEmpty) {
throw new AnalysisException(
s"Operation not allowed: $cmd only works on table with location provided: $tableName")
}

val root = new Path(table.storage.locationUri.get)
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
// Dummy jobconf to get to the pathFilter defined in configuration
// It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
val partitionSpecsAndLocs = scanPartitions(
spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
val parts = partitionSpecsAndLocs.map { case (spec, location) =>
// inherit table storage format (possibly except for location)
CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
}
spark.sessionState.catalog.createPartitions(tableName,
parts.toArray[CatalogTablePartition], ignoreIfExists = true)
Contributor:

What will happen if we get thousands or tens of thousands of new partitions?

Contributor Author:

Good question, see the implementation in HiveShim:

  // Follows exactly the same logic of DDLTask.createPartitions in Hive 0.12
  override def createPartitions(
      hive: Hive,
      database: String,
      tableName: String,
      parts: Seq[CatalogTablePartition],
      ignoreIfExists: Boolean): Unit = {
    val table = hive.getTable(database, tableName)
    parts.foreach { s =>
      val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
      val spec = s.spec.asJava
      if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
        // Ignore this partition since it already exists and ignoreIfExists == true
      } else {
        if (location == null && table.isView()) {
          throw new HiveException("LOCATION clause illegal for view partition");
        }

        createPartitionMethod.invoke(
          hive,
          table,
          spec,
          location,
          null, // partParams
          null, // inputFormat
          null, // outputFormat
          -1: JInteger, // numBuckets
          null, // cols
          null, // serializationLib
          null, // serdeParams
          null, // bucketCols
          null) // sortCols
      }
    }
  }

All these partitions will be inserted into Hive sequentially, so grouping them into batches will not help here.

Contributor Author:

No, this is only true for Hive <= 0.12; for Hive 0.13+ they are sent in a single RPC, so we should verify what the limit is for a single RPC.

Contributor Author:

It seems that the Hive Metastore can't handle an RPC with millions of partitions; I will send a patch to do it in batches.
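For reference, a minimal sketch of the batching idea mentioned above (an assumption about that follow-up patch, not part of this PR). It reuses the `parts`, `catalog` and `tableName` values already in scope in run(), and the batch size is a placeholder:

```scala
// Hypothetical batching: send partitions to the metastore in fixed-size
// groups rather than in one giant RPC. The real limit would need measuring.
val batchSize = 100
parts.seq.grouped(batchSize).foreach { batch =>
  catalog.createPartitions(tableName, batch, ignoreIfExists = true)
}
```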

Seq.empty[Row]
}

@transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))

private def scanPartitions(
spark: SparkSession,
fs: FileSystem,
filter: PathFilter,
path: Path,
spec: TablePartitionSpec,
partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
if (partitionNames.length == 0) {
return Seq(spec -> path)
}

val statuses = fs.listStatus(path)
val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
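// List in parallel when more than one partition level remains and this
// directory is wide (more than `threshold` entries), or when more than two
// levels remain regardless of width; otherwise list sequentially.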
val statusPar: GenSeq[FileStatus] =
if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
val parArray = statuses.par
Contributor:

I didn't look carefully - but if you are using the default exec context, please create a new one. Otherwise it'd block.

Contributor:

cool. can we make it explicit, e.g. statuses.par(evalTaskSupport)?

Contributor Author:

This is copied from UnionRDD.

Contributor Author:

I did not figure out how it works; at least statuses.par(evalTaskSupport) does not work.
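For what it's worth, a small standalone sketch of the pattern in question, assuming the Scala 2.11-era parallel collections: `.par` returns a parallel collection whose mutable `tasksupport` field can be reassigned, which is why the assignment below routes work to the dedicated pool, and `statuses.par(evalTaskSupport)` likely fails because `apply` on a parallel sequence is element indexing, not configuration.

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

// Assumed standalone example, not project code: run a parallel map on a
// dedicated 8-thread pool instead of the default global pool.
val pool = new ForkJoinPool(8)
val customSupport = new ForkJoinTaskSupport(pool)

val values = (1 to 100).toArray
val parValues = values.par            // ParArray[Int]
parValues.tasksupport = customSupport // reassign the mutable task support
val doubled = parValues.map(_ * 2)    // computed on the dedicated pool
pool.shutdown()
```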

parArray.tasksupport = evalTaskSupport
parArray
} else {
statuses
}
statusPar.flatMap { st =>
val name = st.getPath.getName
if (st.isDirectory && name.contains("=")) {
val ps = name.split("=", 2)
val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
// TODO: Validate the value
val value = PartitioningUtils.unescapePathName(ps(1))
Member (@viirya, Aug 6, 2016):

Do we need to check if the value is valid? E.g., for a partition column "a" of IntegerType, "a=abc" is invalid.

Contributor Author:

We could have a TODO here.

Member:

Can this escaping cause problems in (say) S3 for objects of the form "foo%20bar"?

Contributor Author:

If the partitions are generated by Spark, they can be unescaped back correctly. For others, there could be compatibility issues. For example, Spark does not escape on Linux, so the unescaping of %20 could be wrong (we could show a warning?). I think these are not in the scope of this PR.

Member:

yes, that makes sense.

// comparing with case-insensitive, but preserve the case
if (columnName == partitionNames(0)) {
Member (@viirya, Aug 6, 2016):

A directory name like "a=" will pass this condition and get an empty partition value. Is that allowed?

Contributor Author:

I think it's valid.
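As an aside, a small illustration of the directory-name handling discussed in this thread; java.net.URLDecoder stands in here for PartitioningUtils.unescapePathName, on the assumption that both decode %XX-style escapes the same way for these inputs.

```scala
import java.net.URLDecoder

// Each directory level "col=value" contributes one entry to the partition spec.
val name = "city=New%20York"
val ps = name.split("=", 2)                    // Array("city", "New%20York")
val columnName = ps(0).toLowerCase             // "city"
val value = URLDecoder.decode(ps(1), "UTF-8")  // "New York"

// A name like "b=" splits into ("b", ""), so an empty partition value passes
// the check above and is recorded as-is.
val empty = "b=".split("=", 2)                 // Array("b", "")
```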

scanPartitions(
spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1))
} else {
logWarning(s"expect partition column ${partitionNames(0)}, but got ${ps(0)}, ignore it")
Seq()
Member:

Like Hive, we may consider throwing an exception here (that could be turned off via a config).

Contributor Author:

Hive only throws an exception when there are disallowed characters in the value, not in other cases. I'd like to avoid adding configs if there is no serious problem here.

}
} else {
if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
logWarning(s"ignore ${new Path(path, name)}")
}
Seq()
}
}
}
}


/**
* A command that sets the location of a table or a partition.
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.execution.datasources.{PartitioningUtils}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -540,6 +540,14 @@ class DDLCommandSuite extends PlanTest {
comparePlans(parsed2, expected2)
}

test("alter table: recover partitions") {
val sql = "ALTER TABLE table_name RECOVER PARTITIONS"
val parsed = parser.parsePlan(sql)
val expected = AlterTableRecoverPartitionsCommand(
TableIdentifier("table_name", None))
comparePlans(parsed, expected)
}

test("alter view: add partition (not supported)") {
assertUnsupported(
"""
@@ -864,6 +864,55 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testAddPartitions(isDatasourceTable = true)
}

test("alter table: recover partitions (sequential)") {
withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
testRecoverPartitions()
}
}

test("alter table: recover partition (parallel)") {
withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
testRecoverPartitions()
}
}

private def testRecoverPartitions() {
val catalog = spark.sessionState.catalog
// table to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist RECOVER PARTITIONS")
}

val tableIdent = TableIdentifier("tab1")
createTable(catalog, tableIdent)
val part1 = Map("a" -> "1", "b" -> "5")
createTablePartition(catalog, part1, tableIdent)
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))

val part2 = Map("a" -> "2", "b" -> "6")
val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
// valid
fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
// invalid
fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
fs.mkdirs(new Path(new Path(root, "b=1"), "a=1")) // wrong order
fs.mkdirs(new Path(root, "a=4")) // not enough columns
fs.createNewFile(new Path(new Path(root, "a=1"), "b=4")) // file
fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS")) // _SUCCESS
fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary")) // _temporary
fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4")) // start with .

try {
sql("ALTER TABLE tab1 RECOVER PARTITIONS")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2))
} finally {
fs.delete(root, true)
}
}
Contributor:

Let's add tests to exercise the command more. Here are a few examples.

  1. There is a partition dir with a bad name (not in the format of key=value).
  2. Say that we have two partition columns. We have some files under the first layer (e.g. _SUCCESS, parquet's metadata files, and/or regular data files).
  3. Some dirs do not have the expected number of partition columns. For example, the schema specifies 3 partition columns, but a path only has two partition columns.
  4. The partition column names encoded in the path do not match the names specified in the schema. For example, when we create the table, we specify c1 as the first partition column. However, the dir in the fs has c2 as the first partition column.

Contributor Author:

done


test("alter table: add partition is not supported for views") {
assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2')")
}
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive

import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -499,8 +500,13 @@ class HiveDDLCommandSuite extends PlanTest {
}
}

test("MSCK repair table (not supported)") {
assertUnsupported("MSCK REPAIR TABLE tab1")
test("MSCK REPAIR table") {
val sql = "MSCK REPAIR TABLE tab1"
val parsed = parser.parsePlan(sql)
val expected = AlterTableRecoverPartitionsCommand(
TableIdentifier("tab1", None),
"MSCK REPAIR TABLE")
comparePlans(parsed, expected)
}

test("create table like") {