From 479d56be0a530d0d7fb13196b5120f0202cfc704 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Tue, 7 Jul 2020 15:00:44 +0800 Subject: [PATCH 01/13] [SPARK-32201][SQL] More general skew join pattern matching --- .../adaptive/OptimizeSkewedJoin.scala | 222 +++++++++++------- .../adaptive/AdaptiveQueryExecSuite.scala | 78 ++++-- 2 files changed, 196 insertions(+), 104 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 396c9c9d6b4e5..1bf72c22b3b59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -144,6 +144,21 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { sizes.sum / sizes.length } + private def findShuffleStage(plan: SparkPlan): Option[ShuffleStageInfo] = { + plan collectFirst { + case _ @ ShuffleStage(shuffleStageInfo) => + shuffleStageInfo + } + } + + private def replaceSkewedShufleReader( + smj: SparkPlan, newCtm: CustomShuffleReaderExec): SparkPlan = { + smj transformUp { + case _ @ CustomShuffleReaderExec(child, _) if child.sameResult(newCtm.child) => + newCtm + } + } + /* * This method aim to optimize the skewed join with the following steps: * 1. Check whether the shuffle partition is skewed based on the median size @@ -158,95 +173,107 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { */ def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp { case smj @ SortMergeJoinExec(_, _, joinType, _, - s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _), - s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _) + s1 @ SortExec(_, _, _, _), + s2 @ SortExec(_, _, _, _), _) if supportedJoinTypes.contains(joinType) => - assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length) - val numPartitions = left.partitionsWithSizes.length - // We use the median size of the original shuffle partitions to detect skewed partitions. - val leftMedSize = medianSize(left.mapStats) - val rightMedSize = medianSize(right.mapStats) - logDebug( - s""" - |Optimizing skewed join. - |Left side partitions size info: - |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)} - |Right side partitions size info: - |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)} - """.stripMargin) - val canSplitLeft = canSplitLeftSide(joinType) - val canSplitRight = canSplitRightSide(joinType) - // We use the actual partition sizes (may be coalesced) to calculate target size, so that - // the final data distribution is even (coalesced partitions + split partitions). - val leftActualSizes = left.partitionsWithSizes.map(_._2) - val rightActualSizes = right.partitionsWithSizes.map(_._2) - val leftTargetSize = targetSize(leftActualSizes, leftMedSize) - val rightTargetSize = targetSize(rightActualSizes, rightMedSize) - - val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec] - val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec] - var numSkewedLeft = 0 - var numSkewedRight = 0 - for (partitionIndex <- 0 until numPartitions) { - val leftActualSize = leftActualSizes(partitionIndex) - val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft - val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1 - val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex - - val rightActualSize = rightActualSizes(partitionIndex) - val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight - val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1 - val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex - - // A skewed partition should never be coalesced, but skip it here just to be safe. - val leftParts = if (isLeftSkew && !isLeftCoalesced) { - val reducerId = leftPartSpec.startReducerIndex - val skewSpecs = createSkewPartitionSpecs( - left.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId, leftTargetSize) - if (skewSpecs.isDefined) { - logDebug(s"Left side partition $partitionIndex " + - s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " + - s"split it into ${skewSpecs.get.length} parts.") - numSkewedLeft += 1 + // find the shuffleStage from the plan tree + val leftOpt = findShuffleStage(s1) + val rightOpt = findShuffleStage(s2) + if (leftOpt.isEmpty || rightOpt.isEmpty) { + smj + } else { + val left = leftOpt.get + val right = rightOpt.get + assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length) + val numPartitions = left.partitionsWithSizes.length + // We use the median size of the original shuffle partitions to detect skewed partitions. + val leftMedSize = medianSize(left.mapStats) + val rightMedSize = medianSize(right.mapStats) + logDebug( + s""" + |Optimizing skewed join. + |Left side partitions size info: + |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)} + + |Right side partitio + + |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)} + """.stripMargin) + val canSplitLeft = canSplitLeftSide(joinType) + val canSplitRight = canSplitRightSide(joinType) + // We use the actual partition sizes (may be coalesced) to calculate target size, so that + // the final data distribution is even (coalesced partitions + split partitions). + val leftActualSizes = left.partitionsWithSizes.map(_._2) + val rightActualSizes = right.partitionsWithSizes.map(_._2) + val leftTargetSize = targetSize(leftActualSizes, leftMedSize) + val rightTargetSize = targetSize(rightActualSizes, rightMedSize) + + val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec] + val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec] + var numSkewedLeft = 0 + var numSkewedRight = 0 + for (partitionIndex <- 0 until numPartitions) { + val leftActualSize = leftActualSizes(partitionIndex) + val isLeftSkew = isSkewed(leftActualSize, leftMedSize) && canSplitLeft + val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1 + val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex + + val rightActualSize = rightActualSizes(partitionIndex) + val isRightSkew = isSkewed(rightActualSize, rightMedSize) && canSplitRight + val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1 + val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex + + // A skewed partition should never be coalesced, but skip it here just to be safe. + val leftParts = if (isLeftSkew && !isLeftCoalesced) { + val reducerId = leftPartSpec.startReducerIndex + val skewSpecs = createSkewPartitionSpecs( + left.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId, leftTargetSize) + if (skewSpecs.isDefined) { + logDebug(s"Left side partition $partitionIndex " + + s"(${FileUtils.byteCountToDisplaySize(leftActualSize)}) is skewed, " + + s"split it into ${skewSpecs.get.length} parts.") + numSkewedLeft += 1 + } + skewSpecs.getOrElse(Seq(leftPartSpec)) + } else { + Seq(leftPartSpec) } - skewSpecs.getOrElse(Seq(leftPartSpec)) - } else { - Seq(leftPartSpec) - } - // A skewed partition should never be coalesced, but skip it here just to be safe. - val rightParts = if (isRightSkew && !isRightCoalesced) { - val reducerId = rightPartSpec.startReducerIndex - val skewSpecs = createSkewPartitionSpecs( - right.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId, rightTargetSize) - if (skewSpecs.isDefined) { - logDebug(s"Right side partition $partitionIndex " + - s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " + - s"split it into ${skewSpecs.get.length} parts.") - numSkewedRight += 1 + // A skewed partition should never be coalesced, but skip it here just to be safe. + val rightParts = if (isRightSkew && !isRightCoalesced) { + val reducerId = rightPartSpec.startReducerIndex + val skewSpecs = createSkewPartitionSpecs( + right.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId, rightTargetSize) + if (skewSpecs.isDefined) { + logDebug(s"Right side partition $partitionIndex " + + s"(${FileUtils.byteCountToDisplaySize(rightActualSize)}) is skewed, " + + s"split it into ${skewSpecs.get.length} parts.") + numSkewedRight += 1 + } + skewSpecs.getOrElse(Seq(rightPartSpec)) + } else { + Seq(rightPartSpec) } - skewSpecs.getOrElse(Seq(rightPartSpec)) - } else { - Seq(rightPartSpec) - } - for { - leftSidePartition <- leftParts - rightSidePartition <- rightParts - } { - leftSidePartitions += leftSidePartition - rightSidePartitions += rightSidePartition + for { + leftSidePartition <- leftParts + rightSidePartition <- rightParts + } { + leftSidePartitions += leftSidePartition + rightSidePartitions += rightSidePartition + } } - } - logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight") - if (numSkewedLeft > 0 || numSkewedRight > 0) { - val newLeft = CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions) - val newRight = CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions) - smj.copy( - left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true) - } else { - smj + logDebug(s"number of skewed partitions: left $numSkewedLeft, right $numSkewedRight") + if (numSkewedLeft > 0 || numSkewedRight > 0) { + val newLeft = CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions) + val newRight = CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions) + val newSmj = replaceSkewedShufleReader( + replaceSkewedShufleReader(smj, newLeft), newRight).asInstanceOf[SortMergeJoinExec] + newSmj.copy(isSkewJoin = true) + } else { + smj + } } } @@ -263,15 +290,19 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { val shuffleStages = collectShuffleStages(plan) if (shuffleStages.length == 2) { - // When multi table join, there will be too many complex combination to consider. - // Currently we only handle 2 table join like following use case. + // SPARK-32201. Skew join supports below pattern, ".." may contain any number of nodes, + // includes such as BroadcastHashJoinExec. So it can handle more than two tables join. // SMJ // Sort - // Shuffle + // .. + // Shuffle // Sort - // Shuffle + // .. + // Shuffle val optimizePlan = optimizeSkewJoin(plan) - val numShuffles = ensureRequirements.apply(optimizePlan).collect { + val ensuredPlan = ensureRequirements.apply(optimizePlan) + println(ensuredPlan) + val numShuffles = ensuredPlan.collect { case e: ShuffleExchangeExec => e }.length @@ -316,6 +347,23 @@ private object ShuffleStage { } Some(ShuffleStageInfo(s, mapStats, partitions)) + case _: LeafExecNode => None + + case _ @ UnaryExecNode((_, ShuffleStage(ss: ShuffleStageInfo))) => + Some(ss) + + case b: BinaryExecNode => + b.left match { + case _ @ ShuffleStage(ss: ShuffleStageInfo) => + Some(ss) + case _ => + b.right match { + case _ @ ShuffleStage(ss: ShuffleStageInfo) => + Some(ss) + case _ => None + } + } + case _ => None } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index c696d3f648ed1..7865ee8bb67a4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -692,23 +692,6 @@ class AdaptiveQueryExecSuite 'id as "value2") .createOrReplaceTempView("skewData2") - def checkSkewJoin( - joins: Seq[SortMergeJoinExec], - leftSkewNum: Int, - rightSkewNum: Int): Unit = { - assert(joins.size == 1 && joins.head.isSkewJoin) - assert(joins.head.left.collect { - case r: CustomShuffleReaderExec => r - }.head.partitionSpecs.collect { - case p: PartialReducerPartitionSpec => p.reducerIndex - }.distinct.length == leftSkewNum) - assert(joins.head.right.collect { - case r: CustomShuffleReaderExec => r - }.head.partitionSpecs.collect { - case p: PartialReducerPartitionSpec => p.reducerIndex - }.distinct.length == rightSkewNum) - } - // skewed inner join optimization val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult( "SELECT * FROM skewData1 join skewData2 ON key1 = key2") @@ -730,6 +713,67 @@ class AdaptiveQueryExecSuite } } + private def checkSkewJoin( + joins: Seq[SortMergeJoinExec], + leftSkewNum: Int, + rightSkewNum: Int): Unit = { + assert(joins.size == 1 && joins.head.isSkewJoin) + assert(joins.head.left.collect { + case r: CustomShuffleReaderExec => r + }.head.partitionSpecs.collect { + case p: PartialReducerPartitionSpec => p.reducerIndex + }.distinct.length == leftSkewNum) + assert(joins.head.right.collect { + case r: CustomShuffleReaderExec => r + }.head.partitionSpecs.collect { + case p: PartialReducerPartitionSpec => p.reducerIndex + }.distinct.length == rightSkewNum) + } + + test("SPARK-32201: handle general skew join pattern") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1", + SQLConf.SHUFFLE_PARTITIONS.key -> "100", + SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") { + withTempView("skewData1", "skewData2") { + spark + .range(0, 1000, 1, 10) + .select( + when('id < 250, 249) + .when('id >= 750, 1000) + .otherwise('id).as("key1"), + 'id as "value1") + .createOrReplaceTempView("skewData1") + + spark + .range(0, 1000, 1, 10) + .select( + when('id < 250, 249) + .otherwise('id).as("key2"), + 'id as "value2") + .createOrReplaceTempView("skewData2") + val sqlText = + """ + |SELECT * FROM + | skewData1 AS data1 + | INNER JOIN + | ( + | SELECT skewData2.key2, sum(skewData2.value2) AS sum2 + | FROM skewData2 GROUP BY skewData2.key2 + | ) AS data2 + |ON data1.key1 = data2.key2 + |""".stripMargin + + val (_, adaptivePlan) = runAdaptiveAndVerifyResult(sqlText) + val innerSmj = findTopLevelSortMergeJoin(adaptivePlan) + checkSkewJoin(innerSmj, 2, 0) + } + } + } + test("SPARK-30291: AQE should catch the exceptions when doing materialize") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { From 947927a067a0e3a22b4c1220baaac66d31724028 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Tue, 7 Jul 2020 15:38:04 +0800 Subject: [PATCH 02/13] remove the println --- .../spark/sql/execution/adaptive/OptimizeSkewedJoin.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 1bf72c22b3b59..11de8d40dc482 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -300,9 +300,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { // .. // Shuffle val optimizePlan = optimizeSkewJoin(plan) - val ensuredPlan = ensureRequirements.apply(optimizePlan) - println(ensuredPlan) - val numShuffles = ensuredPlan.collect { + val numShuffles = ensureRequirements.apply(optimizePlan).collect { case e: ShuffleExchangeExec => e }.length From 607eb0897f6403f800d29eacf56a07b8a698e02a Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Tue, 7 Jul 2020 20:00:29 +0800 Subject: [PATCH 03/13] cannot split if AggExec in one side --- .../execution/adaptive/OptimizeSkewedJoin.scala | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 11de8d40dc482..e32480574ef11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -25,6 +25,7 @@ import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.SortMergeJoinExec import org.apache.spark.sql.internal.SQLConf @@ -130,13 +131,15 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { } } - private def canSplitLeftSide(joinType: JoinType) = { - joinType == Inner || joinType == Cross || joinType == LeftSemi || - joinType == LeftAnti || joinType == LeftOuter + private def canSplitLeftSide(joinType: JoinType, plan: SparkPlan) = { + (joinType == Inner || joinType == Cross || joinType == LeftSemi || + joinType == LeftAnti || joinType == LeftOuter) && + plan.find(_.isInstanceOf[HashAggregateExec]).isEmpty } - private def canSplitRightSide(joinType: JoinType) = { - joinType == Inner || joinType == Cross || joinType == RightOuter + private def canSplitRightSide(joinType: JoinType, plan: SparkPlan) = { + (joinType == Inner || joinType == Cross || joinType == RightOuter) && + plan.find(_.isInstanceOf[HashAggregateExec]).isEmpty } private def getSizeInfo(medianSize: Long, sizes: Seq[Long]): String = { @@ -199,8 +202,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)} """.stripMargin) - val canSplitLeft = canSplitLeftSide(joinType) - val canSplitRight = canSplitRightSide(joinType) + val canSplitLeft = canSplitLeftSide(joinType, s1) + val canSplitRight = canSplitRightSide(joinType, s2) // We use the actual partition sizes (may be coalesced) to calculate target size, so that // the final data distribution is even (coalesced partitions + split partitions). val leftActualSizes = left.partitionsWithSizes.map(_._2) From 80bef0d2c22bf91d216784f29839b95f22fb230f Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Wed, 8 Jul 2020 11:33:56 +0800 Subject: [PATCH 04/13] add more agg exec --- .../adaptive/OptimizeSkewedJoin.scala | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index e32480574ef11..4b20e3d66dec2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -25,7 +25,7 @@ import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.aggregate.HashAggregateExec +import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.SortMergeJoinExec import org.apache.spark.sql.internal.SQLConf @@ -133,13 +133,21 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { private def canSplitLeftSide(joinType: JoinType, plan: SparkPlan) = { (joinType == Inner || joinType == Cross || joinType == LeftSemi || - joinType == LeftAnti || joinType == LeftOuter) && - plan.find(_.isInstanceOf[HashAggregateExec]).isEmpty + joinType == LeftAnti || joinType == LeftOuter) && !containsAggregateExec(plan) } private def canSplitRightSide(joinType: JoinType, plan: SparkPlan) = { - (joinType == Inner || joinType == Cross || joinType == RightOuter) && - plan.find(_.isInstanceOf[HashAggregateExec]).isEmpty + (joinType == Inner || joinType == Cross || + joinType == RightOuter) && !containsAggregateExec(plan) + } + + private def containsAggregateExec(plan: SparkPlan) = { + plan.find { + case _: HashAggregateExec => true + case _: SortAggregateExec => true + case _: ObjectHashAggregateExec => true + case _ => false + }.isDefined } private def getSizeInfo(medianSize: Long, sizes: Seq[Long]): String = { From dd09e702f79e39bf021dc9bc950f75e1bfef78f6 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Fri, 10 Jul 2020 16:12:32 +0800 Subject: [PATCH 05/13] Add a new parititioning CoalescedHashPartitioning --- .../plans/physical/partitioning.scala | 25 +++++++++++++++++++ .../adaptive/CustomShuffleReaderExec.scala | 10 +++++++- .../adaptive/AdaptiveQueryExecSuite.scala | 22 ++++++++++++++-- 3 files changed, 54 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 17e1cb416fc8a..72f12fea0666b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -340,3 +340,28 @@ case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning { case _ => false } } + +/** + * With AE, multiple partitions in hash partitioned output could be coalesced + * to a single partition. CoalescedHashPartitioning is designed for such case. + */ +case class CoalescedHashPartitioning( + expressions: Seq[Expression], + numPartitions: Int) + extends Expression with Partitioning with Unevaluable { + + override def children: Seq[Expression] = expressions + override def nullable: Boolean = false + override def dataType: DataType = IntegerType + + override def satisfies0(required: Distribution): Boolean = { + super.satisfies0(required) || { + required match { + case ClusteredDistribution(requiredClustering, requiredNumPartitions) => + expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) && + (requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions) + case _ => false + } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala index b2633c774f532..339c58c956d1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.plans.physical.{CoalescedHashPartitioning, HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -65,6 +65,14 @@ case class CustomShuffleReaderExec private( case _ => throw new IllegalStateException("operating on canonicalization plan") } + } else if (partitionSpecs.nonEmpty && + partitionSpecs.forall(_.isInstanceOf[CoalescedPartitionSpec])) { + child match { + case ShuffleQueryStageExec(_, ShuffleExchangeExec(p: HashPartitioning, _, _)) => + CoalescedHashPartitioning(p.expressions, partitionSpecs.size) + case _ => + throw new IllegalStateException("operating on canonicalization plan") + } } else { UnknownPartitioning(partitionSpecs.length) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 7865ee8bb67a4..08c9cdb0b7a8a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -738,6 +738,24 @@ class AdaptiveQueryExecSuite SQLConf.SHUFFLE_PARTITIONS.key -> "100", SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800", SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") { + + // SMJ + // Sort + // CustomShuffleReader(coalesced) + // Shuffle + // Sort + // HashAggregate + // CustomShuffleReader(coalesced) + // Shuffle + // --> + // SMJ + // Sort + // CustomShuffleReader(coalesced and skew) + // Shuffle + // Sort + // HashAggregate + // CustomShuffleReader(coalesced) + // Shuffle withTempView("skewData1", "skewData2") { spark .range(0, 1000, 1, 10) @@ -747,7 +765,6 @@ class AdaptiveQueryExecSuite .otherwise('id).as("key1"), 'id as "value1") .createOrReplaceTempView("skewData1") - spark .range(0, 1000, 1, 10) .select( @@ -755,6 +772,7 @@ class AdaptiveQueryExecSuite .otherwise('id).as("key2"), 'id as "value2") .createOrReplaceTempView("skewData2") + val sqlText = """ |SELECT * FROM @@ -764,7 +782,7 @@ class AdaptiveQueryExecSuite | SELECT skewData2.key2, sum(skewData2.value2) AS sum2 | FROM skewData2 GROUP BY skewData2.key2 | ) AS data2 - |ON data1.key1 = data2.key2 + |ON data1.key1 = data2.key2 LIMIT 10 |""".stripMargin val (_, adaptivePlan) = runAdaptiveAndVerifyResult(sqlText) From 76dc3ea8aea6bc77b7bbeaaf1ee8d9b66fe97d27 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Sat, 11 Jul 2020 13:58:35 +0800 Subject: [PATCH 06/13] fix ut --- .../spark/sql/execution/adaptive/CustomShuffleReaderExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala index 339c58c956d1f..17be37b1fb27e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala @@ -71,7 +71,7 @@ case class CustomShuffleReaderExec private( case ShuffleQueryStageExec(_, ShuffleExchangeExec(p: HashPartitioning, _, _)) => CoalescedHashPartitioning(p.expressions, partitionSpecs.size) case _ => - throw new IllegalStateException("operating on canonicalization plan") + UnknownPartitioning(partitionSpecs.length) } } else { UnknownPartitioning(partitionSpecs.length) From 7465bfa2f0cc4a55e34f28600951a5e1290edf1c Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Wed, 15 Jul 2020 13:23:10 +0800 Subject: [PATCH 07/13] more general canSplitXSide --- .../adaptive/OptimizeSkewedJoin.scala | 36 +++++++++++++------ 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 4b20e3d66dec2..7c9560545dbfd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -23,10 +23,10 @@ import org.apache.commons.io.FileUtils import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv} import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} -import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} +import org.apache.spark.sql.execution.exchange.{EnsureRequirements, Exchange, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.SortMergeJoinExec import org.apache.spark.sql.internal.SQLConf @@ -133,21 +133,37 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { private def canSplitLeftSide(joinType: JoinType, plan: SparkPlan) = { (joinType == Inner || joinType == Cross || joinType == LeftSemi || - joinType == LeftAnti || joinType == LeftOuter) && !containsAggregateExec(plan) + joinType == LeftAnti || joinType == LeftOuter) && canBypass(plan) } private def canSplitRightSide(joinType: JoinType, plan: SparkPlan) = { (joinType == Inner || joinType == Cross || - joinType == RightOuter) && !containsAggregateExec(plan) + joinType == RightOuter) && canBypass(plan) } - private def containsAggregateExec(plan: SparkPlan) = { - plan.find { - case _: HashAggregateExec => true - case _: SortAggregateExec => true - case _: ObjectHashAggregateExec => true + // Bypass the node which its requiredChildDistribution contains [[UnspecifiedDistribution]] + private def canBypass(plan: SparkPlan) = { + val nodesCanBypass = plan.find { + case p: SparkPlan if p.requiredChildDistribution.exists { + case UnspecifiedDistribution => true + case _ => false + } => false // false means we bypass this node + case _ @ BypassTerminator() => true // terminate traverse + case _ => true // get the node which cannot bypass + } + nodesCanBypass.exists { + case _ @ BypassTerminator() => true + case _ => false + } + } + + private object BypassTerminator { + def unapply(plan: SparkPlan): Boolean = plan match { + case _: ShuffleQueryStageExec => true + case _: CustomShuffleReaderExec => true + case _: Exchange => true case _ => false - }.isDefined + } } private def getSizeInfo(medianSize: Long, sizes: Seq[Long]): String = { From 0950e9a99f26dc80243f87eb914b76b28d89d1be Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Thu, 16 Jul 2020 13:31:25 +0800 Subject: [PATCH 08/13] fix ut --- .../spark/sql/execution/adaptive/OptimizeSkewedJoin.scala | 4 +--- .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 7c9560545dbfd..b6bb48ae9cc38 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -199,9 +199,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { * 3 tasks separately. */ def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp { - case smj @ SortMergeJoinExec(_, _, joinType, _, - s1 @ SortExec(_, _, _, _), - s2 @ SortExec(_, _, _, _), _) + case smj @ SortMergeJoinExec(_, _, joinType, _, s1: SortExec, s2: SortExec, _) if supportedJoinTypes.contains(joinType) => // find the shuffleStage from the plan tree val leftOpt = findShuffleStage(s1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 08c9cdb0b7a8a..ed7fe7a84b297 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -660,9 +660,9 @@ class AdaptiveQueryExecSuite checkSkewJoin( "SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2", true) - // Additional shuffle introduced, so disable the "OptimizeSkewedJoin" optimization + // After patched SPARK-32201, this query won't introduce additional shuffle anymore. checkSkewJoin( - "SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 GROUP BY key1", false) + "SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 GROUP BY key1", true) } } } From 973d87e84d33cf35733ecf4d4ddbc208d9cfce4a Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Thu, 16 Jul 2020 16:18:31 +0800 Subject: [PATCH 09/13] add another skew test case --- .../adaptive/AdaptiveQueryExecSuite.scala | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index ed7fe7a84b297..66730fb7e2d57 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -733,12 +733,13 @@ class AdaptiveQueryExecSuite test("SPARK-32201: handle general skew join pattern") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1199", SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1", SQLConf.SHUFFLE_PARTITIONS.key -> "100", SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800", SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") { + // CASE 1: // SMJ // Sort // CustomShuffleReader(coalesced) @@ -765,6 +766,7 @@ class AdaptiveQueryExecSuite .otherwise('id).as("key1"), 'id as "value1") .createOrReplaceTempView("skewData1") + spark .range(0, 1000, 1, 10) .select( @@ -788,6 +790,41 @@ class AdaptiveQueryExecSuite val (_, adaptivePlan) = runAdaptiveAndVerifyResult(sqlText) val innerSmj = findTopLevelSortMergeJoin(adaptivePlan) checkSkewJoin(innerSmj, 2, 0) + + // CASE 2: + // SMJ + // Sort + // SMJ + // CustomShuffleReader(coalesced) + // Shuffle + // Sort + // CustomShuffleReader(coalesced) + // Shuffle + // --> + // SMJ + // Sort + // BroadcastHashJoin <-- SMJ change to BCJ + // CustomShuffleReader(coalesced and skew) + // Shuffle + // Sort + // CustomShuffleReader(coalesced) + // Shuffle + val sqlText2 = + """ + |SELECT * FROM + | ( + | SELECT t1.* + | FROM skewData1 t1 LEFT JOIN testData t2 + | ON t1.value1 = t2.key + | AND t2.value = '2' || t2.value = '1' + | ) AS data1 + | LEFT JOIN + | skewData2 AS data2 + |ON data1.key1 = data2.key2 LIMIT 10 + |""".stripMargin + val (_, adaptivePlan2) = runAdaptiveAndVerifyResult(sqlText2) + val innerSmj2 = findTopLevelSortMergeJoin(adaptivePlan2) + checkSkewJoin(innerSmj2, 2, 0) } } } From d65a210d31a334fd17ff25d82504964f27f28ca9 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Thu, 16 Jul 2020 18:54:29 +0800 Subject: [PATCH 10/13] canUseLocalShuffleReader should consider skew optimization --- .../adaptive/OptimizeLocalShuffleReader.scala | 8 +++++++ .../adaptive/AdaptiveQueryExecSuite.scala | 22 +++++++++++++------ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala index 3620f27058af2..dd27417337d97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala @@ -142,6 +142,14 @@ object OptimizeLocalShuffleReader { def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match { case s: ShuffleQueryStageExec => s.shuffle.canChangeNumPartitions + // This CustomShuffleReaderExec used in skew side, its numPartitions increased. + case CustomShuffleReaderExec(_, partitionSpecs) + if partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec]) => false + // This CustomShuffleReaderExec used in non-skew side, its numPartitions equals to + // the skew side CustomShuffleReaderExec. + case CustomShuffleReaderExec(_, partitionSpecs) + if partitionSpecs.forall(_.isInstanceOf[CoalescedPartitionSpec]) && + partitionSpecs.toSet.size == partitionSpecs.size => false case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) => s.shuffle.canChangeNumPartitions && partitionSpecs.nonEmpty case _ => false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 66730fb7e2d57..b6f8c76fe3c55 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -733,7 +733,7 @@ class AdaptiveQueryExecSuite test("SPARK-32201: handle general skew join pattern") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1199", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80", SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1", SQLConf.SHUFFLE_PARTITIONS.key -> "100", SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800", @@ -757,7 +757,7 @@ class AdaptiveQueryExecSuite // HashAggregate // CustomShuffleReader(coalesced) // Shuffle - withTempView("skewData1", "skewData2") { + withTempView("skewData1", "skewData2", "smallData") { spark .range(0, 1000, 1, 10) .select( @@ -809,22 +809,30 @@ class AdaptiveQueryExecSuite // Sort // CustomShuffleReader(coalesced) // Shuffle + spark + .range(0, 100, 1, 10) + .select( + when('id < 250, 249) + .otherwise('id).as("key3"), + expr("concat(id, 'aaa')") as "value3") + .createOrReplaceTempView("smallData") + val sqlText2 = """ |SELECT * FROM | ( | SELECT t1.* - | FROM skewData1 t1 LEFT JOIN testData t2 - | ON t1.value1 = t2.key - | AND t2.value = '2' || t2.value = '1' + | FROM skewData1 t1 LEFT JOIN smallData t2 + | ON t1.key1 = t2.key3 + | AND t2.value3 = 'xyz' | ) AS data1 - | LEFT JOIN + | INNER JOIN | skewData2 AS data2 |ON data1.key1 = data2.key2 LIMIT 10 |""".stripMargin val (_, adaptivePlan2) = runAdaptiveAndVerifyResult(sqlText2) val innerSmj2 = findTopLevelSortMergeJoin(adaptivePlan2) - checkSkewJoin(innerSmj2, 2, 0) + checkSkewJoin(innerSmj2, 2, 1) } } } From 6f7dbc2ad47fb7425e08cb9ae4ebb1795cacecd9 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Thu, 16 Jul 2020 19:06:29 +0800 Subject: [PATCH 11/13] fix ut --- .../execution/adaptive/OptimizeLocalShuffleReader.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala index dd27417337d97..7a70a0c245bc7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala @@ -144,12 +144,12 @@ object OptimizeLocalShuffleReader { s.shuffle.canChangeNumPartitions // This CustomShuffleReaderExec used in skew side, its numPartitions increased. case CustomShuffleReaderExec(_, partitionSpecs) - if partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec]) => false + if partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec]) => false // This CustomShuffleReaderExec used in non-skew side, its numPartitions equals to // the skew side CustomShuffleReaderExec. - case CustomShuffleReaderExec(_, partitionSpecs) - if partitionSpecs.forall(_.isInstanceOf[CoalescedPartitionSpec]) && - partitionSpecs.toSet.size == partitionSpecs.size => false + case CustomShuffleReaderExec(_, partitionSpecs) if partitionSpecs.size > 1 && + partitionSpecs.forall(_.isInstanceOf[CoalescedPartitionSpec]) && + partitionSpecs.toSet.size != partitionSpecs.size => false case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) => s.shuffle.canChangeNumPartitions && partitionSpecs.nonEmpty case _ => false From aa3af93ed44d4b7e673814d676e8abbde7f88bf5 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Mon, 20 Jul 2020 15:48:23 +0800 Subject: [PATCH 12/13] approach 2: remove 'CoalescedHashPartitioning' and refine code --- .../plans/physical/partitioning.scala | 25 -------- .../adaptive/CustomShuffleReaderExec.scala | 10 +--- .../adaptive/OptimizeSkewedJoin.scala | 59 ++++++++----------- .../execution/joins/SortMergeJoinExec.scala | 1 + .../adaptive/AdaptiveQueryExecSuite.scala | 14 +++-- 5 files changed, 33 insertions(+), 76 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 72f12fea0666b..17e1cb416fc8a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -340,28 +340,3 @@ case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning { case _ => false } } - -/** - * With AE, multiple partitions in hash partitioned output could be coalesced - * to a single partition. CoalescedHashPartitioning is designed for such case. - */ -case class CoalescedHashPartitioning( - expressions: Seq[Expression], - numPartitions: Int) - extends Expression with Partitioning with Unevaluable { - - override def children: Seq[Expression] = expressions - override def nullable: Boolean = false - override def dataType: DataType = IntegerType - - override def satisfies0(required: Distribution): Boolean = { - super.satisfies0(required) || { - required match { - case ClusteredDistribution(requiredClustering, requiredNumPartitions) => - expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) && - (requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions) - case _ => false - } - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala index b3b38b5e8ee85..af18ee065aa86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.catalyst.plans.physical.{CoalescedHashPartitioning, HashPartitioning, Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -65,14 +65,6 @@ case class CustomShuffleReaderExec private( case _ => throw new IllegalStateException("operating on canonicalization plan") } - } else if (partitionSpecs.nonEmpty && - partitionSpecs.forall(_.isInstanceOf[CoalescedPartitionSpec])) { - child match { - case ShuffleQueryStageExec(_, ShuffleExchangeExec(p: HashPartitioning, _, _)) => - CoalescedHashPartitioning(p.expressions, partitionSpecs.size) - case _ => - UnknownPartitioning(partitionSpecs.length) - } } else { UnknownPartitioning(partitionSpecs.length) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 6f7b7aca2029c..864e43b855de1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -133,29 +133,22 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { private def canSplitLeftSide(joinType: JoinType, plan: SparkPlan) = { (joinType == Inner || joinType == Cross || joinType == LeftSemi || - joinType == LeftAnti || joinType == LeftOuter) && canBypass(plan) + joinType == LeftAnti || joinType == LeftOuter) && allUnspecifiedDistribution(plan) } private def canSplitRightSide(joinType: JoinType, plan: SparkPlan) = { (joinType == Inner || joinType == Cross || - joinType == RightOuter) && canBypass(plan) + joinType == RightOuter) && allUnspecifiedDistribution(plan) } - // Bypass the node which its requiredChildDistribution contains [[UnspecifiedDistribution]] - private def canBypass(plan: SparkPlan) = { - val nodesCanBypass = plan.find { - case p: SparkPlan if p.requiredChildDistribution.exists { - case UnspecifiedDistribution => true - case _ => false - } => false // false means we bypass this node - case _ @ BypassTerminator() => true // terminate traverse - case _ => true // get the node which cannot bypass + // Check if there is a node in the tree that the requiredChildDistribution is specified, + // other than UnspecifiedDistribution. + private def allUnspecifiedDistribution(plan: SparkPlan): Boolean = plan.find { p => + p.requiredChildDistribution.exists { + case UnspecifiedDistribution => false + case _ => true } - nodesCanBypass.exists { - case _ @ BypassTerminator() => true - case _ => false - } - } + }.isEmpty private object BypassTerminator { def unapply(plan: SparkPlan): Boolean = plan match { @@ -325,10 +318,21 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { // .. // Shuffle val optimizePlan = optimizeSkewJoin(plan) - val numShuffles = ensureRequirements.apply(optimizePlan).collect { - case e: ShuffleExchangeExec => e - }.length + def countAdditionalShuffleInAncestorsOfSkewJoin(optimizePlan: SparkPlan): Int = { + val newPlan = ensureRequirements.apply(optimizePlan) + val totalAdditionalShuffles = newPlan.collect { case e: ShuffleExchangeExec => e }.size + val numShufflesFromDescendants = + newPlan.collectFirst { case j: SortMergeJoinExec if j.isSkewJoin => j }.map { smj => + smj.collect { case e: ShuffleExchangeExec => e }.size + }.getOrElse(0) + totalAdditionalShuffles - numShufflesFromDescendants + } + + // Check if we introduced new shuffles in the ancestors of the skewed join operator. + // And we don't care if new shuffles are introduced in the descendants of the join operator, + // since they will not actually be executed in the current adaptive execution framework. + val numShuffles = countAdditionalShuffleInAncestorsOfSkewJoin(optimizePlan) if (numShuffles > 0) { logDebug("OptimizeSkewedJoin rule is not applied due" + " to additional shuffles will be introduced.") @@ -370,23 +374,6 @@ private object ShuffleStage { } Some(ShuffleStageInfo(s, mapStats, partitions)) - case _: LeafExecNode => None - - case _ @ UnaryExecNode((_, ShuffleStage(ss: ShuffleStageInfo))) => - Some(ss) - - case b: BinaryExecNode => - b.left match { - case _ @ ShuffleStage(ss: ShuffleStageInfo) => - Some(ss) - case _ => - b.right match { - case _ @ ShuffleStage(ss: ShuffleStageInfo) => - Some(ss) - case _ => None - } - } - case _ => None } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index 2c57956de5bca..7b86d5b344999 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -73,6 +73,7 @@ case class SortMergeJoinExec( } override def outputPartitioning: Partitioning = joinType match { + case _ if isSkewJoin => UnknownPartitioning(0) case _: InnerLike => PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning)) // For left and right outer joins, the output is partitioned by the streamed input's join keys. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index b6f8c76fe3c55..b568ca636080e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -68,7 +68,9 @@ class AdaptiveQueryExecSuite val result = dfAdaptive.collect() withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { val df = sql(query) - QueryTest.sameRows(result.toSeq, df.collect().toSeq) + QueryTest.sameRows(result.toSeq, df.collect().toSeq).foreach { + error => fail(error) + } } val planAfter = dfAdaptive.queryExecution.executedPlan assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true")) @@ -660,9 +662,9 @@ class AdaptiveQueryExecSuite checkSkewJoin( "SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2", true) - // After patched SPARK-32201, this query won't introduce additional shuffle anymore. + // Additional shuffle introduced, so disable the "OptimizeSkewedJoin" optimization checkSkewJoin( - "SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 GROUP BY key1", true) + "SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 GROUP BY key1", false) } } } @@ -804,10 +806,10 @@ class AdaptiveQueryExecSuite // SMJ // Sort // BroadcastHashJoin <-- SMJ change to BCJ - // CustomShuffleReader(coalesced and skew) + // CustomShuffleReader(coalesced) // Shuffle // Sort - // CustomShuffleReader(coalesced) + // CustomShuffleReader(coalesced and skew) // Shuffle spark .range(0, 100, 1, 10) @@ -832,7 +834,7 @@ class AdaptiveQueryExecSuite |""".stripMargin val (_, adaptivePlan2) = runAdaptiveAndVerifyResult(sqlText2) val innerSmj2 = findTopLevelSortMergeJoin(adaptivePlan2) - checkSkewJoin(innerSmj2, 2, 1) + checkSkewJoin(innerSmj2, 0, 1) } } } From 3cd411f81a19cb7b9f04c56e1ebc796dcd52b3cd Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Mon, 20 Jul 2020 15:50:47 +0800 Subject: [PATCH 13/13] remove dead code --- .../sql/execution/adaptive/OptimizeSkewedJoin.scala | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 864e43b855de1..7820df9c9a2bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.exchange.{EnsureRequirements, Exchange, ShuffleExchangeExec} +import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.SortMergeJoinExec import org.apache.spark.sql.internal.SQLConf @@ -150,15 +150,6 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { } }.isEmpty - private object BypassTerminator { - def unapply(plan: SparkPlan): Boolean = plan match { - case _: ShuffleQueryStageExec => true - case _: CustomShuffleReaderExec => true - case _: Exchange => true - case _ => false - } - } - private def getSizeInfo(medianSize: Long, sizes: Seq[Long]): String = { s"median size: $medianSize, max size: ${sizes.max}, min size: ${sizes.min}, avg size: " + sizes.sum / sizes.length