Commit 8a18bf7
Author: Davies Liu

    add config, add partition in sequential

1 parent a4d07db, commit 8a18bf7

2 files changed: 28 additions, 15 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala (19 additions, 15 deletions)
@@ -449,7 +449,7 @@ case class AlterTableRecoverPartitionsCommand(
 
   // These are two fast stats in Hive Metastore
   // see https://github.com/apache/hive/blob/master/
-  //   common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java#L88
+  //   common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
   val NUM_FILES = "numFiles"
   val TOTAL_SIZE = "totalSize"
   val DDL_TIME = "transient_lastDdlTime"
@@ -506,8 +506,11 @@ case class AlterTableRecoverPartitionsCommand(
     val total = partitionSpecsAndLocs.length
     logInfo(s"Found $total partitions in $root")
 
-    val partitionStats = gatherPartitionStats(
-      spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
+    val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
+      gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
+    } else {
+      GenMap.empty[String, (Int, Long)]
+    }
     logInfo(s"Finished to gather the fast stats for all $total partitions.")
 
     addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
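
Note: when spark.sql.hive.gatherFastStats is off, the branch above substitutes an empty GenMap, so the downstream addPartitions lookup works the same either way. A minimal standalone sketch of that gate-or-empty pattern, outside Spark (gatherFastStats, expensiveScan, and the fake stats are illustrative stand-ins, not Spark's API):

    import scala.collection.GenMap

    object GatherStatsSketch {
      // Illustrative stand-ins for the real config flag and the expensive scan.
      val gatherFastStats: Boolean = sys.props.get("gatherFastStats").contains("true")

      def expensiveScan(locations: Seq[String]): GenMap[String, (Int, Long)] =
        locations.map(loc => loc -> (1, 1024L)).toMap // pretend (numFiles, totalSize)

      def main(args: Array[String]): Unit = {
        val locations = Seq("/data/p=1", "/data/p=2")
        // Same shape as the commit: scan only when the flag is on, otherwise an
        // empty map, so callers can use partitionStats.get(loc) either way.
        val partitionStats: GenMap[String, (Int, Long)] =
          if (gatherFastStats) expensiveScan(locations)
          else GenMap.empty[String, (Int, Long)]
        locations.foreach(loc => println(s"$loc -> ${partitionStats.get(loc)}"))
      }
    }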
@@ -574,7 +577,8 @@ case class AlterTableRecoverPartitionsCommand(
 
     // Set the number of parallelism to prevent following file listing from generating many tasks
     // in case of large #defaultParallelism.
-    val numParallelism = Math.min(serializedPaths.length, 10000)
+    val numParallelism = Math.min(serializedPaths.length,
+      Math.min(spark.sparkContext.defaultParallelism, 10000))
     // gather the fast stats for all the partitions otherwise Hive metastore will list all the
     // files for all the new partitions in sequential way, which is super slow.
     logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.")
@@ -603,20 +607,20 @@ case class AlterTableRecoverPartitionsCommand(
     val total = partitionSpecsAndLocs.length
     var done = 0L
     // Hive metastore may not have enough memory to handle millions of partitions in single RPC,
-    // we should split them into smaller batches.
-    val parArray = partitionSpecsAndLocs.toArray.grouped(100).toArray.par
-    parArray.tasksupport = evalTaskSupport
-    parArray.foreach { batch =>
+    // we should split them into smaller batches. Since the Hive client is not thread-safe, we
+    // cannot do this in parallel.
+    partitionSpecsAndLocs.toIterator.grouped(1000).foreach { batch =>
       val now = System.currentTimeMillis() / 1000
       val parts = batch.map { case (spec, location) =>
+        val params = partitionStats.get(location.toString).map { case (numFiles, totalSize) =>
+          // These two fast stats prevent the Hive metastore from listing the files again.
+          Map(NUM_FILES -> numFiles.toString,
+            TOTAL_SIZE -> totalSize.toString,
+            // Workaround for a bug in HiveMetastore that tries to mutate read-only parameters.
+            // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+            DDL_TIME -> now.toString)
+        }.getOrElse(Map.empty)
         // inherit table storage format (possibly except for location)
-        val (numFiles, totalSize) = partitionStats(location.toString)
-        // This two fast stat could prevent Hive metastore to list the files again.
-        val params = Map(NUM_FILES -> numFiles.toString,
-          TOTAL_SIZE -> totalSize.toString,
-          // Workaround a bug in HiveMetastore that try to mutate a read-only parameters.
-          // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java:L2394
-          DDL_TIME -> now.toString)
         CatalogTablePartition(
           spec,
           table.storage.copy(locationUri = Some(location.toUri.toString)),
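
The batching change trades parallel RPCs for sequential ones (the Hive client underneath is not thread-safe) while still bounding how many partitions go into a single metastore call. A self-contained sketch of the Iterator.grouped pattern, with createPartitions as a hypothetical stand-in for the metastore RPC:

    object SequentialBatchSketch {
      // Hypothetical stand-in for one metastore RPC; the real code calls the
      // (not thread-safe) Hive client here, which is why batches run one by one.
      def createPartitions(batch: Seq[String]): Unit =
        println(s"RPC with ${batch.size} partitions")

      def main(args: Array[String]): Unit = {
        val partitions = (1 to 2500).map(i => s"p=$i")
        // grouped(1000) on an iterator yields batches lazily, bounding the size
        // of each RPC without materializing a parallel collection.
        partitions.toIterator.grouped(1000).foreach { batch =>
          createPartitions(batch)
        }
        // Prints: RPC with 1000, 1000, then 500 partitions, strictly in order.
      }
    }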

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (9 additions, 0 deletions)
@@ -310,6 +310,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val GATHER_FASTSTAT = SQLConfigBuilder("spark.sql.hive.gatherFastStats")
+    .internal()
+    .doc("When true, fast stats (number of files and total size of all files) will be gathered " +
+      "while repairing table partitions.")
+    .booleanConf
+    .createWithDefault(false)
+
   // This is used to control the when we will split a schema's JSON string to multiple pieces
   // in order to fit the JSON string in metastore's table property (by default, the value has
   // a length restriction of 4000 characters). We will split the JSON string of a schema
@@ -605,6 +612,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
+  def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
+
   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
 
   def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
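
Because GATHER_FASTSTAT is internal and defaults to false, partition recovery now skips stats gathering unless the flag is flipped. A hedged usage sketch, assuming a spark-shell session (spark) with Hive support and an existing partitioned table named logs (both are assumptions for illustration):

    // Assumes `spark` is a Hive-enabled SparkSession and `logs` is an
    // existing partitioned table.
    spark.conf.set("spark.sql.hive.gatherFastStats", "true")
    spark.sql("ALTER TABLE logs RECOVER PARTITIONS")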
