Commit f2c2408

update

1 parent 4902a30 commit f2c2408

6 files changed: +61 -66 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 1 addition & 1 deletion
@@ -488,7 +488,7 @@ object SQLConf {
   val SKEW_JOIN_SKEWED_PARTITION_FACTOR =
     buildConf("spark.sql.adaptive.skewJoin.skewedPartitionFactor")
       .doc("A partition is considered as skewed if its size is larger than this factor " +
-        "multiplying the median partition size and also larger than 2 multiplying " +
+        "multiplying the median partition size and also larger than " +
         s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'")
       .version("3.0.0")
       .intConf
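With the "* 2" dropped, the documented rule becomes: a partition is skewed if it exceeds both the factor times the median partition size and the advisory partition size itself. A minimal sketch of the threshold, assuming the usual defaults (skew factor 10, advisory size 64MB; both defaults live elsewhere in SQLConf and are not part of this diff):

    // Hedged sketch of the documented threshold, not Spark's internal API.
    val factor = 10                       // assumed default of skewedPartitionFactor
    val advisoryBytes = 64L * 1024 * 1024 // assumed default of advisoryPartitionSizeInBytes

    def isSkewed(size: Long, medianSize: Long): Boolean =
      size > medianSize * factor && size > advisoryBytes // old rule: size > advisoryBytes * 2

    // Example: a 100MB partition over a 5MB median is now skewed (100MB > 50MB
    // and 100MB > 64MB); under the old rule it was not, since 100MB < 128MB.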

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ case class CoalesceShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
     val distinctNumPreShufflePartitions =
       validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
     if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
-      val partitionSpecs = ShufflePartitionsCoalescer.coalescePartitions(
+      val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
         validMetrics.toArray,
         firstPartitionIndex = 0,
         lastPartitionIndex = distinctNumPreShufflePartitions.head,

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala

Lines changed: 18 additions & 14 deletions
@@ -63,11 +63,11 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
   /**
    * A partition is considered as a skewed partition if its size is larger than the median
    * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
-   * ADVISORY_PARTITION_SIZE_IN_BYTES * 2.
+   * ADVISORY_PARTITION_SIZE_IN_BYTES.
    */
   private def isSkewed(size: Long, medianSize: Long): Boolean = {
     size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
-      size > conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) * 2
+      size > conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
   }

   private def medianSize(stats: MapOutputStatistics): Long = {

@@ -110,7 +110,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       targetSize: Long): Array[Int] = {
     val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
     val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
-    ShufflePartitionsCoalescer.splitSizeListByTargetSize(mapPartitionSizes, targetSize)
+    ShufflePartitionsUtil.splitSizeListByTargetSize(mapPartitionSizes, targetSize)
   }

   private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics = {

@@ -195,21 +195,25 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
         }

         val leftParts = if (isLeftSkew) {
-          leftSkewDesc.addPartitionSize(leftSize)
-          createSkewPartitions(
-            partitionIndex,
-            getMapStartIndices(left, partitionIndex, leftTargetSize),
-            getNumMappers(left))
+          val mapStartIndices = getMapStartIndices(left, partitionIndex, leftTargetSize)
+          if (mapStartIndices.length > 1) {
+            leftSkewDesc.addPartitionSize(leftSize)
+            createSkewPartitions(partitionIndex, mapStartIndices, getNumMappers(left))
+          } else {
+            Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
+          }
         } else {
           Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
         }

         val rightParts = if (isRightSkew) {
-          rightSkewDesc.addPartitionSize(rightSize)
-          createSkewPartitions(
-            partitionIndex,
-            getMapStartIndices(right, partitionIndex, rightTargetSize),
-            getNumMappers(right))
+          val mapStartIndices = getMapStartIndices(right, partitionIndex, rightTargetSize)
+          if (mapStartIndices.length > 1) {
+            rightSkewDesc.addPartitionSize(rightSize)
+            createSkewPartitions(partitionIndex, mapStartIndices, getNumMappers(right))
+          } else {
+            Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
+          }
         } else {
           Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
         }

@@ -259,7 +263,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) {
         nonSkewPartitionIndices.map(i => CoalescedPartitionSpec(i, i + 1))
       } else {
-        ShufflePartitionsCoalescer.coalescePartitions(
+        ShufflePartitionsUtil.coalescePartitions(
           Array(leftStats, rightStats),
           firstPartitionIndex = nonSkewPartitionIndices.head,
           // `lastPartitionIndex` is exclusive.
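The restructured branches change when a partition is actually treated as skewed: the split points are computed first, and only if there is more than one does the rule record the skew and create sub-partitions. A skewed-looking partition whose map output cannot be split (a single start index) now falls back to an ordinary coalesced spec. A standalone sketch of that decision, with simplified stand-in types (none of these names are Spark's internal API):

    // Hedged sketch of the new fallback, not the Spark source itself.
    object SkewFallbackSketch {
      sealed trait Spec
      case class Coalesced(start: Int, end: Int) extends Spec
      case class PartialMapper(partition: Int, startMap: Int, endMap: Int) extends Spec

      // A skewed partition is split only when there is more than one split point;
      // otherwise it stays a single ordinary partition and is not counted in the
      // skew statistics.
      def specsFor(idx: Int, isSkew: Boolean, mapStartIndices: Array[Int], numMappers: Int): Seq[Spec] =
        if (isSkew && mapStartIndices.length > 1) {
          mapStartIndices.indices.map { i =>
            val end = if (i == mapStartIndices.length - 1) numMappers else mapStartIndices(i + 1)
            PartialMapper(idx, mapStartIndices(i), end)
          }
        } else {
          Seq(Coalesced(idx, idx + 1))
        }

      def main(args: Array[String]): Unit = {
        // Skewed, but only one split point: no actual split happens.
        println(specsFor(4, isSkew = true, Array(0), numMappers = 10))    // Coalesced(4,5)
        // Skewed and splittable into mapper ranges [0,6) and [6,10).
        println(specsFor(4, isSkew = true, Array(0, 6), numMappers = 10))
      }
    }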
Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import org.apache.spark.MapOutputStatistics
2323
import org.apache.spark.internal.Logging
2424
import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionSpec}
2525

26-
object ShufflePartitionsCoalescer extends Logging {
26+
object ShufflePartitionsUtil extends Logging {
27+
final val SMALL_PARTITION_FACTOR = 0.2
28+
final val MERGED_PARTITION_FACTOR = 1.2
2729

2830
/**
2931
* Coalesce the same range of partitions (`firstPartitionIndex` to `lastPartitionIndex`, the
@@ -117,38 +119,43 @@ object ShufflePartitionsCoalescer extends Logging {
117119

118120
/**
119121
* Given a list of size, return an array of indices to split the list into multiple partitions,
120-
* so that the size sum of each partition is close to target size. Each index indicates the start
121-
* of a partition.
122+
* so that the size sum of each partition is close to the target size. Each index indicates the
123+
* start of a partition.
122124
*/
123125
def splitSizeListByTargetSize(sizes: Seq[Long], targetSize: Long): Array[Int] = {
124126
val partitionStartIndices = ArrayBuffer[Int]()
125127
partitionStartIndices += 0
126128
var i = 0
127-
var currentSizeSum = 0L
129+
var currentPartitionSize = 0L
128130
var lastPartitionSize = -1L
129131

130132
def tryMergePartitions() = {
131133
// When we are going to start a new partition, it's possible that the current partition or
132134
// the previous partition is very small and it's better to merge the current partition into
133135
// the previous partition.
134136
val shouldMergePartitions = lastPartitionSize > -1 &&
135-
((currentSizeSum + lastPartitionSize) < targetSize * 1.3 ||
136-
(currentSizeSum < targetSize * 0.3 || lastPartitionSize < targetSize * 0.3))
137+
((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||
138+
(currentPartitionSize < targetSize * SMALL_PARTITION_FACTOR ||
139+
lastPartitionSize < targetSize * SMALL_PARTITION_FACTOR))
137140
if (shouldMergePartitions) {
141+
// We decide to merge the current partition into the previous one, so the start index of
142+
// the current partition should be removed.
138143
partitionStartIndices.remove(partitionStartIndices.length - 1)
139-
lastPartitionSize += currentSizeSum
144+
lastPartitionSize += currentPartitionSize
140145
} else {
141-
lastPartitionSize = currentSizeSum
146+
lastPartitionSize = currentPartitionSize
142147
}
143148
}
144149

145150
while (i < sizes.length) {
146-
if (i > 0 && currentSizeSum + sizes(i) > targetSize) {
151+
// If including the next size in the current partition exceeds the target size, package the
152+
// current partition and start a new partition.
153+
if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {
147154
tryMergePartitions()
148155
partitionStartIndices += i
149-
currentSizeSum = sizes(i)
156+
currentPartitionSize = sizes(i)
150157
} else {
151-
currentSizeSum += sizes(i)
158+
currentPartitionSize += sizes(i)
152159
}
153160
i += 1
154161
}
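The hunk ends inside the method, so the final packaging step is not visible here. The updated first test case below (Seq(0, 2, 5)) only comes out if one last merge attempt runs after the loop, so the following standalone sketch assumes a trailing tryMergePartitions(); names mirror the diff but this is not the Spark source itself:

    import scala.collection.mutable.ArrayBuffer

    // Standalone sketch of the splitting strategy with the new factors; the final
    // merge after the loop is an assumption not shown in the hunk above.
    object SplitSketch {
      final val SMALL_PARTITION_FACTOR = 0.2
      final val MERGED_PARTITION_FACTOR = 1.2

      def splitSizeListByTargetSize(sizes: Seq[Long], targetSize: Long): Array[Int] = {
        val partitionStartIndices = ArrayBuffer[Int](0)
        var currentPartitionSize = 0L
        var lastPartitionSize = -1L

        def tryMergePartitions(): Unit = {
          val shouldMerge = lastPartitionSize > -1 &&
            ((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||
              currentPartitionSize < targetSize * SMALL_PARTITION_FACTOR ||
              lastPartitionSize < targetSize * SMALL_PARTITION_FACTOR)
          if (shouldMerge) {
            // Fold the current partition into the previous one.
            partitionStartIndices.remove(partitionStartIndices.length - 1)
            lastPartitionSize += currentPartitionSize
          } else {
            lastPartitionSize = currentPartitionSize
          }
        }

        var i = 0
        while (i < sizes.length) {
          if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {
            tryMergePartitions()
            partitionStartIndices += i
            currentPartitionSize = sizes(i)
          } else {
            currentPartitionSize += sizes(i)
          }
          i += 1
        }
        tryMergePartitions() // assumed final merge of the trailing partition
        partitionStartIndices.toArray
      }

      def main(args: Array[String]): Unit = {
        // Matches the first updated test case below: the leading 15 folds into the
        // 90 (15 + 90 = 105 < 100 * 1.2), and so does the trailing 15.
        println(splitSizeListByTargetSize(Seq(15L, 90, 15, 15, 15, 90, 15), 100).mkString(", ")) // 0, 2, 5
      }
    }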
Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
package org.apache.spark.sql.execution
1919

2020
import org.apache.spark.{MapOutputStatistics, SparkFunSuite}
21-
import org.apache.spark.sql.execution.adaptive.ShufflePartitionsCoalescer
21+
import org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil
2222

23-
class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
23+
class ShufflePartitionsUtilSuite extends SparkFunSuite {
2424

2525
private def checkEstimation(
2626
bytesByPartitionIdArray: Array[Array[Long]],
@@ -31,7 +31,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
3131
case (bytesByPartitionId, index) =>
3232
new MapOutputStatistics(index, bytesByPartitionId)
3333
}
34-
val estimatedPartitionStartIndices = ShufflePartitionsCoalescer.coalescePartitions(
34+
val estimatedPartitionStartIndices = ShufflePartitionsUtil.coalescePartitions(
3535
mapOutputStatistics,
3636
0,
3737
bytesByPartitionIdArray.head.length,
@@ -257,23 +257,23 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
257257
val targetSize = 100
258258

259259
// merge the small partitions at the beginning/end
260-
val sizeList1 = Seq[Long](20, 90, 20, 25, 80, 20)
261-
assert(ShufflePartitionsCoalescer.splitSizeListByTargetSize(sizeList1, targetSize).toSeq ==
262-
Seq(0, 2, 4))
260+
val sizeList1 = Seq[Long](15, 90, 15, 15, 15, 90, 15)
261+
assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList1, targetSize).toSeq ==
262+
Seq(0, 2, 5))
263263

264264
// merge the small partitions in the middle
265-
val sizeList2 = Seq[Long](20, 25, 90, 20, 90, 20, 25)
266-
assert(ShufflePartitionsCoalescer.splitSizeListByTargetSize(sizeList2, targetSize).toSeq ==
265+
val sizeList2 = Seq[Long](30, 15, 90, 10, 90, 15, 30)
266+
assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList2, targetSize).toSeq ==
267267
Seq(0, 2, 4, 5))
268268

269269
// merge the small partition even if it leads to a very large partition
270-
val sizeList3 = Seq[Long](20, 1000, 20, 1000)
271-
assert(ShufflePartitionsCoalescer.splitSizeListByTargetSize(sizeList3, targetSize).toSeq ==
270+
val sizeList3 = Seq[Long](15, 1000, 15, 1000)
271+
assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList3, targetSize).toSeq ==
272272
Seq(0, 3))
273273

274-
// merge the small partitions even if it exceeds targetSize * 0.3
274+
// merge the small partitions even if it exceeds targetSize * 0.2
275275
val sizeList4 = Seq[Long](35, 75, 90, 20, 35, 35, 35)
276-
assert(ShufflePartitionsCoalescer.splitSizeListByTargetSize(sizeList4, targetSize).toSeq ==
276+
assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList4, targetSize).toSeq ==
277277
Seq(0, 2, 3))
278278
}
279279
}
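Why the first case yields Seq(0, 2, 5): with targetSize = 100, the list (15, 90, 15, 15, 15, 90, 15) packages as

    // sizes:      15 90 | 15 15 15 | 90 15     (start indices 0, 2, 5)
    // partitions:   105      45       105
    //
    // The leading 15 merges into the 90 because 15 + 90 = 105 < 100 * MERGED_PARTITION_FACTOR,
    // and the trailing 15 merges for the same reason. The middle 45 stays separate:
    // merging it with a 105 neighbour gives 150 >= 120, and 45 >= 100 * SMALL_PARTITION_FACTOR.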

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 11 additions & 27 deletions
@@ -614,7 +614,7 @@ class AdaptiveQueryExecSuite
       SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2000") {
       withTempView("skewData1", "skewData2") {
         spark
-          .range(0, 1300, 1, 10)
+          .range(0, 1000, 1, 10)
           .selectExpr("id % 2 as key1", "id as value1")
           .createOrReplaceTempView("skewData1")
         spark

@@ -635,36 +635,36 @@ class AdaptiveQueryExecSuite
         // skewed inner join optimization
         val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM skewData1 join skewData2 ON key1 = key2")
-        // left stats: [4297, 0, 0, 0, 4674]
+        // left stats: [3496, 0, 0, 0, 4014]
         // right stats:[6292, 0, 0, 0, 0]
         // Partition 0: both left and right sides are skewed, left side is divided
         //              into 2 splits and right side is divided into 4 splits, so
         //              2 x 4 sub-partitions.
         // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
-        // Partition 4: only left side is skewed, and divide into 3 splits, so
-        //              3 sub-partitions.
+        // Partition 4: only left side is skewed, and divide into 2 splits, so
+        //              2 sub-partitions.
         // So total (8 + 1 + 3) partitions.
         val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
-        checkSkewJoin(innerSmj, 8 + 1 + 3)
+        checkSkewJoin(innerSmj, 8 + 1 + 2)

         // skewed left outer join optimization
         val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM skewData1 left outer join skewData2 ON key1 = key2")
-        // left stats: [4297, 0, 0, 0, 4674]
+        // left stats: [3496, 0, 0, 0, 4014]
         // right stats:[6292, 0, 0, 0, 0]
         // Partition 0: both left and right sides are skewed, but left join can't split right side,
         //              so only left side is divided into 2 splits, and thus 2 sub-partitions.
         // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
-        // Partition 4: only left side is skewed, and divide into 3 splits, so
-        //              3 sub-partitions.
-        // So total (2 + 1 + 3) partitions.
+        // Partition 4: only left side is skewed, and divide into 2 splits, so
+        //              2 sub-partitions.
+        // So total (2 + 1 + 2) partitions.
         val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
-        checkSkewJoin(leftSmj, 2 + 1 + 3)
+        checkSkewJoin(leftSmj, 2 + 1 + 2)

         // skewed right outer join optimization
         val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM skewData1 right outer join skewData2 ON key1 = key2")
-        // left stats: [4297, 0, 0, 0, 4674]
+        // left stats: [3496, 0, 0, 0, 4014]
         // right stats:[6292, 0, 0, 0, 0]
         // Partition 0: both left and right sides are skewed, but right join can't split left side,
         //              so only right side is divided into 4 splits, and thus 4 sub-partitions.

@@ -674,22 +674,6 @@ class AdaptiveQueryExecSuite
         // So total (4 + 1 + 1) partitions.
         val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
         checkSkewJoin(rightSmj, 4 + 1 + 1)
-
-        withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "3000") {
-          val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(
-            "SELECT * FROM skewData1 join skewData2 ON key1 = key2")
-          // left stats: [4297, 0, 0, 0, 4674]
-          // right stats:[6292, 0, 0, 0, 0]
-          // Partition 0: left side is smaller than 3000 * 2, so it's not skewed,
-          //              right side is skewed divided into 2 splits, so
-          //              2 sub-partitions.
-          // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
-          // Partition 4: left side is smaller than 3000 * 2, so it's not skewed,
-          //              right side is not skewed either, so just 1 partition.
-          // So total (2 + 1 + 1) partitions.
-          val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
-          checkSkewJoin(innerSmj, 2 + 1 + 1)
-        }
       }
     }
   }
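The shrunk input (1000 rows instead of 1300) brings left partition 4 down to 4014 bytes, which splits into 2 pieces at the 2000-byte advisory target instead of 3. A back-of-the-envelope check of the inner-join expectation, using only numbers from the test comments above:

    // Not executable against Spark; just the arithmetic behind checkSkewJoin(innerSmj, 8 + 1 + 2).
    object SkewJoinPartitionCount extends App {
      val partition0 = 2 * 4  // left split into 2, right into 4: cartesian sub-partitions
      val partitions1to3 = 1  // not skewed, coalesced into one partition
      val partition4 = 2      // left side (4014 bytes) splits in two at the 2000-byte target
      assert(partition0 + partitions1to3 + partition4 == 11)
    }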
