Commit 369b014

xzhou authored and cloud-fan committed
[SPARK-38034][SQL] Optimize TransposeWindow rule
### What changes were proposed in this pull request?

Optimize the TransposeWindow rule to extend the applicable cases and reduce its time complexity.

The TransposeWindow rule tries to eliminate unnecessary shuffles, but the function compatiblePartitions only takes the first n elements of the window2 partition sequence (n being the length of the window1 partition sequence). In some cases the rule therefore does not take effect, as in the case below:

    val df = spark.range(10).selectExpr("id AS a", "id AS b", "id AS c", "id AS d")
    df.selectExpr(
      "sum(`d`) OVER(PARTITION BY `b`,`a`) as e",
      "sum(`c`) OVER(PARTITION BY `a`) as f"
    ).explain

Current plan:

    == Physical Plan ==
    *(5) Project [e#10L, f#11L]
    +- Window [sum(c#4L) windowspecdefinition(a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#11L], [a#2L]
       +- *(4) Sort [a#2L ASC NULLS FIRST], false, 0
          +- Exchange hashpartitioning(a#2L, 200), true, [id=#41]
             +- *(3) Project [a#2L, c#4L, e#10L]
                +- Window [sum(d#5L) windowspecdefinition(b#3L, a#2L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#10L], [b#3L, a#2L]
                   +- *(2) Sort [b#3L ASC NULLS FIRST, a#2L ASC NULLS FIRST], false, 0
                      +- Exchange hashpartitioning(b#3L, a#2L, 200), true, [id=#33]
                         +- *(1) Project [id#0L AS d#5L, id#0L AS b#3L, id#0L AS a#2L, id#0L AS c#4L]
                            +- *(1) Range (0, 10, step=1, splits=10)

Expected plan:

    == Physical Plan ==
    *(4) Project [e#924L, f#925L]
    +- Window [sum(d#43L) windowspecdefinition(b#41L, a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS e#924L], [b#41L, a#40L]
       +- *(3) Sort [b#41L ASC NULLS FIRST, a#40L ASC NULLS FIRST], false, 0
          +- *(3) Project [d#43L, b#41L, a#40L, f#925L]
             +- Window [sum(c#42L) windowspecdefinition(a#40L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS f#925L], [a#40L]
                +- *(2) Sort [a#40L ASC NULLS FIRST], false, 0
                   +- Exchange hashpartitioning(a#40L, 200), true, [id=#282]
                      +- *(1) Project [id#38L AS d#43L, id#38L AS b#41L, id#38L AS a#40L, id#38L AS c#42L]
                         +- *(1) Range (0, 10, step=1, splits=10)

Also, the permutations method has O(n!) time complexity, which is very expensive when there are many partition columns, so we optimize it as well.

### Why are the changes needed?

The rule can be applied in more cases, which improves execution performance by eliminating unnecessary shuffles. Reducing the time complexity from O(n!) to O(n^2) also improves the performance of the rule itself.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

UT

Closes apache#35334 from constzhou/SPARK-38034_optimize_transpose_window_rule.

Authored-by: xzhou <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 0cc331d)
Signed-off-by: Wenchen Fan <[email protected]>
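The behavioral difference between the old and new `compatiblePartitions` checks can be illustrated with a minimal sketch. It is not the Catalyst code: partition expressions are modelled as plain strings, `==` stands in for `Expression.semanticEquals`, and the object and method names (`CompatiblePartitionsSketch`, `oldCompatible`, `newCompatible`) are hypothetical.

```scala
// Hypothetical stand-in for the Catalyst rule: strings replace Expression,
// and == replaces Expression.semanticEquals.
object CompatiblePartitionsSketch {
  // Old check: ps1 must match some permutation of the first ps1.length
  // elements of ps2, position by position.
  def oldCompatible(ps1: Seq[String], ps2: Seq[String]): Boolean =
    ps1.length < ps2.length && ps2.take(ps1.length).permutations.exists { perm =>
      ps1.zip(perm).forall { case (l, r) => l == r }
    }

  // New check: every element of ps1 must appear somewhere in ps2.
  def newCompatible(ps1: Seq[String], ps2: Seq[String]): Boolean =
    ps1.length < ps2.length && ps1.forall(e1 => ps2.exists(_ == e1))

  def main(args: Array[String]): Unit = {
    val ps1 = Seq("a")      // PARTITION BY a
    val ps2 = Seq("b", "a") // PARTITION BY b, a
    println(s"old check: ${oldCompatible(ps1, ps2)}")
    println(s"new check: ${newCompatible(ps1, ps2)}")
  }
}
```

With the partition specs from the example query, the old check only looks at the prefix `[b]` and rejects the pair, while the new check finds `a` anywhere in `[b, a]` and accepts it. When `ps1` is a prefix of `ps2`, such as `[a]` against `[a, b]`, both checks agree.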
1 parent 26f0d50 commit 369b014

2 files changed: +20 -3 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 3 additions & 3 deletions
@@ -1176,9 +1176,9 @@ object CollapseWindow extends Rule[LogicalPlan] {
  */
 object TransposeWindow extends Rule[LogicalPlan] {
   private def compatiblePartitions(ps1 : Seq[Expression], ps2: Seq[Expression]): Boolean = {
-    ps1.length < ps2.length && ps2.take(ps1.length).permutations.exists(ps1.zip(_).forall {
-      case (l, r) => l.semanticEquals(r)
-    })
+    ps1.length < ps2.length && ps1.forall { expr1 =>
+      ps2.exists(expr1.semanticEquals)
+    }
   }

   private def windowsCompatible(w1: Window, w2: Window): Boolean = {
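To give a rough sense of the cost difference between the two versions, the sketch below compares the number of orderings that `permutations` can enumerate for n partition columns with an upper bound on the pairwise comparisons made by the nested `forall`/`exists`. The object and method names are hypothetical, and the figures are worst-case counts rather than measurements of the optimizer.

```scala
// Hypothetical helper for illustration only; not part of Spark.
object ComparisonCountSketch {
  // Orderings of n partition expressions that permutations may enumerate: n!
  def permutationCount(n: Int): BigInt = (1 to n).map(BigInt(_)).product

  // Upper bound on equality checks in the forall/exists version: each of the
  // n expressions in ps1 is compared with at most m expressions in ps2.
  def pairwiseBound(n: Int, m: Int): BigInt = BigInt(n) * BigInt(m)

  def main(args: Array[String]): Unit = {
    for (n <- Seq(3, 6, 10)) {
      // ps2 is assumed to have one more element than ps1 (ps1.length < ps2.length).
      println(s"n=$n: permutations=${permutationCount(n)}, " +
        s"pairwise bound=${pairwiseBound(n, n + 1)}")
    }
  }
}
```

For ten partition columns the permutation count is already 3628800, while the pairwise bound stays at 110.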

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala

Lines changed: 17 additions & 0 deletions
@@ -142,4 +142,21 @@ class TransposeWindowSuite extends PlanTest {
     comparePlans(optimized, analyzed)
   }

+  test("SPARK-38034: transpose two adjacent windows with compatible partitions " +
+    "which is not a prefix") {
+    val query = testRelation
+      .window(Seq(sum(c).as('sum_a_2)), partitionSpec4, orderSpec2)
+      .window(Seq(sum(c).as('sum_a_1)), partitionSpec3, orderSpec1)
+
+    val analyzed = query.analyze
+    val optimized = Optimize.execute(analyzed)
+
+    val correctAnswer = testRelation
+      .window(Seq(sum(c).as('sum_a_1)), partitionSpec3, orderSpec1)
+      .window(Seq(sum(c).as('sum_a_2)), partitionSpec4, orderSpec2)
+      .select('a, 'b, 'c, 'd, 'sum_a_2, 'sum_a_1)
+
+    comparePlans(optimized, correctAnswer.analyze)
+  }
+
 }