Commit d5f5720

cloud-fan authored and gatorsmile committed
[SPARK-31070][SQL] make skew join split skewed partitions more evenly
### What changes were proposed in this pull request?

There are two problems when splitting skewed partitions:
1. It's possible that we can't split the skewed partition; in that case we shouldn't create a skew join.
2. When splitting, we may create a partition for a very small amount of data.

This PR fixes them:
1. Don't create `PartialReducerPartitionSpec` if we can't split.
2. Merge small partitions into the previous partition.

### Why are the changes needed?

Make skew join split skewed partitions more evenly.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Updated test.

Closes apache#27833 from cloud-fan/aqe.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
1 parent 93def95 commit d5f5720
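As a quick illustration of the intended behavior, here is a minimal sketch. The size list and target come from the `ShufflePartitionsUtilSuite` test updated in this commit, and the snippet assumes the new `ShufflePartitionsUtil` object introduced here is on the classpath:

```scala
import org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil

// Map-output sizes for one skewed reduce partition, with a target split size of 100 bytes.
val mapPartitionSizes = Seq[Long](15, 90, 15, 15, 15, 90, 15)

// New behavior: tiny head/tail pieces are merged into their neighbours,
// giving three splits of 105, 45 and 105 bytes.
ShufflePartitionsUtil.splitSizeListByTargetSize(mapPartitionSizes, targetSize = 100)
// => Array(0, 2, 5)

// For comparison, tracing the loop removed from OptimizeSkewedJoin on the same
// input yields start indices (0, 1, 2, 5, 6), i.e. splits of 15, 90, 45, 90
// and 15 bytes, several of which are far too small.
```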

File tree: 5 files changed (+102, -40 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -66,7 +66,7 @@ case class CoalesceShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
       val distinctNumPreShufflePartitions =
         validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
       if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
-        val partitionSpecs = ShufflePartitionsCoalescer.coalescePartitions(
+        val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
           validMetrics.toArray,
           firstPartitionIndex = 0,
           lastPartitionIndex = distinctNumPreShufflePartitions.head,
```

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala

Lines changed: 16 additions & 28 deletions
```diff
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.adaptive
 
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.io.FileUtils
 
@@ -111,22 +110,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       targetSize: Long): Seq[Int] = {
     val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
     val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
-    val partitionStartIndices = ArrayBuffer[Int]()
-    partitionStartIndices += 0
-    var i = 0
-    var postMapPartitionSize = 0L
-    while (i < mapPartitionSizes.length) {
-      val nextMapPartitionSize = mapPartitionSizes(i)
-      if (i > 0 && postMapPartitionSize + nextMapPartitionSize > targetSize) {
-        partitionStartIndices += i
-        postMapPartitionSize = nextMapPartitionSize
-      } else {
-        postMapPartitionSize += nextMapPartitionSize
-      }
-      i += 1
-    }
-
-    partitionStartIndices
+    ShufflePartitionsUtil.splitSizeListByTargetSize(mapPartitionSizes, targetSize)
   }
 
   private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics = {
@@ -211,21 +195,25 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
         }
 
         val leftParts = if (isLeftSkew) {
-          leftSkewDesc.addPartitionSize(leftSize)
-          createSkewPartitions(
-            partitionIndex,
-            getMapStartIndices(left, partitionIndex, leftTargetSize),
-            getNumMappers(left))
+          val mapStartIndices = getMapStartIndices(left, partitionIndex, leftTargetSize)
+          if (mapStartIndices.length > 1) {
+            leftSkewDesc.addPartitionSize(leftSize)
+            createSkewPartitions(partitionIndex, mapStartIndices, getNumMappers(left))
+          } else {
+            Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
+          }
         } else {
           Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
         }
 
         val rightParts = if (isRightSkew) {
-          rightSkewDesc.addPartitionSize(rightSize)
-          createSkewPartitions(
-            partitionIndex,
-            getMapStartIndices(right, partitionIndex, rightTargetSize),
-            getNumMappers(right))
+          val mapStartIndices = getMapStartIndices(right, partitionIndex, rightTargetSize)
+          if (mapStartIndices.length > 1) {
+            rightSkewDesc.addPartitionSize(rightSize)
+            createSkewPartitions(partitionIndex, mapStartIndices, getNumMappers(right))
+          } else {
+            Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
+          }
         } else {
           Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
         }
@@ -273,7 +261,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) {
         nonSkewPartitionIndices.map(i => CoalescedPartitionSpec(i, i + 1))
       } else {
-        ShufflePartitionsCoalescer.coalescePartitions(
+        ShufflePartitionsUtil.coalescePartitions(
           Array(leftStats, rightStats),
           firstPartitionIndex = nonSkewPartitionIndices.head,
           // `lastPartitionIndex` is exclusive.
```
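The new `mapStartIndices.length > 1` guard covers the case where a skewed partition cannot actually be split, for example when essentially all of its data comes from a single map output. A minimal sketch of that case (the sizes are illustrative only; the helper is the one added to `ShufflePartitionsUtil` in this commit):

```scala
import org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil

// One mapper produced essentially all of the skewed partition's data.
val mapPartitionSizes = Seq[Long](5000, 10, 5)

// Only a single split can be formed, so there is exactly one start index ...
val mapStartIndices = ShufflePartitionsUtil.splitSizeListByTargetSize(mapPartitionSizes, 100)
assert(mapStartIndices.length == 1)  // Array(0)

// ... and OptimizeSkewedJoin now falls back to a plain CoalescedPartitionSpec
// for this partition instead of creating PartialReducerPartitionSpecs that
// cannot actually split the skew.
```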
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala (renamed from ShufflePartitionsCoalescer.scala)

Lines changed: 49 additions & 1 deletion
```diff
@@ -23,7 +23,9 @@ import org.apache.spark.MapOutputStatistics
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionSpec}
 
-object ShufflePartitionsCoalescer extends Logging {
+object ShufflePartitionsUtil extends Logging {
+  final val SMALL_PARTITION_FACTOR = 0.2
+  final val MERGED_PARTITION_FACTOR = 1.2
 
   /**
    * Coalesce the same range of partitions (`firstPartitionIndex` to `lastPartitionIndex`, the
@@ -114,4 +116,50 @@ object ShufflePartitionsCoalescer extends Logging {
 
     partitionSpecs
   }
+
+  /**
+   * Given a list of size, return an array of indices to split the list into multiple partitions,
+   * so that the size sum of each partition is close to the target size. Each index indicates the
+   * start of a partition.
+   */
+  def splitSizeListByTargetSize(sizes: Seq[Long], targetSize: Long): Array[Int] = {
+    val partitionStartIndices = ArrayBuffer[Int]()
+    partitionStartIndices += 0
+    var i = 0
+    var currentPartitionSize = 0L
+    var lastPartitionSize = -1L
+
+    def tryMergePartitions() = {
+      // When we are going to start a new partition, it's possible that the current partition or
+      // the previous partition is very small and it's better to merge the current partition into
+      // the previous partition.
+      val shouldMergePartitions = lastPartitionSize > -1 &&
+        ((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||
+        (currentPartitionSize < targetSize * SMALL_PARTITION_FACTOR ||
+          lastPartitionSize < targetSize * SMALL_PARTITION_FACTOR))
+      if (shouldMergePartitions) {
+        // We decide to merge the current partition into the previous one, so the start index of
+        // the current partition should be removed.
+        partitionStartIndices.remove(partitionStartIndices.length - 1)
+        lastPartitionSize += currentPartitionSize
+      } else {
+        lastPartitionSize = currentPartitionSize
+      }
+    }
+
+    while (i < sizes.length) {
+      // If including the next size in the current partition exceeds the target size, package the
+      // current partition and start a new partition.
+      if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {
+        tryMergePartitions()
+        partitionStartIndices += i
+        currentPartitionSize = sizes(i)
+      } else {
+        currentPartitionSize += sizes(i)
+      }
+      i += 1
+    }
+    tryMergePartitions()
+    partitionStartIndices.toArray
+  }
 }
```
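To see how the two constants drive the merging, here is a worked trace over one of the size lists from the updated test (with targetSize = 100, a merge is accepted when the combined size stays under 100 * 1.2 = 120, or when either piece is under 100 * 0.2 = 20; the sizes are illustrative only):

```scala
import org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil

val sizes = Seq[Long](35, 75, 90, 20, 35, 25, 35)

// Greedy packing alone would emit a lone 35-byte split before the 75, but
// 35 + 75 = 110 < 120 (targetSize * MERGED_PARTITION_FACTOR), so the head is
// merged back into [35, 75]. The trailing 35 is likewise merged into the
// [20, 35, 25] group because 80 + 35 = 115 < 120. The 90 in the middle stays
// on its own: merging it with either neighbour would exceed 120, and neither
// side is below 20 (targetSize * SMALL_PARTITION_FACTOR).
ShufflePartitionsUtil.splitSizeListByTargetSize(sizes, 100)
// => Array(0, 2, 3), i.e. splits of sizes 110, 90 and 115
```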
sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala (renamed from ShufflePartitionsCoalescerSuite.scala)

Lines changed: 29 additions & 3 deletions
```diff
@@ -18,9 +18,9 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.{MapOutputStatistics, SparkFunSuite}
-import org.apache.spark.sql.execution.adaptive.ShufflePartitionsCoalescer
+import org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil
 
-class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
+class ShufflePartitionsUtilSuite extends SparkFunSuite {
 
   private def checkEstimation(
       bytesByPartitionIdArray: Array[Array[Long]],
@@ -31,7 +31,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
       case (bytesByPartitionId, index) =>
         new MapOutputStatistics(index, bytesByPartitionId)
     }
-    val estimatedPartitionStartIndices = ShufflePartitionsCoalescer.coalescePartitions(
+    val estimatedPartitionStartIndices = ShufflePartitionsUtil.coalescePartitions(
       mapOutputStatistics,
       0,
       bytesByPartitionIdArray.head.length,
@@ -252,4 +252,30 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
       targetSize, minNumPartitions)
    }
  }
+
+  test("splitSizeListByTargetSize") {
+    val targetSize = 100
+
+    // merge the small partitions at the beginning/end
+    val sizeList1 = Seq[Long](15, 90, 15, 15, 15, 90, 15)
+    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList1, targetSize).toSeq ==
+      Seq(0, 2, 5))
+
+    // merge the small partitions in the middle
+    val sizeList2 = Seq[Long](30, 15, 90, 10, 90, 15, 30)
+    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList2, targetSize).toSeq ==
+      Seq(0, 2, 4, 5))
+
+    // merge small partitions if the partition itself is smaller than
+    // targetSize * SMALL_PARTITION_FACTOR
+    val sizeList3 = Seq[Long](15, 1000, 15, 1000)
+    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList3, targetSize).toSeq ==
+      Seq(0, 3))
+
+    // merge small partitions if the combined size is smaller than
+    // targetSize * MERGED_PARTITION_FACTOR
+    val sizeList4 = Seq[Long](35, 75, 90, 20, 35, 25, 35)
+    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList4, targetSize).toSeq ==
+      Seq(0, 2, 3))
+  }
 }
```
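The `sizeList3` case isolates `SMALL_PARTITION_FACTOR`: with a target of 100 the small threshold is 100 * 0.2 = 20, so each standalone 15-byte entry is merged into its neighbouring 1000-byte entry even though the combined sizes (1015, then 1030) are far above targetSize * MERGED_PARTITION_FACTOR, and only the final 1000-byte entry keeps its own start index. That is why the expected result is `Seq(0, 3)` rather than four separate splits.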

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 7 additions & 7 deletions
```diff
@@ -645,11 +645,11 @@
       // into 2 splits and right side is divided into 4 splits, so
       // 2 x 4 sub-partitions.
       // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
-      // Partition 4: only left side is skewed, and divide into 3 splits, so
-      // 3 sub-partitions.
+      // Partition 4: only left side is skewed, and divide into 2 splits, so
+      // 2 sub-partitions.
       // So total (8 + 1 + 3) partitions.
       val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
-      checkSkewJoin(innerSmj, 8 + 1 + 3)
+      checkSkewJoin(innerSmj, 8 + 1 + 2)
 
       // skewed left outer join optimization
       val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
@@ -659,11 +659,11 @@
       // Partition 0: both left and right sides are skewed, but left join can't split right side,
       // so only left side is divided into 2 splits, and thus 2 sub-partitions.
       // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
-      // Partition 4: only left side is skewed, and divide into 3 splits, so
-      // 3 sub-partitions.
-      // So total (2 + 1 + 3) partitions.
+      // Partition 4: only left side is skewed, and divide into 2 splits, so
+      // 2 sub-partitions.
+      // So total (2 + 1 + 2) partitions.
       val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
-      checkSkewJoin(leftSmj, 2 + 1 + 3)
+      checkSkewJoin(leftSmj, 2 + 1 + 2)
 
       // skewed right outer join optimization
       val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
```