Skip to content

Commit d65a210

Browse files
committed
canUseLocalShuffleReader should consider skew optimization
1 parent 973d87e commit d65a210

File tree

2 files changed

+23
-7
lines changed

2 files changed

+23
-7
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,14 @@ object OptimizeLocalShuffleReader {
142142
def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
143143
case s: ShuffleQueryStageExec =>
144144
s.shuffle.canChangeNumPartitions
145+
// This CustomShuffleReaderExec is used on the skewed side; its numPartitions has increased.
146+
case CustomShuffleReaderExec(_, partitionSpecs)
147+
if partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec]) => false
148+
// This CustomShuffleReaderExec is used on the non-skew side; its numPartitions equals that of
149+
// the skew side CustomShuffleReaderExec.
150+
case CustomShuffleReaderExec(_, partitionSpecs)
151+
if partitionSpecs.forall(_.isInstanceOf[CoalescedPartitionSpec]) &&
152+
partitionSpecs.toSet.size == partitionSpecs.size => false
145153
case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
146154
s.shuffle.canChangeNumPartitions && partitionSpecs.nonEmpty
147155
case _ => false

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -733,7 +733,7 @@ class AdaptiveQueryExecSuite
733733
test("SPARK-32201: handle general skew join pattern") {
734734
withSQLConf(
735735
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
736-
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1199",
736+
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
737737
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
738738
SQLConf.SHUFFLE_PARTITIONS.key -> "100",
739739
SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
@@ -757,7 +757,7 @@ class AdaptiveQueryExecSuite
757757
// HashAggregate
758758
// CustomShuffleReader(coalesced)
759759
// Shuffle
760-
withTempView("skewData1", "skewData2") {
760+
withTempView("skewData1", "skewData2", "smallData") {
761761
spark
762762
.range(0, 1000, 1, 10)
763763
.select(
@@ -809,22 +809,30 @@ class AdaptiveQueryExecSuite
809809
// Sort
810810
// CustomShuffleReader(coalesced)
811811
// Shuffle
812+
spark
813+
.range(0, 100, 1, 10)
814+
.select(
815+
when('id < 250, 249)
816+
.otherwise('id).as("key3"),
817+
expr("concat(id, 'aaa')") as "value3")
818+
.createOrReplaceTempView("smallData")
819+
812820
val sqlText2 =
813821
"""
814822
|SELECT * FROM
815823
| (
816824
| SELECT t1.*
817-
| FROM skewData1 t1 LEFT JOIN testData t2
818-
| ON t1.value1 = t2.key
819-
| AND t2.value = '2' || t2.value = '1'
825+
| FROM skewData1 t1 LEFT JOIN smallData t2
826+
| ON t1.key1 = t2.key3
827+
| AND t2.value3 = 'xyz'
820828
| ) AS data1
821-
| LEFT JOIN
829+
| INNER JOIN
822830
| skewData2 AS data2
823831
|ON data1.key1 = data2.key2 LIMIT 10
824832
|""".stripMargin
825833
val (_, adaptivePlan2) = runAdaptiveAndVerifyResult(sqlText2)
826834
val innerSmj2 = findTopLevelSortMergeJoin(adaptivePlan2)
827-
checkSkewJoin(innerSmj2, 2, 0)
835+
checkSkewJoin(innerSmj2, 2, 1)
828836
}
829837
}
830838
}

0 commit comments

Comments
 (0)