Skip to content

Commit 48caec2

Browse files
Davies Liudavies
authored andcommitted
[SPARK-17063] [SQL] Improve performance of MSCK REPAIR TABLE with Hive metastore
## What changes were proposed in this pull request? This PR split the the single `createPartitions()` call into smaller batches, which could prevent Hive metastore from OOM (caused by millions of partitions). It will also try to gather all the fast stats (number of files and total size of all files) in parallel to avoid the bottle neck of listing the files in metastore sequential, which is controlled by spark.sql.gatherFastStats (enabled by default). ## How was this patch tested? Tested locally with 10000 partitions and 100 files with embedded metastore, without gathering fast stats in parallel, adding partitions took 153 seconds, after enable that, gathering the fast stats took about 34 seconds, adding these partitions took 25 seconds (most of the time spent in object store), 59 seconds in total, 2.5X faster (with larger cluster, gathering will much faster). Author: Davies Liu <[email protected]> Closes #14607 from davies/repair_batch.
1 parent 6a0fda2 commit 48caec2

File tree

7 files changed

+200
-33
lines changed

7 files changed

+200
-33
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,12 @@ object CatalogStorageFormat {
8181
*
8282
* @param spec partition spec values indexed by column name
8383
* @param storage storage format of the partition
84+
* @param parameters some parameters for the partition, for example, stats.
8485
*/
8586
case class CatalogTablePartition(
8687
spec: CatalogTypes.TablePartitionSpec,
87-
storage: CatalogStorageFormat)
88+
storage: CatalogStorageFormat,
89+
parameters: Map[String, String] = Map.empty)
8890

8991

9092
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 129 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717

1818
package org.apache.spark.sql.execution.command
1919

20-
import scala.collection.GenSeq
20+
import scala.collection.{GenMap, GenSeq}
2121
import scala.collection.parallel.ForkJoinTaskSupport
2222
import scala.concurrent.forkjoin.ForkJoinPool
2323
import scala.util.control.NonFatal
2424

25-
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
25+
import org.apache.hadoop.conf.Configuration
26+
import org.apache.hadoop.fs._
2627
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
2728

2829
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
3233
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
3334
import org.apache.spark.sql.execution.datasources.PartitioningUtils
3435
import org.apache.spark.sql.types._
36+
import org.apache.spark.util.SerializableConfiguration
3537

3638
// Note: The definition of these commands are based on the ones described in
3739
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -422,6 +424,9 @@ case class AlterTableDropPartitionCommand(
422424

423425
}
424426

