From deb18093b1a1b1c6b7e6ad1fd148448b761297ea Mon Sep 17 00:00:00 2001 From: guoxiaolong Date: Wed, 21 Nov 2018 21:36:24 +0800 Subject: [PATCH 1/4] [SPARK-26138][SQL] LimitPushDown cross join requires maybePushLocalLimit --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 8d251eeab8484..24196c517f3db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -459,6 +459,7 @@ object LimitPushDown extends Rule[LogicalPlan] { val newJoin = joinType match { case RightOuter => join.copy(right = maybePushLocalLimit(exp, right)) case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left)) + case Cross => join.copy(left = maybePushLocalLimit(exp, left)) case _ => join } LocalLimit(exp, newJoin) From a0b9cb48348b1ccd42ddaef5f0e04ee28e20f39b Mon Sep 17 00:00:00 2001 From: guoxiaolong Date: Thu, 22 Nov 2018 19:19:59 +0800 Subject: [PATCH 2/4] [SPARK-26138][SQL] add three UTs --- .../optimizer/LimitPushdownSuite.scala | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala index 17fb9fc5d11e3..7ef13265e9c1e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.dsl.expressions._ import 
org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Add -import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, PlanTest, RightOuter} +import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, LeftOuter, PlanTest, RightOuter} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -171,4 +171,25 @@ class LimitPushdownSuite extends PlanTest { // No pushdown for FULL OUTER JOINS. comparePlans(optimized, originalQuery) } + + test("cross join") { + val originalQuery = x.join(y, Cross).limit(1) + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = Limit(1, LocalLimit(1, x).join(y, Cross)).analyze + comparePlans(optimized, correctAnswer) + } + + test("cross join and left sides are limited") { + val originalQuery = x.limit(2).join(y, Cross).limit(1) + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = Limit(1, LocalLimit(1, x).join(y, Cross)).analyze + comparePlans(optimized, correctAnswer) + } + + test("cross join and right sides are limited") { + val originalQuery = x.join(y.limit(2), Cross).limit(1) + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = Limit(1, LocalLimit(1, x).join(Limit(2, y), Cross)).analyze + comparePlans(optimized, correctAnswer) + } } From 588c1513906a7a5e5406b41fb1b25feaadcc74be Mon Sep 17 00:00:00 2001 From: guoxiaolong Date: Fri, 23 Nov 2018 12:15:30 +0800 Subject: [PATCH 3/4] [SPARK-26138][SQL] Push limit to the right side --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../spark/sql/catalyst/optimizer/LimitPushdownSuite.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 24196c517f3db..94af9e053d319 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -459,7 +459,7 @@ object LimitPushDown extends Rule[LogicalPlan] { val newJoin = joinType match { case RightOuter => join.copy(right = maybePushLocalLimit(exp, right)) case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left)) - case Cross => join.copy(left = maybePushLocalLimit(exp, left)) + case Cross => join.copy(left = maybePushLocalLimit(exp, left), right = maybePushLocalLimit(exp, right)) case _ => join } LocalLimit(exp, newJoin) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala index 7ef13265e9c1e..792c782fbc5b5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala @@ -175,21 +175,21 @@ class LimitPushdownSuite extends PlanTest { test("cross join") { val originalQuery = x.join(y, Cross).limit(1) val optimized = Optimize.execute(originalQuery.analyze) - val correctAnswer = Limit(1, LocalLimit(1, x).join(y, Cross)).analyze + val correctAnswer = Limit(1, LocalLimit(1, x).join(LocalLimit(1, y), Cross)).analyze comparePlans(optimized, correctAnswer) } test("cross join and left sides are limited") { val originalQuery = x.limit(2).join(y, Cross).limit(1) val optimized = Optimize.execute(originalQuery.analyze) - val correctAnswer = Limit(1, LocalLimit(1, x).join(y, Cross)).analyze + val correctAnswer = Limit(1, LocalLimit(1, x).join(LocalLimit(1, y), Cross)).analyze comparePlans(optimized, correctAnswer) } test("cross join and right sides are limited") { val originalQuery = x.join(y.limit(2), Cross).limit(1) val optimized = Optimize.execute(originalQuery.analyze) - val correctAnswer = 
Limit(1, LocalLimit(1, x).join(Limit(2, y), Cross)).analyze + val correctAnswer = Limit(1, LocalLimit(1, x).join(LocalLimit(1, y), Cross)).analyze comparePlans(optimized, correctAnswer) } } From e17396214261cdf3a842a977b1065619e4e8fb31 Mon Sep 17 00:00:00 2001 From: guoxiaolong Date: Tue, 18 Dec 2018 13:37:12 +0800 Subject: [PATCH 4/4] [SPARK-26138][SQL] Fix the scala style. File line length exceeds 100 characters. --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 94af9e053d319..96fcdfb052d2b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -459,7 +459,8 @@ object LimitPushDown extends Rule[LogicalPlan] { val newJoin = joinType match { case RightOuter => join.copy(right = maybePushLocalLimit(exp, right)) case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left)) - case Cross => join.copy(left = maybePushLocalLimit(exp, left), right = maybePushLocalLimit(exp, right)) + case Cross => join.copy(left = maybePushLocalLimit(exp, left), + right = maybePushLocalLimit(exp, right)) case _ => join } LocalLimit(exp, newJoin)