
Commit 3d283f6

Davies Liu authored and committed
[SPARK-17063] [SQL] Improve performance of MSCK REPAIR TABLE with Hive metastore
This PR splits the single `createPartitions()` call into smaller batches, which prevents the Hive metastore from running out of memory when handling millions of partitions. It also gathers the fast stats (number of files and total size of all files) for each partition in parallel, avoiding the bottleneck of the metastore listing the files sequentially; this is controlled by spark.sql.hive.gatherFastStats (enabled by default).

Tested locally with 10000 partitions and 100 files against an embedded metastore: without gathering the fast stats in parallel, adding the partitions took 153 seconds; after enabling it, gathering the fast stats took about 34 seconds and adding the partitions took 25 seconds (most of the time spent in the object store), 59 seconds in total, about 2.5X faster (with a larger cluster, the gathering will be much faster).

Author: Davies Liu <[email protected]>

Closes #14607 from davies/repair_batch.

(cherry picked from commit 48caec2)
Signed-off-by: Davies Liu <[email protected]>
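As a rough illustration (not part of the patch), the new behavior can be exercised from a Hive-enabled session. The table name `logs` below is hypothetical; only the config key and the MSCK statement come from this change:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("msck-repair-example")
      .enableHiveSupport()
      .getOrCreate()

    // The flag added in SQLConf below; it defaults to true, so set it explicitly only to opt out.
    spark.conf.set("spark.sql.hive.gatherFastStats", "true")

    // Scans the partition directories under the table location, gathers numFiles/totalSize in
    // parallel, and registers the partitions with the metastore in batches.
    spark.sql("MSCK REPAIR TABLE logs")

    // The recovered partitions are visible through the session catalog.
    spark.sql("SHOW PARTITIONS logs").show()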
1 parent eec0371 commit 3d283f6

File tree: 7 files changed, +200 −33 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 3 additions & 1 deletion

@@ -103,10 +103,12 @@ case class CatalogColumn(
  *
  * @param spec partition spec values indexed by column name
  * @param storage storage format of the partition
+ * @param parameters some parameters for the partition, for example, stats.
  */
 case class CatalogTablePartition(
     spec: CatalogTypes.TablePartitionSpec,
-    storage: CatalogStorageFormat)
+    storage: CatalogStorageFormat,
+    parameters: Map[String, String] = Map.empty)


 /**

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 129 additions & 27 deletions

@@ -17,12 +17,13 @@

 package org.apache.spark.sql.execution.command

-import scala.collection.GenSeq
+import scala.collection.{GenMap, GenSeq}
 import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.control.NonFatal

-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -34,6 +35,7 @@ import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.execution.datasources.BucketSpec
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration

 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -429,6 +431,9 @@ case class AlterTableDropPartitionCommand(

 }

+
+case class PartitionStatistics(numFiles: Int, totalSize: Long)
+
 /**
  * Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
  * update the catalog.
@@ -442,6 +447,31 @@ case class AlterTableDropPartitionCommand(
 case class AlterTableRecoverPartitionsCommand(
     tableName: TableIdentifier,
     cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
+
+  // These are list of statistics that can be collected quickly without requiring a scan of the data
+  // see https://github.com/apache/hive/blob/master/
+  //   common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
+  val NUM_FILES = "numFiles"
+  val TOTAL_SIZE = "totalSize"
+  val DDL_TIME = "transient_lastDdlTime"
+
+  private def getPathFilter(hadoopConf: Configuration): PathFilter = {
+    // Dummy jobconf to get to the pathFilter defined in configuration
+    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
+    val jobConf = new JobConf(hadoopConf, this.getClass)
+    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+    new PathFilter {
+      override def accept(path: Path): Boolean = {
+        val name = path.getName
+        if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
+          pathFilter == null || pathFilter.accept(path)
+        } else {
+          false
+        }
+      }
+    }
+  }
+
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
     if (!catalog.tableExists(tableName)) {
@@ -456,10 +486,6 @@ case class AlterTableRecoverPartitionsCommand(
       throw new AnalysisException(
         s"Operation not allowed: $cmd on datasource tables: $tableName")
     }
-    if (table.tableType != CatalogTableType.EXTERNAL) {
-      throw new AnalysisException(
-        s"Operation not allowed: $cmd only works on external tables: $tableName")
-    }
     if (!DDLUtils.isTablePartitioned(table)) {
       throw new AnalysisException(
         s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
@@ -470,19 +496,26 @@ case class AlterTableRecoverPartitionsCommand(
     }

     val root = new Path(table.storage.locationUri.get)
+    logInfo(s"Recover all the partitions in $root")
     val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
-    // Dummy jobconf to get to the pathFilter defined in configuration
-    // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
-    val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
-    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+
+    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    val pathFilter = getPathFilter(hadoopConf)
     val partitionSpecsAndLocs = scanPartitions(
-      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
-    val parts = partitionSpecsAndLocs.map { case (spec, location) =>
-      // inherit table storage format (possibly except for location)
-      CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
+      spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase), threshold)
+    val total = partitionSpecsAndLocs.length
+    logInfo(s"Found $total partitions in $root")
+
+    val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
+      gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
+    } else {
+      GenMap.empty[String, PartitionStatistics]
     }
-    spark.sessionState.catalog.createPartitions(tableName,
-      parts.toArray[CatalogTablePartition], ignoreIfExists = true)
+    logInfo(s"Finished to gather the fast stats for all $total partitions.")
+
+    addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
+    logInfo(s"Recovered all partitions ($total).")
     Seq.empty[Row]
   }

@@ -494,15 +527,16 @@ case class AlterTableRecoverPartitionsCommand(
       filter: PathFilter,
       path: Path,
       spec: TablePartitionSpec,
-      partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
-    if (partitionNames.length == 0) {
+      partitionNames: Seq[String],
+      threshold: Int): GenSeq[(TablePartitionSpec, Path)] = {
+    if (partitionNames.isEmpty) {
       return Seq(spec -> path)
     }

-    val statuses = fs.listStatus(path)
-    val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
+    val statuses = fs.listStatus(path, filter)
     val statusPar: GenSeq[FileStatus] =
       if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
+        // parallelize the list of partitions here, then we can have better parallelism later.
         val parArray = statuses.par
         parArray.tasksupport = evalTaskSupport
         parArray
@@ -517,21 +551,89 @@ case class AlterTableRecoverPartitionsCommand(
         // TODO: Validate the value
         val value = PartitioningUtils.unescapePathName(ps(1))
         // comparing with case-insensitive, but preserve the case
-        if (columnName == partitionNames(0)) {
-          scanPartitions(
-            spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1))
+        if (columnName == partitionNames.head) {
+          scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(columnName -> value),
+            partitionNames.drop(1), threshold)
         } else {
-          logWarning(s"expect partition column ${partitionNames(0)}, but got ${ps(0)}, ignore it")
+          logWarning(s"expect partition column ${partitionNames.head}, but got ${ps(0)}, ignore it")
           Seq()
         }
       } else {
-        if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
-          logWarning(s"ignore ${new Path(path, name)}")
-        }
+        logWarning(s"ignore ${new Path(path, name)}")
         Seq()
       }
     }
   }
+
+  private def gatherPartitionStats(
+      spark: SparkSession,
+      partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
+      fs: FileSystem,
+      pathFilter: PathFilter,
+      threshold: Int): GenMap[String, PartitionStatistics] = {
+    if (partitionSpecsAndLocs.length > threshold) {
+      val hadoopConf = spark.sparkContext.hadoopConfiguration
+      val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+      val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray
+
+      // Set the number of parallelism to prevent following file listing from generating many tasks
+      // in case of large #defaultParallelism.
+      val numParallelism = Math.min(serializedPaths.length,
+        Math.min(spark.sparkContext.defaultParallelism, 10000))
+      // gather the fast stats for all the partitions otherwise Hive metastore will list all the
+      // files for all the new partitions in sequential way, which is super slow.
+      logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.")
+      spark.sparkContext.parallelize(serializedPaths, numParallelism)
+        .mapPartitions { paths =>
+          val pathFilter = getPathFilter(serializableConfiguration.value)
+          paths.map(new Path(_)).map { path =>
+            val fs = path.getFileSystem(serializableConfiguration.value)
+            val statuses = fs.listStatus(path, pathFilter)
+            (path.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
+          }
+        }.collectAsMap()
+    } else {
+      partitionSpecsAndLocs.map { case (_, location) =>
+        val statuses = fs.listStatus(location, pathFilter)
+        (location.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
+      }.toMap
+    }
+  }
+
+  private def addPartitions(
+      spark: SparkSession,
+      table: CatalogTable,
+      partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
+      partitionStats: GenMap[String, PartitionStatistics]): Unit = {
+    val total = partitionSpecsAndLocs.length
+    var done = 0L
+    // Hive metastore may not have enough memory to handle millions of partitions in single RPC,
+    // we should split them into smaller batches. Since Hive client is not thread safe, we cannot
+    // do this in parallel.
+    val batchSize = 100
+    partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch =>
+      val now = System.currentTimeMillis() / 1000
+      val parts = batch.map { case (spec, location) =>
+        val params = partitionStats.get(location.toString).map {
+          case PartitionStatistics(numFiles, totalSize) =>
+            // This two fast stat could prevent Hive metastore to list the files again.
+            Map(NUM_FILES -> numFiles.toString,
+              TOTAL_SIZE -> totalSize.toString,
+              // Workaround a bug in HiveMetastore that try to mutate a read-only parameters.
+              // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+              DDL_TIME -> now.toString)
+        }.getOrElse(Map.empty)
+        // inherit table storage format (possibly except for location)
+        CatalogTablePartition(
+          spec,
+          table.storage.copy(locationUri = Some(location.toUri.toString)),
+          params)
+      }
      spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
+      done += parts.length
+      logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
+    }
+  }
 }

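The core batching trick in addPartitions above is Scala's Iterator.grouped. Here is a standalone sketch of the pattern under the assumption that each batch goes through a single downstream RPC; the sendBatch helper and the partition labels are hypothetical, not Spark API:

    // Simplified sketch of the batching pattern: cap the size of each downstream call
    // instead of pushing every partition through one giant RPC.
    object BatchedCreatePartitionsSketch {
      private val batchSize = 100

      // Hypothetical stand-in for sessionState.catalog.createPartitions(...)
      private def sendBatch(batch: Seq[String]): Unit =
        println(s"createPartitions called with ${batch.length} partitions")

      def main(args: Array[String]): Unit = {
        val partitions = (1 to 1050).map(i => s"p=$i")
        var done = 0
        partitions.toIterator.grouped(batchSize).foreach { batch =>
          sendBatch(batch)
          done += batch.length
          println(s"recovered ${batch.length} partitions ($done/${partitions.length} so far)")
        }
      }
    }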

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions

@@ -319,6 +319,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)

+  val GATHER_FASTSTAT = SQLConfigBuilder("spark.sql.hive.gatherFastStats")
+    .internal()
+    .doc("When true, fast stats (number of files and total size of all files) will be gathered" +
+      " in parallel while repairing table partitions to avoid the sequential listing in Hive" +
+      " metastore.")
+    .booleanConf
+    .createWithDefault(true)
+
   // This is used to control the when we will split a schema's JSON string to multiple pieces
   // in order to fit the JSON string in metastore's table property (by default, the value has
   // a length restriction of 4000 characters). We will split the JSON string of a schema
@@ -616,6 +624,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   def nativeView: Boolean = getConf(NATIVE_VIEW)

+  def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
+
   def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)

   def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
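Usage note (a sketch, not from the patch): because spark.sql.hive.gatherFastStats is a runtime SQL conf, it can also be flipped per session with a SET statement before running the repair; `logs` is again a hypothetical table name:

    // Opt out of the parallel fast-stats gathering for this session only.
    spark.sql("SET spark.sql.hive.gatherFastStats=false")
    spark.sql("MSCK REPAIR TABLE logs")  // the metastore will then list files sequentially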

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 11 additions & 2 deletions

@@ -643,13 +643,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }

   test("alter table: recover partitions (sequential)") {
-    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
       testRecoverPartitions()
     }
   }

   test("alter table: recover partition (parallel)") {
-    withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
       testRecoverPartitions()
     }
   }
@@ -672,7 +672,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
      val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
      // valid
      fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+      fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv"))  // file
+      fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS"))  // file
      fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+      fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv"))  // file
+      fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv"))  // file
+      fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile"))  // file
+      fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
+
      // invalid
      fs.mkdirs(new Path(new Path(root, "a"), "b"))  // bad name
      fs.mkdirs(new Path(new Path(root, "b=1"), "a=1"))  // wrong order
@@ -686,6 +693,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
        sql("ALTER TABLE tab1 RECOVER PARTITIONS")
        assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
          Set(part1, part2))
+        assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
+        assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
      } finally {
        fs.delete(root, true)
      }

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 3 additions & 1 deletion

@@ -835,6 +835,8 @@ private[hive] class HiveClientImpl(
       serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
       compressed = apiPartition.getSd.isCompressed,
       serdeProperties = Option(apiPartition.getSd.getSerdeInfo.getParameters)
-        .map(_.asScala.toMap).orNull))
+        .map(_.asScala.toMap).orNull),
+      parameters =
+        if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty)
   }
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala

Lines changed: 6 additions & 2 deletions

@@ -251,6 +251,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
     val table = hive.getTable(database, tableName)
     parts.foreach { s =>
       val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
+      val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
       val spec = s.spec.asJava
       if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
         // Ignore this partition since it already exists and ignoreIfExists == true
@@ -264,7 +265,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
         table,
         spec,
         location,
-        null, // partParams
+        params, // partParams
         null, // inputFormat
         null, // outputFormat
         -1: JInteger, // numBuckets
@@ -417,8 +418,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
     val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
-    parts.foreach { s =>
+    parts.zipWithIndex.foreach { case (s, i) =>
       addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
+      if (s.parameters.nonEmpty) {
+        addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
+      }
     }
     hive.createPartitions(addPartitionDesc)
   }

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 38 additions & 0 deletions

@@ -368,6 +368,44 @@ class HiveDDLSuite
       expectedSerdeProps)
   }

+  test("MSCK REPAIR RABLE") {
+    val catalog = spark.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1")
+    sql("CREATE TABLE tab1 (height INT, length INT) PARTITIONED BY (a INT, b INT)")
+    val part1 = Map("a" -> "1", "b" -> "5")
+    val part2 = Map("a" -> "2", "b" -> "6")
+    val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
+    val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    // valid
+    fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS"))  // file
+    fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile"))  // file
+    fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
+
+    // invalid
+    fs.mkdirs(new Path(new Path(root, "a"), "b"))  // bad name
+    fs.mkdirs(new Path(new Path(root, "b=1"), "a=1"))  // wrong order
+    fs.mkdirs(new Path(root, "a=4"))  // not enough columns
+    fs.createNewFile(new Path(new Path(root, "a=1"), "b=4"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS"))  // _SUCCESS
+    fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary"))  // _temporary
+    fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4"))  // start with .
+
+    try {
+      sql("MSCK REPAIR TABLE tab1")
+      assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
+        Set(part1, part2))
+      assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
+      assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
+    } finally {
+      fs.delete(root, true)
+    }
+  }
+
   test("drop table using drop view") {
     withTable("tab1") {
       sql("CREATE TABLE tab1(c1 int)")