427+
428+
case class PartitionStatistics(numFiles: Int, totalSize: Long)
429+
425430
/**
426431
* Recover Partitions in ALTER TABLE: recover all the partition in the directory of a table and
427432
* update the catalog.
@@ -435,6 +440,31 @@ case class AlterTableDropPartitionCommand(
435440
case class AlterTableRecoverPartitionsCommand(
436441
tableName: TableIdentifier,
437442
cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
443+
444+
// These are list of statistics that can be collected quickly without requiring a scan of the data
445+
// see https://github.com/apache/hive/blob/master/
446+
// common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
447+
val NUM_FILES = "numFiles"
448+
val TOTAL_SIZE = "totalSize"
449+
val DDL_TIME = "transient_lastDdlTime"
450+
451+
private def getPathFilter(hadoopConf: Configuration): PathFilter = {
452+
// Dummy jobconf to get to the pathFilter defined in configuration
453+
// It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
454+
val jobConf = new JobConf(hadoopConf, this.getClass)
455+
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
456+
new PathFilter {
457+
override def accept(path: Path): Boolean = {
458+
val name = path.getName
459+
if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
460+
pathFilter == null || pathFilter.accept(path)
461+
} else {
462+
false
463+
}
464+
}
465+
}
466+
}
467+
438468
override def run(spark: SparkSession): Seq[Row] = {
439469
val catalog = spark.sessionState.catalog
440470
if (!catalog.tableExists(tableName)) {
@@ -449,10 +479,6 @@ case class AlterTableRecoverPartitionsCommand(
449479
throw new AnalysisException(
450480
s"Operation not allowed: $cmd on datasource tables: $tableName")
451481
}
452-
if (table.tableType != CatalogTableType.EXTERNAL) {
453-
throw new AnalysisException(
454-
s"Operation not allowed: $cmd only works on external tables: $tableName")
455-
}
456482
if (table.partitionColumnNames.isEmpty) {
457483
throw new AnalysisException(
458484
s"Operation not allowed: $cmd only works on partitioned tables: $tableName")
@@ -463,19 +489,26 @@ case class AlterTableRecoverPartitionsCommand(
463489
}
464490

465491
val root = new Path(table.storage.locationUri.get)
492+
logInfo(s"Recover all the partitions in $root")
466493
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
467-
// Dummy jobconf to get to the pathFilter defined in configuration
468-
// It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
469-
val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration, this.getClass)
470-
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
494+
495+
val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
496+
val hadoopConf = spark.sparkContext.hadoopConfiguration
497+
val pathFilter = getPathFilter(hadoopConf)
471498
val partitionSpecsAndLocs = scanPartitions(
472-
spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase))
473-
val parts = partitionSpecsAndLocs.map { case (spec, location) =>
474-
// inherit table storage format (possibly except for location)
475-
CatalogTablePartition(spec, table.storage.copy(locationUri = Some(location.toUri.toString)))
499+
spark, fs, pathFilter, root, Map(), table.partitionColumnNames.map(_.toLowerCase), threshold)
500+
val total = partitionSpecsAndLocs.length
501+
logInfo(s"Found $total partitions in $root")
502+
503+
val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
504+
gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
505+
} else {
506+
GenMap.empty[String, PartitionStatistics]
476507
}
477-
spark.sessionState.catalog.createPartitions(tableName,
478-
parts.toArray[CatalogTablePartition], ignoreIfExists = true)
508+
logInfo(s"Finished to gather the fast stats for all $total partitions.")
509+
510+
addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
511+
logInfo(s"Recovered all partitions ($total).")
479512
Seq.empty[Row]
480513
}
481514

@@ -487,15 +520,16 @@ case class AlterTableRecoverPartitionsCommand(
487520
filter: PathFilter,
488521
path: Path,
489522
spec: TablePartitionSpec,
490-
partitionNames: Seq[String]): GenSeq[(TablePartitionSpec, Path)] = {
491-
if (partitionNames.length == 0) {
523+
partitionNames: Seq[String],
524+
threshold: Int): GenSeq[(TablePartitionSpec, Path)] = {
525+
if (partitionNames.isEmpty) {
492526
return Seq(spec -> path)
493527
}
494528

495-
val statuses = fs.listStatus(path)
496-
val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt
529+
val statuses = fs.listStatus(path, filter)
497530
val statusPar: GenSeq[FileStatus] =
498531
if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
532+
// parallelize the list of partitions here, then we can have better parallelism later.
499533
val parArray = statuses.par
500534
parArray.tasksupport = evalTaskSupport
501535
parArray
@@ -510,21 +544,89 @@ case class AlterTableRecoverPartitionsCommand(
510544
// TODO: Validate the value
511545
val value = PartitioningUtils.unescapePathName(ps(1))
512546
// comparing with case-insensitive, but preserve the case
513-
if (columnName == partitionNames(0)) {
514-
scanPartitions(
515-
spark, fs, filter, st.getPath, spec ++ Map(columnName -> value), partitionNames.drop(1))
547+
if (columnName == partitionNames.head) {
548+
scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(columnName -> value),
549+
partitionNames.drop(1), threshold)
516550
} else {
517-
logWarning(s"expect partition column ${partitionNames(0)}, but got ${ps(0)}, ignore it")
551+
logWarning(s"expect partition column ${partitionNames.head}, but got ${ps(0)}, ignore it")
518552
Seq()
519553
}
520554
} else {
521-
if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
522-
logWarning(s"ignore ${new Path(path, name)}")
523-
}
555+
logWarning(s"ignore ${new Path(path, name)}")
524556
Seq()
525557
}
526558
}
527559
}
560+
561+
private def gatherPartitionStats(
562+
spark: SparkSession,
563+
partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
564+
fs: FileSystem,
565+
pathFilter: PathFilter,
566+
threshold: Int): GenMap[String, PartitionStatistics] = {
567+
if (partitionSpecsAndLocs.length > threshold) {
568+
val hadoopConf = spark.sparkContext.hadoopConfiguration
569+
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
570+
val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray
571+
572+
// Set the number of parallelism to prevent following file listing from generating many tasks
573+
// in case of large #defaultParallelism.
574+
val numParallelism = Math.min(serializedPaths.length,
575+
Math.min(spark.sparkContext.defaultParallelism, 10000))
576+
// gather the fast stats for all the partitions otherwise Hive metastore will list all the
577+
// files for all the new partitions in sequential way, which is super slow.
578+
logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.")
579+
spark.sparkContext.parallelize(serializedPaths, numParallelism)
580+
.mapPartitions { paths =>
581+
val pathFilter = getPathFilter(serializableConfiguration.value)
582+
paths.map(new Path(_)).map{ path =>
583+
val fs = path.getFileSystem(serializableConfiguration.value)
584+
val statuses = fs.listStatus(path, pathFilter)
585+
(path.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
586+
}
587+
}.collectAsMap()
588+
} else {
589+
partitionSpecsAndLocs.map { case (_, location) =>
590+
val statuses = fs.listStatus(location, pathFilter)
591+
(location.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
592+
}.toMap
593+
}
594+
}
595+
596+
private def addPartitions(
597+
spark: SparkSession,
598+
table: CatalogTable,
599+
partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)],
600+
partitionStats: GenMap[String, PartitionStatistics]): Unit = {
601+
val total = partitionSpecsAndLocs.length
602+
var done = 0L
603+
// Hive metastore may not have enough memory to handle millions of partitions in single RPC,
604+
// we should split them into smaller batches. Since Hive client is not thread safe, we cannot
605+
// do this in parallel.
606+
val batchSize = 100
607+
partitionSpecsAndLocs.toIterator.grouped(batchSize).foreach { batch =>
608+
val now = System.currentTimeMillis() / 1000
609+
val parts = batch.map { case (spec, location) =>
610+
val params = partitionStats.get(location.toString).map {
611+
case PartitionStatistics(numFiles, totalSize) =>
612+
// This two fast stat could prevent Hive metastore to list the files again.
613+
Map(NUM_FILES -> numFiles.toString,
614+
TOTAL_SIZE -> totalSize.toString,
615+
// Workaround a bug in HiveMetastore that try to mutate a read-only parameters.
616+
// see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
617+
DDL_TIME -> now.toString)
618+
}.getOrElse(Map.empty)
619+
// inherit table storage format (possibly except for location)
620+
CatalogTablePartition(
621+
spec,
622+
table.storage.copy(locationUri = Some(location.toUri.toString)),
623+
params)
624+
}
625+
spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
626+
done += parts.length
627+
logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
628+
}
629+
}
528630
}
529631

530632

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,14 @@ object SQLConf {
310310
.booleanConf
311311
.createWithDefault(false)
312312

313+
val GATHER_FASTSTAT = SQLConfigBuilder("spark.sql.hive.gatherFastStats")
314+
.internal()
315+
.doc("When true, fast stats (number of files and total size of all files) will be gathered" +
316+
" in parallel while repairing table partitions to avoid the sequential listing in Hive" +
317+
" metastore.")
318+
.booleanConf
319+
.createWithDefault(true)
320+
313321
// This is used to control the when we will split a schema's JSON string to multiple pieces
314322
// in order to fit the JSON string in metastore's table property (by default, the value has
315323
// a length restriction of 4000 characters). We will split the JSON string of a schema
@@ -608,6 +616,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
608616

609617
def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
610618

619+
def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
620+
611621
def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
612622

613623
def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -824,13 +824,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
824824
}
825825

826826
test("alter table: recover partitions (sequential)") {
827-
withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
827+
withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
828828
testRecoverPartitions()
829829
}
830830
}
831831

832832
test("alter table: recover partition (parallel)") {
833-
withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
833+
withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
834834
testRecoverPartitions()
835835
}
836836
}
@@ -853,7 +853,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
853853
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
854854
// valid
855855
fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
856+
fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file
857+
fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS")) // file
856858
fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
859+
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv")) // file
860+
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv")) // file
861+
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile")) // file
862+
fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
863+
857864
// invalid
858865
fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
859866
fs.mkdirs(new Path(new Path(root, "b=1"), "a=1")) // wrong order
@@ -867,6 +874,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
867874
sql("ALTER TABLE tab1 RECOVER PARTITIONS")
868875
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
869876
Set(part1, part2))
877+
assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
878+
assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
870879
} finally {
871880
fs.delete(root, true)
872881
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -829,6 +829,8 @@ private[hive] class HiveClientImpl(
829829
serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
830830
compressed = apiPartition.getSd.isCompressed,
831831
properties = Option(apiPartition.getSd.getSerdeInfo.getParameters)
832-
.map(_.asScala.toMap).orNull))
832+
.map(_.asScala.toMap).orNull),
833+
parameters =
834+
if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty)
833835
}
834836
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
267267
val table = hive.getTable(database, tableName)
268268
parts.foreach { s =>
269269
val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
270+
val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
270271
val spec = s.spec.asJava
271272
if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
272273
// Ignore this partition since it already exists and ignoreIfExists == true
@@ -280,7 +281,7 @@ private[client] class Shim_v0_12 extends Shim with Logging {
280281
table,
281282
spec,
282283
location,
283-
null, // partParams
284+
params, // partParams
284285
null, // inputFormat
285286
null, // outputFormat
286287
-1: JInteger, // numBuckets
@@ -459,8 +460,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
459460
parts: Seq[CatalogTablePartition],
460461
ignoreIfExists: Boolean): Unit = {
461462
val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
462-
parts.foreach { s =>
463+
parts.zipWithIndex.foreach { case (s, i) =>
463464
addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
465+
if (s.parameters.nonEmpty) {
466+
addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
467+
}
464468
}
465469
hive.createPartitions(addPartitionDesc)
466470
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,44 @@ class HiveDDLSuite
378378
expectedSerdeProps)
379379
}
380380

381+
test("MSCK REPAIR RABLE") {
382+
val catalog = spark.sessionState.catalog
383+
val tableIdent = TableIdentifier("tab1")
384+
sql("CREATE TABLE tab1 (height INT, length INT) PARTITIONED BY (a INT, b INT)")
385+
val part1 = Map("a" -> "1", "b" -> "5")
386+
val part2 = Map("a" -> "2", "b" -> "6")
387+
val root = new Path(catalog.getTableMetadata(tableIdent).storage.locationUri.get)
388+
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
389+
// valid
390+
fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
391+
fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file
392+
fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS")) // file
393+
fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
394+
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv")) // file
395+
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv")) // file
396+
fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile")) // file
397+
fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
398+
399+
// invalid
400+
fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
401+
fs.mkdirs(new Path(new Path(root, "b=1"), "a=1")) // wrong order
402+
fs.mkdirs(new Path(root, "a=4")) // not enough columns
403+
fs.createNewFile(new Path(new Path(root, "a=1"), "b=4")) // file
404+
fs.createNewFile(new Path(new Path(root, "a=1"), "_SUCCESS")) // _SUCCESS
405+
fs.mkdirs(new Path(new Path(root, "a=1"), "_temporary")) // _temporary
406+
fs.mkdirs(new Path(new Path(root, "a=1"), ".b=4")) // start with .
407+
408+
try {
409+
sql("MSCK REPAIR TABLE tab1")
410+
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
411+
Set(part1, part2))
412+
assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
413+
assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
414+
} finally {
415+
fs.delete(root, true)
416+
}
417+
}
418+
381419
test("drop table using drop view") {
382420
withTable("tab1") {
383421
sql("CREATE TABLE tab1(c1 int)")

0 commit comments

Comments
 (0)