From af4248b2d661d04fec89b37857a47713246d9465 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 23 Mar 2020 16:39:35 +0800 Subject: [PATCH 1/4] SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled --- .../org/apache/spark/sql/internal/SQLConf.scala | 11 +++++++---- .../execution/exchange/EnsureRequirements.scala | 7 +------ .../adaptive/AdaptiveQueryExecSuite.scala | 16 ++++++++++++++++ 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9a524defb2816..be191488affc9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2701,7 +2701,13 @@ class SQLConf extends Serializable with Logging { def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED) - def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS) + def numShufflePartitions: Int = { + if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) { + getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(getConf(SHUFFLE_PARTITIONS)) + } else { + getConf(SHUFFLE_PARTITIONS) + } + } def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED) @@ -2714,9 +2720,6 @@ class SQLConf extends Serializable with Logging { def coalesceShufflePartitionsEnabled: Boolean = getConf(COALESCE_PARTITIONS_ENABLED) - def initialShufflePartitionNum: Int = - getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(numShufflePartitions) - def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN) def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 28ef793ed62db..918a5aee93214 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -35,12 +35,7 @@ import org.apache.spark.sql.internal.SQLConf * the input partition ordering requirements are met. */ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { - private def defaultNumPreShufflePartitions: Int = - if (conf.adaptiveExecutionEnabled && conf.coalesceShufflePartitionsEnabled) { - conf.initialShufflePartitionNum - } else { - conf.numShufflePartitions - } + private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = { val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index fcca23d7751f0..3560a920bb631 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -780,5 +780,21 @@ class AdaptiveQueryExecSuite } } } + + test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") { + Seq(true, false).foreach { adaptiveExecutionEnabled => + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$adaptiveExecutionEnabled", + SQLConf.SHUFFLE_PARTITIONS.key -> "6", + SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") { + val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length + if (adaptiveExecutionEnabled) { + assert(partitionsNum === 7) + } else { + assert(partitionsNum === 6) + } + } + } + } } From 9a6fe6c7a7190f62ffb9b40e8e2def52e944cb8f Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 23 Mar 2020 18:51:38 +0800 Subject: [PATCH 2/4] Remove defaultNumPreShufflePartitions --- .../spark/sql/execution/exchange/EnsureRequirements.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 918a5aee93214..d472e3d9e5cea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -35,7 +35,6 @@ import org.apache.spark.sql.internal.SQLConf * the input partition ordering requirements are met. */ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { - private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = { val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution @@ -52,7 +51,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { BroadcastExchangeExec(mode, child) case (child, distribution) => val numPartitions = distribution.requiredNumPartitions - .getOrElse(defaultNumPreShufflePartitions) + .getOrElse(conf.numShufflePartitions) ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child) } From abf3ef1e256fa50f3935dac233fda06af18bd2c4 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 23 Mar 2020 20:45:09 +0800 Subject: [PATCH 3/4] Fix test error --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6 ++++-- .../spark/sql/execution/exchange/EnsureRequirements.scala | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index be191488affc9..5b3bdd39e9459 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2701,11 +2701,13 @@ class SQLConf extends Serializable with Logging { def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED) + def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS) + def numShufflePartitions: Int = { if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) { - getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(getConf(SHUFFLE_PARTITIONS)) + getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions) } else { - getConf(SHUFFLE_PARTITIONS) + defaultNumShufflePartitions } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index d472e3d9e5cea..3242ac21ab324 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -89,7 +89,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { // expected number of shuffle partitions. However, if it's smaller than // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the // expected number of shuffle partitions. - math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions) + math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions) } else { childrenNumPartitions.max } From c75244117705729b84c29ad6d2e41c249cc5036e Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 8 Jun 2020 21:06:12 +0800 Subject: [PATCH 4/4] adaptiveExecutionEnabled -> enableAQE --- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 4bb206a9d6fe0..9fa97bffa8910 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1023,13 +1023,13 @@ class AdaptiveQueryExecSuite } test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") { - Seq(true, false).foreach { adaptiveExecutionEnabled => + Seq(true, false).foreach { enableAQE => withSQLConf( - SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$adaptiveExecutionEnabled", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, SQLConf.SHUFFLE_PARTITIONS.key -> "6", SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") { val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length - if (adaptiveExecutionEnabled) { + if (enableAQE) { assert(partitionsNum === 7) } else { assert(partitionsNum === 6)