From 2c1c4dd9765f5368eab00bd1a995e41253741a99 Mon Sep 17 00:00:00 2001
From: xzhou <15210830305@163.com>
Date: Wed, 26 Jan 2022 19:22:00 +0800
Subject: [PATCH 1/2] optimize TransposeWindow rule

---
 .../spark/sql/catalyst/optimizer/Optimizer.scala |  6 +++---
 .../optimizer/TransposeWindowSuite.scala         | 16 ++++++++++++++++
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c0f6e75ae4bfa..2415646acbbd9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1257,9 +1257,9 @@ object CollapseWindow extends Rule[LogicalPlan] {
  */
 object TransposeWindow extends Rule[LogicalPlan] {
   private def compatiblePartitions(ps1 : Seq[Expression], ps2: Seq[Expression]): Boolean = {
-    ps1.length < ps2.length && ps2.take(ps1.length).permutations.exists(ps1.zip(_).forall {
-      case (l, r) => l.semanticEquals(r)
-    })
+    ps1.length < ps2.length && ps1.forall { expr1 =>
+      ps2.exists(expr1.semanticEquals)
+    }
   }
 
   private def windowsCompatible(w1: Window, w2: Window): Boolean = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala
index 60869d7f94842..66a362aaae0cf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala
@@ -143,4 +143,20 @@ class TransposeWindowSuite extends PlanTest {
     comparePlans(optimized, analyzed)
   }
 
+  test("transpose two adjacent windows with compatible partitions which is not a prefix") {
+    val query = testRelation
+      .window(Seq(sum(c).as('sum_a_2)), partitionSpec4, orderSpec2)
+      .window(Seq(sum(c).as('sum_a_1)), partitionSpec3, orderSpec1)
+
+    val analyzed = query.analyze
+    val optimized = Optimize.execute(analyzed)
+
+    val correctAnswer = testRelation
+      .window(Seq(sum(c).as('sum_a_1)), partitionSpec3, orderSpec1)
+      .window(Seq(sum(c).as('sum_a_2)), partitionSpec4, orderSpec2)
+      .select('a, 'b, 'c, 'd, 'sum_a_2, 'sum_a_1)
+
+    comparePlans(optimized, correctAnswer.analyze)
+  }
+
 }

From 0d8e472dea4d48f78734af31f7ffcc05e88775a4 Mon Sep 17 00:00:00 2001
From: xzhou <15210830305@163.com>
Date: Thu, 4 Aug 2022 11:20:51 +0800
Subject: [PATCH 2/2] modify UT description

---
 .../spark/sql/catalyst/optimizer/TransposeWindowSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala
index 66a362aaae0cf..a9796141c0c7e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TransposeWindowSuite.scala
@@ -143,7 +143,8 @@ class TransposeWindowSuite extends PlanTest {
     comparePlans(optimized, analyzed)
   }
 
-  test("transpose two adjacent windows with compatible partitions which is not a prefix") {
+  test("SPARK-38034: transpose two adjacent windows with compatible partitions " +
+    "which is not a prefix") {
     val query = testRelation
       .window(Seq(sum(c).as('sum_a_2)), partitionSpec4, orderSpec2)
       .window(Seq(sum(c).as('sum_a_1)), partitionSpec3, orderSpec1)