Skip to content

Conversation

@davies
Copy link
Contributor

@davies davies commented Aug 4, 2016

What changes were proposed in this pull request?

MSCK REPAIR TABLE could be used to recover the partitions in external catalog based on partitions in file system.

Another syntax is: ALTER TABLE table RECOVER PARTITIONS

The implementation in this PR will only list partitions (not the files with a partition) in driver (in parallel if needed).

How was this patch tested?

Added unit tests for it and Hive compatibility test suite.

@davies
Copy link
Contributor Author

davies commented Aug 4, 2016

@yhuai Could you help to generate the golden result for this suite?

@JoshRosen
Copy link
Contributor

Jenkins, retest this please.

@yhuai
Copy link
Contributor

yhuai commented Aug 4, 2016

We do not generate golden files anymore. Let's port those tests. Thanks.

@davies
Copy link
Contributor Author

davies commented Aug 4, 2016

@yhuai Just checked the repair.q, it's kind of useless, already covered by out unit test, we could just ignore it.

case class RepairTableCommand(tableName: TableIdentifier) extends RunnableCommand {
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a dead code. The previous line already checks whether the table exists or not.

val table = catalog.getTableMetadata(tableName)

@SparkQA
Copy link

SparkQA commented Aug 4, 2016

Test build #63242 has finished for PR 14500 at commit c5edbdf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class AlterTableRecoverPartitionsCommand(
    • case class RepairTableCommand(tableName: TableIdentifier) extends RunnableCommand

@SparkQA
Copy link

SparkQA commented Aug 4, 2016

Test build #63243 has finished for PR 14500 at commit c5edbdf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class AlterTableRecoverPartitionsCommand(
    • case class RepairTableCommand(tableName: TableIdentifier) extends RunnableCommand

@SparkQA
Copy link

SparkQA commented Aug 5, 2016

Test build #63244 has finished for PR 14500 at commit 89d22f4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 5, 2016

Test build #63246 has finished for PR 14500 at commit f338516.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* Create an [[AlterTableDiscoverPartitionsCommand]] command
*
* For example:
* {{{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Update the syntax and the comments here.

@davies davies changed the title [SPARK-] SQL DDL: MSCK REPAIR TABLE [SPARK-16905] SQL DDL: MSCK REPAIR TABLE Aug 5, 2016
@SparkQA
Copy link

SparkQA commented Aug 5, 2016

Test build #63279 has finished for PR 14500 at commit 7f4f38d.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 5, 2016

Test build #63283 has finished for PR 14500 at commit e478c3a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class ShuffleIndexInformation
    • public class ShuffleIndexRecord
    • case class Least(children: Seq[Expression]) extends Expression
    • case class Greatest(children: Seq[Expression]) extends Expression
    • case class CreateTable(tableDesc: CatalogTable, mode: SaveMode, query: Option[LogicalPlan])
    • case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan]

val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
val statusPar: GenSeq[FileStatus] =
if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
val parArray = statuses.par
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i didn't look carefully - but if you are using the default exec context, please create a new one. otherwise it'd block.

Copy link
Contributor

@hvanhovell hvanhovell Aug 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool. can we make it explicit, e.g. statuses.par(evalTaskSupport)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copied from UnionRDD.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not figure out how it work, at least statuses.par(evalTaskSupport) does not work.

if (st.isDirectory && name.contains("=")) {
val ps = name.split("=", 2)
val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
val value = PartitioningUtils.unescapePathName(ps(1))
Copy link
Member

@viirya viirya Aug 6, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check if the value is valid? E.g., for a partition column "a" of IntegerType, "a=abc" is invalid.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have a TODO here.

@SparkQA
Copy link

SparkQA commented Aug 8, 2016

Test build #63375 has finished for PR 14500 at commit e5906cf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val ps = name.split("=", 2)
val columnName = PartitioningUtils.unescapePathName(ps(0)).toLowerCase
// TODO: Validate the value
val value = PartitioningUtils.unescapePathName(ps(1))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this escaping cause problems in (say) S3 for objects of the form "foo%20bar"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the partitions are generated by Spark, they could be unescape back correctly. For others, they could be compatibility issues. For example, Spark does not escape in Linux, the unescaping for%20 could be wrong (we could show an warning?). I think these are not in the scope of this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that makes sense.

@sameeragarwal
Copy link
Member

LGTM

@davies
Copy link
Contributor Author

davies commented Aug 9, 2016

Merging into master and 2.0 branch, thanks!

@asfgit asfgit closed this in 92da228 Aug 9, 2016
@yhuai
Copy link
Contributor

yhuai commented Aug 9, 2016

@liancheng Can you do a post-hoc review?

asfgit pushed a commit that referenced this pull request Aug 9, 2016
MSCK REPAIR TABLE could be used to recover the partitions in external catalog based on partitions in file system.

Another syntax is: ALTER TABLE table RECOVER PARTITIONS

The implementation in this PR will only list partitions (not the files with a partition) in driver (in parallel if needed).

Added unit tests for it and Hive compatibility test suite.

Author: Davies Liu <[email protected]>

Closes #14500 from davies/repair_table.
@hvanhovell
Copy link
Contributor

LGTM

CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
}
spark.sessionState.catalog.createPartitions(tableName,
parts.toArray[CatalogTablePartition], ignoreIfExists = true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if we get thousands of new partitions of tens thousands of new partitions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, see the implementation in HiveShim:

  // Follows exactly the same logic of DDLTask.createPartitions in Hive 0.12
  override def createPartitions(
      hive: Hive,
      database: String,
      tableName: String,
      parts: Seq[CatalogTablePartition],
      ignoreIfExists: Boolean): Unit = {
    val table = hive.getTable(database, tableName)
    parts.foreach { s =>
      val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
      val spec = s.spec.asJava
      if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
        // Ignore this partition since it already exists and ignoreIfExists == true
      } else {
        if (location == null && table.isView()) {
          throw new HiveException("LOCATION clause illegal for view partition");
        }

        createPartitionMethod.invoke(
          hive,
          table,
          spec,
          location,
          null, // partParams
          null, // inputFormat
          null, // outputFormat
          -1: JInteger, // numBuckets
          null, // cols
          null, // serializationLib
          null, // serdeParams
          null, // bucketCols
          null) // sortCols
      }
    }
  }

All these partitions will be insert into Hive in sequential way, so group them as batches will not help here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, this is true for Hive <=0.12, for Hive 0.13+, they are sent in single RPC. so we should verify that what's limit for a single RPC

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the Hive Metastore can't handle a RPC with millions of partitions, I will send a patch to do it in batch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants