Skip to content

Commit d1465a1

Browse files
xuanyuankingcloud-fan
authored andcommitted
[SPARK-30074][SQL] The maxNumPostShufflePartitions config should obey reducePostShufflePartitions enabled
### What changes were proposed in this pull request? 1. Make maxNumPostShufflePartitions config obey reducePostShfflePartitions config. 2. Update the description for all the SQLConf affected by `spark.sql.adaptive.enabled`. ### Why are the changes needed? Make the relation between these confs clearer. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Existing UT. Closes #26664 from xuanyuanking/SPARK-9853-follow. Authored-by: Yuanjian Li <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 5a1896a commit d1465a1

File tree

2 files changed

+30
-21
lines changed

2 files changed

+30
-21
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -353,59 +353,68 @@ object SQLConf {
353353
.booleanConf
354354
.createWithDefault(false)
355355

356-
val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
357-
buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
358-
.doc("The target post-shuffle input size in bytes of a task.")
359-
.bytesConf(ByteUnit.BYTE)
360-
.createWithDefault(64 * 1024 * 1024)
356+
val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
357+
buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled")
358+
.doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables reducing " +
359+
"the number of post-shuffle partitions based on map output statistics.")
360+
.booleanConf
361+
.createWithDefault(true)
361362

362363
val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED =
363364
buildConf("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled")
364365
.doc("Whether to fetch the continuous shuffle blocks in batch. Instead of fetching blocks " +
365366
"one by one, fetching continuous shuffle blocks for the same map task in batch can " +
366-
"reduce IO and improve performance. Note, this feature also depends on a relocatable " +
367-
"serializer and the concatenation support codec in use.")
367+
"reduce IO and improve performance. Note, multiple continuous blocks exist in single " +
368+
s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
369+
s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled, this feature also depends " +
370+
"on a relocatable serializer and the concatenation support codec in use.")
368371
.booleanConf
369372
.createWithDefault(true)
370373

371-
val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
372-
buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled")
373-
.doc("When true and adaptive execution is enabled, this enables reducing the number of " +
374-
"post-shuffle partitions based on map output statistics.")
375-
.booleanConf
376-
.createWithDefault(true)
377-
378374
val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
379375
buildConf("spark.sql.adaptive.shuffle.minNumPostShufflePartitions")
380-
.doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
376+
.doc("The advisory minimum number of post-shuffle partitions used when " +
377+
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
378+
s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
381379
.intConf
382380
.checkValue(_ > 0, "The minimum shuffle partition number " +
383381
"must be a positive integer.")
384382
.createWithDefault(1)
385383

384+
val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
385+
buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
386+
.doc("The target post-shuffle input size in bytes of a task. This configuration only has " +
387+
s"an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
388+
s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
389+
.bytesConf(ByteUnit.BYTE)
390+
.createWithDefault(64 * 1024 * 1024)
391+
386392
val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
387393
buildConf("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions")
388394
.doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
389395
"This is used as the initial number of pre-shuffle partitions. By default it equals to " +
390-
"spark.sql.shuffle.partitions")
396+
"spark.sql.shuffle.partitions. This configuration only has an effect when " +
397+
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
398+
s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' is enabled.")
391399
.intConf
392400
.checkValue(_ > 0, "The maximum shuffle partition number " +
393401
"must be a positive integer.")
394402
.createOptional
395403

396404
val LOCAL_SHUFFLE_READER_ENABLED =
397405
buildConf("spark.sql.adaptive.shuffle.localShuffleReader.enabled")
398-
.doc("When true and adaptive execution is enabled, this enables the optimization of" +
399-
" converting the shuffle reader to local shuffle reader for the shuffle exchange" +
400-
" of the broadcast hash join in probe side.")
406+
.doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables the " +
407+
"optimization of converting the shuffle reader to local shuffle reader for the shuffle " +
408+
"exchange of the broadcast hash join in probe side.")
401409
.booleanConf
402410
.createWithDefault(true)
403411

404412
val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
405413
buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
406414
.doc("The relation with a non-empty partition ratio lower than this config will not be " +
407415
"considered as the build side of a broadcast-hash join in adaptive execution regardless " +
408-
"of its size.")
416+
"of its size.This configuration only has an effect when " +
417+
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled.")
409418
.doubleConf
410419
.checkValue(_ >= 0, "The non-empty partition ratio must be positive number.")
411420
.createWithDefault(0.2)

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
3636
*/
3737
case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
3838
private def defaultNumPreShufflePartitions: Int =
39-
if (conf.adaptiveExecutionEnabled) {
39+
if (conf.adaptiveExecutionEnabled && conf.reducePostShufflePartitionsEnabled) {
4040
conf.maxNumPostShufflePartitions
4141
} else {
4242
conf.numShufflePartitions

0 commit comments

Comments
 (0)