Commit c5edbdf

Author: Davies Liu
Message: support ddl: MSCK REPAIR TABLE

1 parent 03d46aa · commit c5edbdf
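At the SQL level, this commit makes both of the following statements parse and execute instead of failing as unsupported Hive-native commands. A minimal sketch, assuming a SparkSession named spark and an existing external, partitioned table logs (both illustrative):

    // Both statements scan the table's directory for partition directories and
    // register any that are missing from the catalog; they share one code path.
    spark.sql("ALTER TABLE logs RECOVER PARTITIONS")
    spark.sql("MSCK REPAIR TABLE logs")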

File tree: 8 files changed, +224 -9 lines

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 4 additions & 2 deletions
@@ -84,6 +84,7 @@ statement
     | ALTER VIEW tableIdentifier
         DROP (IF EXISTS)? partitionSpec (',' partitionSpec)*          #dropTablePartitions
     | ALTER TABLE tableIdentifier partitionSpec? SET locationSpec     #setTableLocation
+    | ALTER TABLE tableIdentifier RECOVER PARTITIONS                  #recoverPartitions
     | DROP TABLE (IF EXISTS)? tableIdentifier PURGE?                  #dropTable
     | DROP VIEW (IF EXISTS)? tableIdentifier                          #dropTable
     | CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier

@@ -121,6 +122,7 @@ statement
     | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
         tableIdentifier partitionSpec?                                #loadData
     | TRUNCATE TABLE tableIdentifier partitionSpec?                   #truncateTable
+    | MSCK REPAIR TABLE tableIdentifier                               #repairTable
     | op=(ADD | LIST) identifier .*?                                  #manageResource
     | SET ROLE .*?                                                    #failNativeCommand
     | SET .*?                                                         #setConfiguration

@@ -154,7 +156,6 @@ unsupportedHiveNativeCommands
     | kw1=UNLOCK kw2=DATABASE
     | kw1=CREATE kw2=TEMPORARY kw3=MACRO
     | kw1=DROP kw2=TEMPORARY kw3=MACRO
-    | kw1=MSCK kw2=REPAIR kw3=TABLE
     | kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=CLUSTERED
     | kw1=ALTER kw2=TABLE tableIdentifier kw3=CLUSTERED kw4=BY
     | kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=SORTED

@@ -652,7 +653,7 @@ nonReserved
     | CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT
     | DBPROPERTIES | DFS | TRUNCATE | COMPUTE | LIST
     | STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER
-    | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
+    | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | RECOVER | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE
     | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION | LOCAL | INPATH
     | ASC | DESC | LIMIT | RENAME | SETS
     | AT | NULLS | OVERWRITE | ALL | ALTER | AS | BETWEEN | BY | CREATE | DELETE

@@ -865,6 +866,7 @@ LOCK: 'LOCK';
 UNLOCK: 'UNLOCK';
 MSCK: 'MSCK';
 REPAIR: 'REPAIR';
+RECOVER: 'RECOVER';
 EXPORT: 'EXPORT';
 IMPORT: 'IMPORT';
 LOAD: 'LOAD';
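Two things happen in this grammar change: MSCK REPAIR TABLE moves out of unsupportedHiveNativeCommands and becomes a real statement alternative, and the new RECOVER token is added to nonReserved so it remains usable as an ordinary identifier. A hedged sketch of that second point (table and column names are illustrative, assuming a working session):

    // RECOVER is non-reserved, so existing schemas that use it as a name
    // should keep parsing.
    spark.sql("SELECT recover FROM t")   // 'recover' as a column reference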

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 29 additions & 0 deletions
@@ -408,6 +408,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
   }
 
+  /**
+   * Create a [[RepairTableCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   MSCK REPAIR TABLE tablename
+   * }}}
+   */
+  override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
+    RepairTableCommand(visitTableIdentifier(ctx.tableIdentifier))
+  }
+
   /**
    * Convert a table property list into a key-value map.
    * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
@@ -778,6 +790,23 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       ctx.PURGE != null)
   }
 
+  /**
+   * Create an [[AlterTableRecoverPartitionsCommand]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table RECOVER PARTITIONS;
+   * }}}
+   *
+   * Recovering partitions is only meaningful for physical tables, because the
+   * concept of partitioning is associated with physical storage; the command
+   * itself rejects temporary tables and non-external tables.
+   */
+  override def visitRecoverPartitions(
+      ctx: RecoverPartitionsContext): LogicalPlan = withOrigin(ctx) {
+    AlterTableRecoverPartitionsCommand(visitTableIdentifier(ctx.tableIdentifier))
+  }
+
   /**
    * Create an [[AlterTableSetLocationCommand]] command
    *
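The two visitors map each statement to one new logical command. A minimal sketch of that mapping, mirroring the comparePlans assertions in the test suites below (parser stands for the SQL parser those suites use):

    // Sketch: parse each new statement and check the resulting command node.
    assert(parser.parsePlan("MSCK REPAIR TABLE tab1") ==
      RepairTableCommand(TableIdentifier("tab1", None)))
    assert(parser.parsePlan("ALTER TABLE tab1 RECOVER PARTITIONS") ==
      AlterTableRecoverPartitionsCommand(TableIdentifier("tab1", None)))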

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 95 additions & 3 deletions
@@ -17,13 +17,19 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.io.File
+
+import scala.collection.GenSeq
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.types._

@@ -425,6 +431,92 @@ case class AlterTableDropPartitionCommand(
 
 }
 
+/**
+ * Recover partitions in ALTER TABLE: scan the directory of a table for all
+ * partitions and update the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   ALTER TABLE table RECOVER PARTITIONS;
+ * }}}
+ */
+case class AlterTableRecoverPartitionsCommand(
+    tableName: TableIdentifier) extends RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+    val catalog = spark.sessionState.catalog
+    if (!catalog.tableExists(tableName)) {
+      throw new AnalysisException(
+        s"Table $tableName in ALTER TABLE RECOVER PARTITIONS does not exist.")
+    }
+    val table = catalog.getTableMetadata(tableName)
+    if (catalog.isTemporaryTable(tableName)) {
+      throw new AnalysisException(
+        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS on temporary tables: $tableName")
+    }
+    if (table.tableType != CatalogTableType.EXTERNAL) {
+      throw new AnalysisException(
+        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on external " +
+          s"tables: $tableName")
+    }
+    if (table.partitionColumnNames.isEmpty) {
+      throw new AnalysisException(
+        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on partitioned " +
+          s"tables: $tableName")
+    }
+    if (table.storage.locationUri.isEmpty) {
+      throw new AnalysisException(
+        s"Operation not allowed: ALTER TABLE RECOVER PARTITIONS only works on tables with " +
+          s"location provided: $tableName")
+    }
+
+    recoverPartitions(spark, table)
+    Seq.empty[Row]
+  }
+
+  def recoverPartitions(spark: SparkSession, table: CatalogTable): Unit = {
+    val root = new Path(table.storage.locationUri.get)
+    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    val partitionSpecsAndLocs = scanPartitions(spark, fs, root, Map(), table.partitionSchema.size)
+    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
+      // inherit table storage format (possibly except for location)
+      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
+    }
+    spark.sessionState.catalog.createPartitions(tableName,
+      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+  }
+
+  @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+
+  private def scanPartitions(
+      spark: SparkSession,
+      fs: FileSystem,
+      path: Path,
+      spec: TablePartitionSpec,
+      numPartitionsLeft: Int): GenSeq[(TablePartitionSpec, Path)] = {
+    if (numPartitionsLeft == 0) {
+      return Seq(spec -> path)
+    }
+
+    val statuses = fs.listStatus(path)
+    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
+    val statusPar: GenSeq[FileStatus] =
+      if (numPartitionsLeft > 1 && statuses.length > threshold || numPartitionsLeft > 2) {
+        val parArray = statuses.par
+        parArray.tasksupport = evalTaskSupport
+        parArray
+      } else {
+        statuses
+      }
+    statusPar.flatMap { st =>
+      val ps = st.getPath.getName.split("=", 2)
+      if (ps.length != 2) {
+        throw new AnalysisException(s"Invalid partition path: ${st.getPath}")
+      }
+      scanPartitions(spark, fs, st.getPath, spec ++ Map(ps(0) -> ps(1)), numPartitionsLeft - 1)
+    }
+  }
+}
 
 /**
  * A command that sets the location of a table or a partition.
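scanPartitions descends one directory level per partition column and expects Hive-style name=value directory names. A self-contained sketch of just that decoding step (the directory names are illustrative):

    // Split with limit 2 so the value may itself contain '='; a name without
    // '=' is an invalid partition path and the command rejects it.
    Seq("a=1", "b=2016-01-01=v2", "not-a-partition").foreach { name =>
      name.split("=", 2) match {
        case Array(col, value) => println(s"partition column '$col' -> '$value'")
        case _ => println(s"invalid partition path: $name")
      }
    }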

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 41 additions & 1 deletion
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.execution.datasources.{PartitioningUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

@@ -388,6 +388,46 @@ case class TruncateTableCommand(
   }
 }
 
+/**
+ * A command to repair a table by discovering all the partitions in its directory
+ * and updating the catalog.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   MSCK REPAIR TABLE table_name;
+ * }}}
+ *
+ * This command is equivalent to [[AlterTableRecoverPartitionsCommand]].
+ */
+case class RepairTableCommand(tableName: TableIdentifier) extends RunnableCommand {
+  override def run(spark: SparkSession): Seq[Row] = {
+    val catalog = spark.sessionState.catalog
+    if (!catalog.tableExists(tableName)) {
+      throw new AnalysisException(s"Table $tableName in MSCK REPAIR TABLE does not exist.")
+    }
+    val table = catalog.getTableMetadata(tableName)
+    if (catalog.isTemporaryTable(tableName)) {
+      throw new AnalysisException(
+        s"Operation not allowed: MSCK REPAIR TABLE on temporary tables: $tableName")
+    }
+    if (table.tableType != CatalogTableType.EXTERNAL) {
+      throw new AnalysisException(
+        s"Operation not allowed: MSCK REPAIR TABLE only works on external tables: $tableName")
+    }
+    if (table.partitionColumnNames.isEmpty) {
+      throw new AnalysisException(
+        s"Operation not allowed: MSCK REPAIR TABLE only works on partitioned tables: $tableName")
+    }
+    if (table.storage.locationUri.isEmpty) {
+      throw new AnalysisException(
+        s"Operation not allowed: MSCK REPAIR TABLE only works on tables with location provided: " +
+          s"$tableName")
+    }
+
+    AlterTableRecoverPartitionsCommand(tableName).recoverPartitions(spark, table)
+    Seq.empty[Row]
+  }
+}
+
 /**
  * Command that looks like
  * {{{
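RepairTableCommand runs the same validations as the ALTER TABLE form and then delegates to its recovery logic. A hedged end-to-end sketch, assuming a session with Hive support; the table name, schema, and path are illustrative:

    // Assumed setup: an external, partitioned table whose partition directories
    // were created outside Spark, so the catalog does not know about them yet.
    spark.sql("""CREATE EXTERNAL TABLE logs (msg STRING)
                 PARTITIONED BY (a INT, b INT)
                 LOCATION '/tmp/logs'""")
    // Suppose /tmp/logs/a=1/b=5/ exists on disk but not in the catalog:
    spark.sql("MSCK REPAIR TABLE logs")
    spark.sql("SHOW PARTITIONS logs").show()   // should now list a=1/b=5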

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala

Lines changed: 8 additions & 0 deletions
@@ -563,6 +563,14 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed2, expected2)
   }
 
+  test("alter table: recover partitions") {
+    val sql = "ALTER TABLE table_name RECOVER PARTITIONS"
+    val parsed = parser.parsePlan(sql)
+    val expected = AlterTableRecoverPartitionsCommand(
+      TableIdentifier("table_name", None))
+    comparePlans(parsed, expected)
+  }
+
   test("alter view: add partition (not supported)") {
     assertUnsupported(
       """

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 39 additions & 0 deletions
@@ -827,6 +827,45 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     testAddPartitions(isDatasourceTable = true)
   }
 
+  test("alter table: recover partitions (sequential)") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
+      testRecoverPartitions()
+    }
+  }
+
+  test("alter table: recover partitions (parallel)") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+      testRecoverPartitions()
+    }
+  }
+
+  private def testRecoverPartitions(): Unit = {
+    val catalog = spark.sessionState.catalog
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist RECOVER PARTITIONS")
+    }
+
+    val tableIdent = TableIdentifier("tab1")
+    createTable(catalog, tableIdent)
+    val part1 = Map("a" -> "1", "b" -> "5")
+    createTablePartition(catalog, part1, tableIdent)
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+
+    val part2 = Map("a" -> "2", "b" -> "6")
+    val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+    fs.mkdirs(new Path(new Path(root, "a=2"), "b=6"))
+    try {
+      sql("ALTER TABLE tab1 RECOVER PARTITIONS")
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+        Set(part1, part2))
+    } finally {
+      fs.delete(root, true)
+    }
+  }
+
   test("alter table: add partition is not supported for views") {
     assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2')")
   }
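The two tests above differ only in spark.rdd.parallelListingThreshold, which drives the listing strategy inside scanPartitions. Restated as a small helper (the names mirror the command code in ddl.scala):

    // Parallel listing when more than one partition column remains and the
    // current level's fan-out exceeds the threshold, or when more than two
    // partition columns remain regardless of fan-out.
    def useParallelListing(numPartitionsLeft: Int, numEntries: Int, threshold: Int): Boolean =
      (numPartitionsLeft > 1 && numEntries > threshold) || numPartitionsLeft > 2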

sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala

Lines changed: 1 addition & 1 deletion
@@ -188,7 +188,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "input42",
     "input_dfs",
     "metadata_export_drop",
-    "repair",
 
     // Uses a serde that isn't on the classpath... breaks other tests.
     "bucketizedhiveinputformat",

@@ -937,6 +936,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "reduce_deduplicate_exclude_gby",
     "reduce_deduplicate_exclude_join",
     "reduce_deduplicate_extended",
+    "repair",
     "router_join_ppr",
     "select_as_omitted",
     "select_unquote_and",

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala

Lines changed: 7 additions & 2 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.dsl.expressions._

@@ -499,8 +500,12 @@ class HiveDDLCommandSuite extends PlanTest {
     }
   }
 
-  test("MSCK repair table (not supported)") {
-    assertUnsupported("MSCK REPAIR TABLE tab1")
+  test("MSCK REPAIR table") {
+    val sql = "MSCK REPAIR TABLE tab1"
+    val parsed = parser.parsePlan(sql)
+    val expected = RepairTableCommand(
+      TableIdentifier("tab1", None))
+    comparePlans(parsed, expected)
   }
 
   test("create table like") {
