Skip to content

Commit fa6050a

Browse files
address comments
1 parent e6b0bb3 commit fa6050a

File tree

3 files changed

+14
-5
lines changed

3 files changed

+14
-5
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1256,7 +1256,7 @@ object SQLConf {
12561256
val REMOVE_REDUNDANT_SORTS_ENABLED = buildConf("spark.sql.execution.removeRedundantSorts")
12571257
.internal()
12581258
.doc("Whether to remove redundant physical sort node")
1259-
.version("2.4.8")
1259+
.version("3.1.0")
12601260
.booleanConf
12611261
.createWithDefault(true)
12621262

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,8 @@ object QueryExecution {
344344
PlanSubqueries,
345345
RemoveRedundantProjects,
346346
EnsureRequirements,
347+
// `RemoveRedundantSorts` needs to be added after `EnsureRequirements` to guarantee the same
348+
// number of partitions when instantiating PartitioningCollection.
347349
RemoveRedundantSorts,
348350
DisableUnnecessaryBucketedScan,
349351
ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules),

sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package org.apache.spark.sql.execution
1919

2020
import org.apache.spark.sql.{DataFrame, QueryTest}
21+
import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
2122
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
23+
import org.apache.spark.sql.execution.joins.ShuffledJoin
2224
import org.apache.spark.sql.internal.SQLConf
2325
import org.apache.spark.sql.test.SharedSparkSession
2426

@@ -136,13 +138,11 @@ abstract class RemoveRedundantSortsSuiteBase
136138
}
137139
}
138140

139-
test("shuffled join with different left and right side partition numbers") {
141+
test("SPARK-33472: shuffled join with different left and right side partition numbers") {
140142
withTempView("t1", "t2") {
141143
spark.range(0, 100, 1, 2).select('id as "key").createOrReplaceTempView("t1")
142144
(0 to 100).toDF("key").createOrReplaceTempView("t2")
143145

144-
// left side partitioning: RangePartitioning(key ASC, 2)
145-
// right side partitioning: UnknownPartitioning(0)
146146
val queryTemplate = """
147147
|SELECT /*+ %s(t1) */ t1.key
148148
|FROM t1 JOIN t2 ON t1.key = t2.key
@@ -151,7 +151,14 @@ abstract class RemoveRedundantSortsSuiteBase
151151
""".stripMargin
152152

153153
Seq(("MERGE", 3), ("SHUFFLE_HASH", 1)).foreach { case (hint, count) =>
154-
checkSorts(queryTemplate.format(hint), count, count)
154+
val query = queryTemplate.format(hint)
155+
val df = sql(query)
156+
val sparkPlan = df.queryExecution.sparkPlan
157+
val join = sparkPlan.collect { case j: ShuffledJoin => j }.head
158+
val range = sparkPlan.collect { case r: RangeExec => r }.head
159+
assert(join.left.outputPartitioning == range.outputPartitioning)
160+
assert(join.right.outputPartitioning == UnknownPartitioning(0))
161+
checkSorts(query, count, count)
155162
}
156163
}
157164
}

0 commit comments

Comments
 (0)