
Commit 2d136db

Davies Liu authored and committed
[SPARK-16905] SQL DDL: MSCK REPAIR TABLE
MSCK REPAIR TABLE can be used to recover the partitions of an external table in the catalog based on the partition directories present in the file system. An equivalent syntax is also added: ALTER TABLE table RECOVER PARTITIONS.

The implementation in this PR only lists partition directories (not the files within a partition) on the driver, in parallel if needed.

Added unit tests and a Hive compatibility test suite.

Author: Davies Liu <[email protected]>

Closes #14500 from davies/repair_table
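For context, a minimal usage sketch of the two equivalent syntaxes this commit introduces. The session setup and the table name "logs" are illustrative assumptions, not part of the patch:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("recover-partitions-example")
  .enableHiveSupport()  // assumed here so an external catalog is available
  .getOrCreate()

// "logs" stands in for an external, partitioned table whose partition
// directories (e.g. <location>/ds=2016-08-01/) already exist on disk.
spark.sql("MSCK REPAIR TABLE logs")

// Equivalent syntax added by the same patch:
spark.sql("ALTER TABLE logs RECOVER PARTITIONS")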
1 parent 44115e9 commit 2d136db

File tree

6 files changed: +209 -7 lines changed


sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 4 additions & 2 deletions
@@ -84,6 +84,7 @@ statement
     | ALTER VIEW tableIdentifier
         DROP (IF EXISTS)? partitionSpec (',' partitionSpec)*           #dropTablePartitions
     | ALTER TABLE tableIdentifier partitionSpec? SET locationSpec      #setTableLocation
+    | ALTER TABLE tableIdentifier RECOVER PARTITIONS                   #recoverPartitions
     | DROP TABLE (IF EXISTS)? tableIdentifier PURGE?                   #dropTable
     | DROP VIEW (IF EXISTS)? tableIdentifier                           #dropTable
     | CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier
@@ -121,6 +122,7 @@ statement
     | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
         tableIdentifier partitionSpec?                                 #loadData
     | TRUNCATE TABLE tableIdentifier partitionSpec?                    #truncateTable
+    | MSCK REPAIR TABLE tableIdentifier                                #repairTable
     | op=(ADD | LIST) identifier .*?                                   #manageResource
     | SET ROLE .*?                                                     #failNativeCommand
     | SET .*?                                                          #setConfiguration
@@ -154,7 +156,6 @@ unsupportedHiveNativeCommands
     | kw1=UNLOCK kw2=DATABASE
     | kw1=CREATE kw2=TEMPORARY kw3=MACRO
     | kw1=DROP kw2=TEMPORARY kw3=MACRO
-    | kw1=MSCK kw2=REPAIR kw3=TABLE
     | kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=CLUSTERED
     | kw1=ALTER kw2=TABLE tableIdentifier kw3=CLUSTERED kw4=BY
     | kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=SORTED
@@ -646,7 +647,7 @@ nonReserved
     | CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT
     | DBPROPERTIES | DFS | TRUNCATE | COMPUTE | LIST
     | STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER
-    | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
+    | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | RECOVER | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
     | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION | LOCAL | INPATH
     | ASC | DESC | LIMIT | RENAME | SETS
     | AT | NULLS | OVERWRITE | ALL | ALTER | AS | BETWEEN | BY | CREATE | DELETE
@@ -859,6 +860,7 @@ LOCK: 'LOCK';
 UNLOCK: 'UNLOCK';
 MSCK: 'MSCK';
 REPAIR: 'REPAIR';
+RECOVER: 'RECOVER';
 EXPORT: 'EXPORT';
 IMPORT: 'IMPORT';
 LOAD: 'LOAD';

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 27 additions & 0 deletions
@@ -405,6 +405,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
   }
 
+  /**
+   * Create an [[AlterTableRecoverPartitionsCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   MSCK REPAIR TABLE tablename
+   * }}}
+   */
+  override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableRecoverPartitionsCommand(
+      visitTableIdentifier(ctx.tableIdentifier),
+      "MSCK REPAIR TABLE")
+  }
+
   /**
    * Convert a table property list into a key-value map.
    * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
@@ -763,6 +777,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       ctx.EXISTS != null)
   }
 
+  /**
+   * Create an [[AlterTableRecoverPartitionsCommand]] command
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table RECOVER PARTITIONS;
+   * }}}
+   */
+  override def visitRecoverPartitions(
+      ctx: RecoverPartitionsContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableRecoverPartitionsCommand(visitTableIdentifier(ctx.tableIdentifier))
+  }
+
   /**
    * Create an [[AlterTableSetLocationCommand]] command
   *
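As a quick sanity check of the two visitor methods above: both surface syntaxes produce an AlterTableRecoverPartitionsCommand, differing only in the cmd string used for error messages. A minimal sketch, assuming the Spark 2.0-era SparkSqlParser and SQLConf constructors:

import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.internal.SQLConf

val parser = new SparkSqlParser(new SQLConf)

// Handled by visitRepairTable; carries cmd = "MSCK REPAIR TABLE".
val repairPlan = parser.parsePlan("MSCK REPAIR TABLE tab1")

// Handled by visitRecoverPartitions; keeps the default cmd string.
val recoverPlan = parser.parsePlan("ALTER TABLE tab1 RECOVER PARTITIONS")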

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 113 additions & 3 deletions
@@ -17,19 +17,24 @@
 
 package org.apache.spark.sql.execution.command
 
+import scala.collection.GenSeq
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources.BucketSpec
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
 
-
 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
 
@@ -424,6 +429,111 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover Partitions in ALTER TABLE: recover all the partitions in the directory of a table and
+ * update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ *   MSCK REPAIR TABLE table;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+    tableName: TableIdentifier,
+    cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+    val catalog = spark.sessionState.catalog
+    if (!catalog.tableExists(tableName)) {
+      throw new AnalysisException(s"Table $tableName in $cmd does not exist.")
+    }
+    val table = catalog.getTableMetadata(tableName)
+    if (catalog.isTemporaryTable(tableName)) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd on temporary tables: $tableName")
+    }
+    if (DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd on datasource tables: $tableName")
+    }
+    if (table.tableType != CatalogTableType.EXTERNAL) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd only works on external tables: $tableName")
+    }
+    if (!DDLUtils.isTablePartitioned(table)) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
+    }
+    if (table.storage.locationUri.isEmpty) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd only works on table with location provided: $tableName")
+    }
+
+    val root = new Path(table.storage.locationUri.get)
+    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    // Dummy jobconf to get to the pathFilter defined in configuration
+    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
+    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
+    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+    val partitionSpecsAndLocs = scanPartitions(
+      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
+    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+      // inherit table storage format (possibly except for location)
+      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
+    }
+    spark.sessionState.catalog.createPartitions(tableName,
+      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+    Seq.empty[Row]
+  }
+
+  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+      spark: SparkSession,
+      fs: FileSystem,
+      filter: PathFilter,
+      path: Path,
+      spec: TablePartitionSpec,
+      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
+    if (partitionNames.length == 0) {
+      return Seq(spec -> path)
+    }
+
+    val statuses = fs.listStatus(path)
+    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
+    val statusPar: GenSeq[FileStatus] =
+      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
+        val parArray = statuses.par
+        parArray.tasksupport = evalTaskSupport
+        parArray
+      } else {
+        statuses
+      }
+    statusPar.flatMap { st =>
+      val name = st.getPath.getName
+      if (st.isDirectory && name.contains("=")) {
+        val ps = name.split("=", 2)
+        val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
+        // TODO: Validate the value
+        val value = PartitioningUtils.unescapePathName(ps(1))
+        // comparing with case-insensitive, but preserve the case
+        if (columnName == partitionNames(0)) {
+          scanPartitions(
+            spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1))
+        } else {
+          logWarning(s"expect partition column ${partitionNames(0)}, but got ${ps(0)}, ignore it")
+          Seq()
+        }
+      } else {
+        if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
+          logWarning(s"ignore ${new Path(path, name)}")
+        }
+        Seq()
+      }
+    }
+  }
+}
+
 
 /**
  * A command that sets the location of a table or a partition.
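To make the recursion in scanPartitions easier to follow, here is a simplified, self-contained sketch of the same traversal using java.io.File in place of Hadoop's FileSystem. It is only an illustration: the real command additionally applies the JobConf-derived PathFilter, unescapes partition path names, and switches to a parallel collection backed by a ForkJoinPool once a directory level exceeds spark.rdd.parallelListingThreshold:

import java.io.File

// Walks <root>/col1=v1/col2=v2/... and returns one (spec, leaf directory)
// pair per fully-resolved partition, consuming one expected partition
// column per directory level.
def scanPartitions(
    dir: File,
    spec: Map[String, String],
    partitionNames: Seq[String]): Seq[(Map[String, String], File)] = {
  if (partitionNames.isEmpty) {
    return Seq(spec -> dir)
  }
  val children = Option(dir.listFiles()).getOrElse(Array.empty[File])
  children.toSeq.flatMap { child =>
    val name = child.getName
    if (child.isDirectory && name.contains("=")) {
      val Array(column, value) = name.split("=", 2)
      if (column.toLowerCase == partitionNames.head.toLowerCase) {
        // Recurse one level deeper with one partition column consumed.
        scanPartitions(child, spec + (column.toLowerCase -> value), partitionNames.tail)
      } else {
        // Unexpected column at this depth; the real command logs a warning.
        Seq.empty
      }
    } else {
      // Plain files and non-"col=value" directories are skipped (the real
      // command warns unless the name is _SUCCESS, _temporary, or starts
      // with a dot).
      Seq.empty
    }
  }
}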

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala

Lines changed: 8 additions & 0 deletions
@@ -564,6 +564,14 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed2, expected2)
   }
 
+  test("alter table: recover partitions") {
+    val sql = "ALTER TABLE table_name RECOVER PARTITIONS"
+    val parsed = parser.parsePlan(sql)
+    val expected = AlterTableRecoverPartitionsCommand(
+      TableIdentifier("table_name", None))
+    comparePlans(parsed, expected)
+  }
+
   test("alter view: add partition (not supported)") {
     assertUnsupported(
       """

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 49 additions & 0 deletions
@@ -628,6 +628,55 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     testAddPartitions(isDatasourceTable = true)
   }
 
+  test("alter table: recover partitions (sequential)") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+      testRecoverPartitions()
+    }
+  }
+
+  test("alter table: recover partitions (parallel)") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
+      testRecoverPartitions()
+    }
+  }
+
+  private def testRecoverPartitions() {
+    val catalog = spark.sessionState.catalog
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist RECOVER PARTITIONS")
+    }
+
+    val tableIdent = TableIdentifier("tab1")
+    createTable(catalog, tableIdent)
+    val part1 = Map("a" -> "1", "b" -> "5")
+    createTablePartition(catalog, part1, tableIdent)
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+
+    val part2 = Map("a" -> "2", "b" -> "6")
+    val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    // valid
+    fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+    fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+    // invalid
+    fs.mkdirs(new Path(new Path(root, "a"), "b"))  // bad name
+    fs.mkdirs(new Path(new Path(root, "b=1"), "a=1"))  // wrong order
+    fs.mkdirs(new Path(root, "a=4"))  // not enough columns
+    fs.createNewFile(new Path(new Path(root, "a=1"), "b=4"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS"))  // _SUCCESS
+    fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary"))  // _temporary
+    fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4"))  // start with .
+
+    try {
+      sql("ALTER TABLE tab1 RECOVER PARTITIONS")
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+        Set(part1, part2))
+    } finally {
+      fs.delete(root, true)
+    }
+  }
+
   test("alter table: add partition is not supported for views") {
     assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2')")
   }

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala

Lines changed: 8 additions & 2 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -520,8 +521,13 @@ class HiveDDLCommandSuite extends PlanTest {
     }
   }
 
-  test("MSCK repair table (not supported)") {
-    assertUnsupported("MSCK REPAIR TABLE tab1")
+  test("MSCK REPAIR table") {
+    val sql = "MSCK REPAIR TABLE tab1"
+    val parsed = parser.parsePlan(sql)
+    val expected = AlterTableRecoverPartitionsCommand(
+      TableIdentifier("tab1", None),
+      "MSCK REPAIR TABLE")
+    comparePlans(parsed, expected)
   }
 
   test("create table like") {

0 commit comments