diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4bef66d0d3aa9..af86990026a1e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -432,19 +432,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

-  val ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD =
-    buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionSizeThreshold")
-      .doc("Configures the minimum size in bytes for a partition that is considered as a skewed " +
-        "partition in adaptive skewed join.")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("64MB")
-
   val ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR =
     buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionFactor")
       .doc("A partition is considered as a skewed partition if its size is larger than" +
         " this factor multiple the median partition size and also larger than " +
-        s" ${ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key}")
+        s" ${SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key}")
       .intConf
+      .checkValue(_ > 0, "The skew factor must be positive.")
       .createWithDefault(10)

   val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
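Side note on the config change above: skew detection now reuses `SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE` as its size floor instead of a dedicated threshold, and the new `checkValue` rejects a non-positive skew factor when the conf is set instead of letting it silently disable the optimization. A minimal sketch of the expected behavior, assuming a ScalaTest suite with an active `SparkSession` named `spark` (hypothetical test code, not part of the patch):

```scala
import org.apache.spark.sql.internal.SQLConf

// With `.checkValue(_ > 0, ...)` in place, setting a non-positive factor
// should fail fast with the configured message.
val e = intercept[IllegalArgumentException] {
  spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR.key, "-1")
}
assert(e.getMessage.contains("The skew factor must be positive."))
```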
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 578d2d744f85c..d3cb8645fc7ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -34,6 +34,30 @@ import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExcha
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.internal.SQLConf

+/**
+ * A rule to optimize skewed joins to avoid straggler tasks whose share of data is significantly
+ * larger than that of the rest of the tasks.
+ *
+ * The general idea is to divide each skewed partition into smaller partitions and replicate its
+ * matching partition on the other side of the join so that they can run in parallel tasks.
+ * Note that when matching partitions from the left side and the right side both have skew,
+ * it will become a cartesian product of splits from the left and right joining together.
+ *
+ * For example, assume the Sort-Merge join has 4 partitions:
+ *   left:  [L1, L2, L3, L4]
+ *   right: [R1, R2, R3, R4]
+ *
+ * Let's say L2, L4 and R3, R4 are skewed, and each of them gets split into 2 sub-partitions.
+ * This is scheduled to run 4 tasks at the beginning: (L1, R1), (L2, R2), (L3, R3), (L4, R4).
+ * This rule expands it to 9 tasks to increase parallelism:
+ *   (L1, R1),
+ *   (L2-1, R2), (L2-2, R2),
+ *   (L3, R3-1), (L3, R3-2),
+ *   (L4-1, R4-1), (L4-2, R4-1), (L4-1, R4-2), (L4-2, R4-2)
+ *
+ * Note that, when this rule is enabled, it also coalesces non-skewed partitions like
+ * `ReduceNumShufflePartitions` does.
+ */
 case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {

   private val ensureRequirements = EnsureRequirements(conf)
@@ -43,12 +67,12 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {

   /**
    * A partition is considered as a skewed partition if its size is larger than the median
-   * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger than
-   * spark.sql.adaptive.skewedPartitionSizeThreshold.
+   * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
+   * SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.
    */
   private def isSkewed(size: Long, medianSize: Long): Boolean = {
     size > medianSize * conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
-      size > conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD)
+      size > conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
   }

   private def medianSize(stats: MapOutputStatistics): Long = {
@@ -61,6 +85,19 @@
     }
   }

+  /**
+   * The goal of skew join optimization is to make the data distribution more even. The target
+   * size to split skewed partitions is the average size of the non-skewed partitions, or the
+   * target post-shuffle partition size if that average is smaller than it.
+   */
+  private def targetSize(stats: MapOutputStatistics, medianSize: Long): Long = {
+    val targetPostShuffleSize = conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
+    val nonSkewSizes = stats.bytesByPartitionId.filterNot(isSkewed(_, medianSize))
+    // It's impossible that all the partitions are skewed, as we use median size to define skew.
+    assert(nonSkewSizes.nonEmpty)
+    math.max(targetPostShuffleSize, nonSkewSizes.sum / nonSkewSizes.length)
+  }
+
   /**
    * Get the map size of the specific reduce shuffle Id.
    */
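To make the `targetSize` arithmetic above concrete, here is an illustrative standalone walkthrough with made-up partition sizes; it inlines the skew predicate with a factor of 10 and a 700-byte target rather than calling into Spark:

```scala
// Suppose the five post-shuffle partitions are [80, 100, 120, 140, 3000] bytes,
// so the median is 120. With a skew factor of 10, only the 3000-byte partition
// is skewed (3000 > 120 * 10 and 3000 > 700). The non-skewed average is
// (80 + 100 + 120 + 140) / 4 = 110, so with a 700-byte target post-shuffle
// size the split target is max(700, 110) = 700 bytes.
val sizes = Array(80L, 100L, 120L, 140L, 3000L)
val nonSkewSizes = sizes.filterNot(s => s > 120L * 10 && s > 700L)
val target = math.max(700L, nonSkewSizes.sum / nonSkewSizes.length)
assert(target == 700L)
```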
@@ -72,19 +109,19 @@
   /**
    * Split the skewed partition based on the map size and the max split number.
    */
-  private def getMapStartIndices(stage: ShuffleQueryStageExec, partitionId: Int): Array[Int] = {
+  private def getMapStartIndices(
+      stage: ShuffleQueryStageExec,
+      partitionId: Int,
+      targetSize: Long): Array[Int] = {
     val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
     val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
-    val avgPartitionSize = mapPartitionSizes.sum / mapPartitionSizes.length
-    val advisoryPartitionSize = math.max(avgPartitionSize,
-      conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD))
     val partitionStartIndices = ArrayBuffer[Int]()
     partitionStartIndices += 0
     var i = 0
     var postMapPartitionSize = 0L
     while (i < mapPartitionSizes.length) {
       val nextMapPartitionSize = mapPartitionSizes(i)
-      if (i > 0 && postMapPartitionSize + nextMapPartitionSize > advisoryPartitionSize) {
+      if (i > 0 && postMapPartitionSize + nextMapPartitionSize > targetSize) {
         partitionStartIndices += i
         postMapPartitionSize = nextMapPartitionSize
       } else {
@@ -152,6 +189,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       """.stripMargin)
     val canSplitLeft = canSplitLeftSide(joinType)
     val canSplitRight = canSplitRightSide(joinType)
+    val leftTargetSize = targetSize(leftStats, leftMedSize)
+    val rightTargetSize = targetSize(rightStats, rightMedSize)

     val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
     val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
@@ -179,7 +218,7 @@
           leftSkewDesc.addPartitionSize(leftSize)
           createSkewPartitions(
             partitionIndex,
-            getMapStartIndices(left, partitionIndex),
+            getMapStartIndices(left, partitionIndex, leftTargetSize),
             getNumMappers(left))
         } else {
           Seq(SinglePartitionSpec(partitionIndex))
@@ -189,7 +228,7 @@
           rightSkewDesc.addPartitionSize(rightSize)
           createSkewPartitions(
             partitionIndex,
-            getMapStartIndices(right, partitionIndex),
+            getMapStartIndices(right, partitionIndex, rightTargetSize),
             getNumMappers(right))
         } else {
           Seq(SinglePartitionSpec(partitionIndex))
@@ -236,7 +275,8 @@
       rightStats: MapOutputStatistics,
       nonSkewPartitionIndices: Seq[Int]): Seq[ShufflePartitionSpec] = {
     assert(nonSkewPartitionIndices.nonEmpty)
-    if (nonSkewPartitionIndices.length == 1) {
+    val shouldCoalesce = conf.getConf(SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
+    if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) {
       Seq(SinglePartitionSpec(nonSkewPartitionIndices.head))
     } else {
       val startIndices = ShufflePartitionsCoalescer.coalescePartitions(
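Before the test changes below, a quick illustration of the split logic that `getMapStartIndices` now drives with the caller-provided `targetSize`: it greedily packs consecutive map outputs of a skewed reduce partition until the next one would overflow the target. A self-contained sketch of the same loop (simplified stand-in, not the method itself):

```scala
import scala.collection.mutable.ArrayBuffer

// Greedy packing: start a new split whenever adding the next map output would
// push the running size of the current split past targetSize.
def splitStartIndices(mapSizes: Array[Long], targetSize: Long): Array[Int] = {
  val starts = ArrayBuffer(0)
  var running = 0L
  for (i <- mapSizes.indices) {
    if (i > 0 && running + mapSizes(i) > targetSize) {
      starts += i          // map output i opens a new split
      running = mapSizes(i)
    } else {
      running += mapSizes(i)
    }
  }
  starts.toArray
}

// e.g. four 40-byte map outputs with a 100-byte target yield splits starting
// at map 0 and map 2: [40 + 40], [40 + 40].
assert(splitStartIndices(Array(40L, 40L, 40L, 40L), 100L).sameElements(Array(0, 2)))
```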
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 379e17f01d6be..64566af332afb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -583,7 +583,6 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "100",
       SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") {
       withTempView("skewData1", "skewData2") {
         spark
@@ -609,8 +608,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "2000",
-      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") {
+      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "2000") {
       withTempView("skewData1", "skewData2") {
         spark
           .range(0, 1000, 1, 10)
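On the second test's change: the old 2000-byte `skewedPartitionSizeThreshold` is now expressed by raising `SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE` to 2000, since that conf doubles as the skew size floor after this patch. A rough restatement of the resulting predicate with illustrative sizes (not the suite's actual data):

```scala
// Under the second test's settings: target post-shuffle size = 2000 bytes,
// default skew factor = 10.
def isSkewed(size: Long, medianSize: Long): Boolean =
  size > medianSize * 10 && size > 2000L

assert(isSkewed(size = 2500, medianSize = 200))   // over both bounds
assert(!isSkewed(size = 1900, medianSize = 100))  // over 10x median, under 2000
```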