From 8bbfe24d8265c1d7af6553c865dd72b87339d827 Mon Sep 17 00:00:00 2001
From: jiake
Date: Sat, 22 Feb 2020 10:16:01 +0800
Subject: [PATCH] remove the max splits config in skewed join

---
 .../apache/spark/sql/internal/SQLConf.scala   |  8 -----
 .../adaptive/OptimizeSkewedJoin.scala         |  8 ++---
 .../adaptive/AdaptiveQueryExecSuite.scala     | 31 ++++++++++---------
 3 files changed, 18 insertions(+), 29 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7f3cbe31b5e02..894cb7b1392a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -447,14 +447,6 @@ object SQLConf {
       .intConf
       .createWithDefault(10)
 
-  val ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS =
-    buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionMaxSplits")
-      .doc("Configures the maximum number of task to handle a skewed partition in adaptive skewed" +
-        "join.")
-      .intConf
-      .checkValue( _ >= 1, "The split size at least be 1")
-      .createWithDefault(5)
-
   val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
     buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
       .doc("The relation with a non-empty partition ratio lower than this config will not be " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index a716497c274b8..578d2d744f85c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -75,9 +75,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
   private def getMapStartIndices(stage: ShuffleQueryStageExec, partitionId: Int): Array[Int] = {
     val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
     val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
-    val maxSplits = math.min(conf.getConf(
-      SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS), mapPartitionSizes.length)
-    val avgPartitionSize = mapPartitionSizes.sum / maxSplits
+    val avgPartitionSize = mapPartitionSizes.sum / mapPartitionSizes.length
     val advisoryPartitionSize = math.max(avgPartitionSize,
       conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD))
     val partitionStartIndices = ArrayBuffer[Int]()
@@ -95,9 +93,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       i += 1
     }
 
-    if (partitionStartIndices.size > maxSplits) {
-      partitionStartIndices.take(maxSplits).toArray
-    } else partitionStartIndices.toArray
+    partitionStartIndices.toArray
   }
 
   private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 4edb35ea30fde..379e17f01d6be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -609,7 +609,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "100",
+      SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "2000",
       SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") {
       withTempView("skewData1", "skewData2") {
         spark
@@ -636,14 +636,15 @@ class AdaptiveQueryExecSuite
          "SELECT * FROM skewData1 join skewData2 ON key1 = key2")
        // left stats: [3496, 0, 0, 0, 4014]
        // right stats:[6292, 0, 0, 0, 0]
-       // Partition 0: both left and right sides are skewed, and divide into 5 splits, so
-       // 5 x 5 sub-partitions.
+       // Partition 0: both left and right sides are skewed, left side is divided
+       // into 2 splits and right side is divided into 4 splits, so
+       // 2 x 4 sub-partitions.
        // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
-       // Partition 4: only left side is skewed, and divide into 5 splits, so
-       // 5 sub-partitions.
-       // So total (25 + 1 + 5) partitions.
+       // Partition 4: only left side is skewed, and divide into 3 splits, so
+       // 3 sub-partitions.
+       // So total (8 + 1 + 3) partitions.
        val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
-       checkSkewJoin(innerSmj, 25 + 1 + 5)
+       checkSkewJoin(innerSmj, 8 + 1 + 3)
 
        // skewed left outer join optimization
        val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
@@ -651,13 +652,13 @@ class AdaptiveQueryExecSuite
        // left stats: [3496, 0, 0, 0, 4014]
        // right stats:[6292, 0, 0, 0, 0]
        // Partition 0: both left and right sides are skewed, but left join can't split right side,
-       // so only left side is divided into 5 splits, and thus 5 sub-partitions.
+       // so only left side is divided into 2 splits, and thus 2 sub-partitions.
        // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
-       // Partition 4: only left side is skewed, and divide into 5 splits, so
-       // 5 sub-partitions.
-       // So total (5 + 1 + 5) partitions.
+       // Partition 4: only left side is skewed, and divide into 3 splits, so
+       // 3 sub-partitions.
+       // So total (2 + 1 + 3) partitions.
        val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
-       checkSkewJoin(leftSmj, 5 + 1 + 5)
+       checkSkewJoin(leftSmj, 2 + 1 + 3)
 
        // skewed right outer join optimization
        val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
@@ -665,13 +666,13 @@ class AdaptiveQueryExecSuite
        // left stats: [3496, 0, 0, 0, 4014]
        // right stats:[6292, 0, 0, 0, 0]
        // Partition 0: both left and right sides are skewed, but right join can't split left side,
-       // so only right side is divided into 5 splits, and thus 5 sub-partitions.
+       // so only right side is divided into 4 splits, and thus 4 sub-partitions.
        // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
        // Partition 4: only left side is skewed, but right join can't split left side, so just
        // 1 partition.
-       // So total (5 + 1 + 1) partitions.
+       // So total (4 + 1 + 1) partitions.
        val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
-       checkSkewJoin(rightSmj, 5 + 1 + 1)
+       checkSkewJoin(rightSmj, 4 + 1 + 1)
       }
     }
   }
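
Note on the new splitting behaviour (not part of the patch): after this change, getMapStartIndices derives the advisory split size from the average map-output size and ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD, and no longer caps the number of splits at skewedPartitionMaxSplits. The Scala sketch below re-implements that packing loop in isolation to show how split counts like those in the test comments can arise. The loop body between the two OptimizeSkewedJoin hunks is not visible in the diff, so the grouping logic, the SkewSplitSketch/mapStartIndices names, and the per-map sizes are illustrative assumptions, not the exact Spark code.

import scala.collection.mutable.ArrayBuffer

// Standalone sketch of the post-patch splitting logic (illustrative only).
object SkewSplitSketch {
  // Packs consecutive map outputs into splits of roughly `advisoryPartitionSize`
  // bytes. Mirrors the idea in getMapStartIndices after this patch: the average
  // map-output size (no longer sum / maxSplits) sets the floor, and the number
  // of resulting splits is no longer capped.
  def mapStartIndices(mapPartitionSizes: Array[Long], sizeThreshold: Long): Array[Int] = {
    val avgPartitionSize = mapPartitionSizes.sum / mapPartitionSizes.length
    val advisoryPartitionSize = math.max(avgPartitionSize, sizeThreshold)
    val partitionStartIndices = ArrayBuffer[Int](0)
    var currentSize = mapPartitionSizes(0)
    var i = 1
    while (i < mapPartitionSizes.length) {
      if (currentSize + mapPartitionSizes(i) > advisoryPartitionSize) {
        partitionStartIndices += i          // start a new split at map output i
        currentSize = mapPartitionSizes(i)
      } else {
        currentSize += mapPartitionSizes(i) // keep packing into the current split
      }
      i += 1
    }
    partitionStartIndices.toArray
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical per-map output sizes (bytes) for the right side of the test's
    // skewed partition 0; only their sum (6292) comes from the test comments.
    val sizes = Array[Long](900, 800, 1200, 700, 1100, 600, 992)
    // With the test's threshold of 2000 this prints "0, 2, 4, 6", i.e. 4 splits
    // for this particular distribution. Before the patch the split count was
    // additionally capped at skewedPartitionMaxSplits (default 5).
    println(mapStartIndices(sizes, 2000L).mkString(", "))
  }
}

The practical effect of removing the config is that a heavily skewed partition can now be split into as many tasks as its size warrants, rather than being truncated to at most five splits by partitionStartIndices.take(maxSplits).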