From df0528456036a41794ace1caddc95f360efd5247 Mon Sep 17 00:00:00 2001 From: Aayushmaan Jain Date: Thu, 28 Mar 2019 11:11:48 +0530 Subject: [PATCH 1/6] Add case (rule) to optimize limit 0 queries --- .../optimizer/PropagateEmptyRelation.scala | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index b19e13870aa6..913f659410bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -108,5 +108,27 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit case Generate(_: Explode, _, _, _, _, _) => empty(p) case _ => p } + + // Nodes below GlobalLimit or LocalLimit can be pruned if the limit value is zero (0). + // Any subtree in the logical plan that has GlobalLimit 0 or LocalLimit 0 as its root is + // semantically equivalent to an empty relation. + // + // In such cases, the effects of Limit 0 can be propagated through the Logical Plan by replacing + // the (Global/Local) Limit subtree with an empty LocalRelation, thereby pruning the subtree + // below and triggering other optimization rules of PropagateEmptyRelation to propagate the + // changes up the Logical Plan. + // + // Replace Global Limit 0 nodes with empty Local Relation + case p @ GlobalLimit(IntegerLiteral(limit), _) if limit == 0 => + empty(p) + + // Note: For all SQL queries, if a LocalLimit 0 node exists in the Logical Plan, then a + // GlobalLimit 0 node would also exist. Thus, the above case would be sufficient to handle + // almost all cases. However, if a user explicitly creates a Logical Plan with LocalLimit 0 node + // then the following rule will handle that case as well. + // + // Replace Local Limit 0 nodes with empty Local Relation + case p @ LocalLimit(IntegerLiteral(limit), _) if limit == 0 => + empty(p) } } From 84f944cfed2bd997b768960a93dfb662f650c3f8 Mon Sep 17 00:00:00 2001 From: Aayushmaan Jain Date: Thu, 28 Mar 2019 11:41:28 +0530 Subject: [PATCH 2/6] Add unit tests to verify limit 0 optimization --- .../PropagateEmptyRelationSuite.scala | 78 ++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala index d395bba105a7..9bb4bed75dfa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -23,7 +23,8 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{Distinct, GlobalLimit, LocalLimit, + LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.types.{IntegerType, StructType} @@ -221,4 +222,79 @@ class PropagateEmptyRelationSuite extends PlanTest { val optimized = Optimize.execute(query.analyze) assert(optimized.resolved) } + + test("Limit 0: return empty local relation") { + val query = testRelation1.limit(0) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation('a.int) + + comparePlans(optimized, correctAnswer) + } + + test("Limit 0: individual LocalLimit 0 node") { + val query = LocalLimit(0, testRelation1) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation('a.int) + + comparePlans(optimized, correctAnswer) + } + + test("Limit 0: individual GlobalLimit 0 node") { + val query = GlobalLimit(0, testRelation1) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation('a.int) + + comparePlans(optimized, correctAnswer) + } + + test("Limit 0: Joins") { + val testcases = Seq( + (Inner, Some(LocalRelation('a.int, 'b.int))), + (LeftOuter, + Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)), + (RightOuter, Some(LocalRelation('a.int, 'b.int))), + (FullOuter, + Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)) + ) + + testcases.foreach { case (jt, answer) => + val query = testRelation1 + .join(testRelation2.limit(0), joinType = jt, condition = Some('a.attr == 'b.attr)) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = + answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze)) + + comparePlans(optimized, correctAnswer) + } + } + + test("Limit 0: 3-way join") { + val testRelation3 = LocalRelation.fromExternalRows(Seq('c.int), data = Seq(Row(1))) + + val subJoinQuery = testRelation1 + .join(testRelation2, joinType = Inner, condition = Some('a.attr == 'b.attr)) + val query = subJoinQuery + .join(testRelation3.limit(0), joinType = Inner, condition = Some('a.attr == 'c.attr)) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = Some(LocalRelation('a.int, 'b.int, 'c.int)) + .getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze)) + + comparePlans(optimized, correctAnswer) + } + + test("Limit 0: intersect") { + val query = testRelation1 + .intersect(testRelation1.limit(0), isAll = false) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = Distinct(Some(LocalRelation('a.int)) + .getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze))) + + comparePlans(optimized, correctAnswer) + } } From d5fed7171c7094fc653b17b32aba554c492cb8f3 Mon Sep 17 00:00:00 2001 From: Aayushmaan Jain Date: Thu, 28 Mar 2019 12:23:16 +0530 Subject: [PATCH 3/6] Lint fix --- .../sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala index 9bb4bed75dfa..4c43d612446e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -23,8 +23,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{Distinct, GlobalLimit, LocalLimit, - LocalRelation, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{Distinct, GlobalLimit, LocalLimit, LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.types.{IntegerType, StructType} From 5dd33f8bfce28d3b051273760e323da92c6b2138 Mon Sep 17 00:00:00 2001 From: Aayushmaan Jain Date: Wed, 3 Apr 2019 15:11:32 +0530 Subject: [PATCH 4/6] added new rule for Limit 0 optimization --- .../sql/catalyst/optimizer/Optimizer.scala | 36 ++++++ .../optimizer/PropagateEmptyRelation.scala | 22 ---- .../optimizer/OptimizeLimitZeroSuite.scala | 112 ++++++++++++++++++ .../PropagateEmptyRelationSuite.scala | 77 +----------- 4 files changed, 149 insertions(+), 98 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d4eb516534f1..819f40aabe21 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -172,6 +172,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) // optimizer rules that are triggered when there is a filter // (e.g. InferFiltersFromConstraints). If we run this batch earlier, the query becomes just // LocalRelation and does not trigger many rules. + Batch("OptimizeLimitZero", Once, + OptimizeLimitZero) :: Batch("LocalRelation early", fixedPoint, ConvertToLocalRelation, PropagateEmptyRelation) :: @@ -1711,3 +1713,37 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] { } } } + +/** + * Replaces GlobalLimit 0 and LocalLimit 0 nodes (subtree) with empty Local Relation, as they don't + * return any rows. + */ +object OptimizeLimitZero extends Rule[LogicalPlan] { + // returns empty Local Relation corresponding to given plan + private def empty(plan: LogicalPlan) = + LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming) + + def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + // Nodes below GlobalLimit or LocalLimit can be pruned if the limit value is zero (0). + // Any subtree in the logical plan that has GlobalLimit 0 or LocalLimit 0 as its root is + // semantically equivalent to an empty relation. + // + // In such cases, the effects of Limit 0 can be propagated through the Logical Plan by replacing + // the (Global/Local) Limit subtree with an empty LocalRelation, thereby pruning the subtree + // below and triggering other optimization rules of PropagateEmptyRelation to propagate the + // changes up the Logical Plan. + // + // Replace Global Limit 0 nodes with empty Local Relation + case gl @ GlobalLimit(IntegerLiteral(limit), _) if limit == 0 => + empty(gl) + + // Note: For all SQL queries, if a LocalLimit 0 node exists in the Logical Plan, then a + // GlobalLimit 0 node would also exist. Thus, the above case would be sufficient to handle + // almost all cases. However, if a user explicitly creates a Logical Plan with LocalLimit 0 node + // then the following rule will handle that case as well. + // + // Replace Local Limit 0 nodes with empty Local Relation + case ll @ LocalLimit(IntegerLiteral(limit), _) if limit == 0 => + empty(ll) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 913f659410bd..b19e13870aa6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -108,27 +108,5 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit case Generate(_: Explode, _, _, _, _, _) => empty(p) case _ => p } - - // Nodes below GlobalLimit or LocalLimit can be pruned if the limit value is zero (0). - // Any subtree in the logical plan that has GlobalLimit 0 or LocalLimit 0 as its root is - // semantically equivalent to an empty relation. - // - // In such cases, the effects of Limit 0 can be propagated through the Logical Plan by replacing - // the (Global/Local) Limit subtree with an empty LocalRelation, thereby pruning the subtree - // below and triggering other optimization rules of PropagateEmptyRelation to propagate the - // changes up the Logical Plan. - // - // Replace Global Limit 0 nodes with empty Local Relation - case p @ GlobalLimit(IntegerLiteral(limit), _) if limit == 0 => - empty(p) - - // Note: For all SQL queries, if a LocalLimit 0 node exists in the Logical Plan, then a - // GlobalLimit 0 node would also exist. Thus, the above case would be sufficient to handle - // almost all cases. However, if a user explicitly creates a Logical Plan with LocalLimit 0 node - // then the following rule will handle that case as well. - // - // Replace Local Limit 0 nodes with empty Local Relation - case p @ LocalLimit(IntegerLiteral(limit), _) if limit == 0 => - empty(p) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala new file mode 100644 index 000000000000..b8d8a19a4199 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.{Distinct, GlobalLimit, LocalLimit, LocalRelation, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types.IntegerType + +// Test class to verify correct functioning of OptimizeLimitZero rule in various scenarios +class OptimizeLimitZeroSuite extends PlanTest { + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("OptimizeLimitZero", Once, + ReplaceIntersectWithSemiJoin, + OptimizeLimitZero, + PropagateEmptyRelation) :: Nil + } + + val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1))) + val testRelation2 = LocalRelation.fromExternalRows(Seq('b.int), data = Seq(Row(1))) + + + test("Limit 0: return empty local relation") { + val query = testRelation1.limit(0) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation('a.int) + + comparePlans(optimized, correctAnswer) + } + + test("Limit 0: individual LocalLimit 0 node") { + val query = LocalLimit(0, testRelation1) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation('a.int) + + comparePlans(optimized, correctAnswer) + } + + test("Limit 0: individual GlobalLimit 0 node") { + val query = GlobalLimit(0, testRelation1) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation('a.int) + + comparePlans(optimized, correctAnswer) + } + + test("Limit 0: Joins") { + val testcases = Seq( + (Inner, LocalRelation('a.int, 'b.int)), + (LeftOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze), + (RightOuter, LocalRelation('a.int, 'b.int)), + (FullOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze) + ) + + testcases.foreach { case (jt, answer) => + val query = testRelation1 + .join(testRelation2.limit(0), joinType = jt, condition = Some('a.attr == 'b.attr)) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = answer + + comparePlans(optimized, correctAnswer) + } + } + + test("Limit 0: 3-way join") { + val testRelation3 = LocalRelation.fromExternalRows(Seq('c.int), data = Seq(Row(1))) + + val subJoinQuery = testRelation1 + .join(testRelation2, joinType = Inner, condition = Some('a.attr == 'b.attr)) + val query = subJoinQuery + .join(testRelation3.limit(0), joinType = Inner, condition = Some('a.attr == 'c.attr)) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = LocalRelation('a.int, 'b.int, 'c.int) + + comparePlans(optimized, correctAnswer) + } + + test("Limit 0: intersect") { + val query = testRelation1 + .intersect(testRelation1.limit(0), isAll = false) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = Distinct(LocalRelation('a.int)) + + comparePlans(optimized, correctAnswer) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala index 4c43d612446e..d395bba105a7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{Distinct, GlobalLimit, LocalLimit, LocalRelation, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.types.{IntegerType, StructType} @@ -221,79 +221,4 @@ class PropagateEmptyRelationSuite extends PlanTest { val optimized = Optimize.execute(query.analyze) assert(optimized.resolved) } - - test("Limit 0: return empty local relation") { - val query = testRelation1.limit(0) - - val optimized = Optimize.execute(query.analyze) - val correctAnswer = LocalRelation('a.int) - - comparePlans(optimized, correctAnswer) - } - - test("Limit 0: individual LocalLimit 0 node") { - val query = LocalLimit(0, testRelation1) - - val optimized = Optimize.execute(query.analyze) - val correctAnswer = LocalRelation('a.int) - - comparePlans(optimized, correctAnswer) - } - - test("Limit 0: individual GlobalLimit 0 node") { - val query = GlobalLimit(0, testRelation1) - - val optimized = Optimize.execute(query.analyze) - val correctAnswer = LocalRelation('a.int) - - comparePlans(optimized, correctAnswer) - } - - test("Limit 0: Joins") { - val testcases = Seq( - (Inner, Some(LocalRelation('a.int, 'b.int))), - (LeftOuter, - Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)), - (RightOuter, Some(LocalRelation('a.int, 'b.int))), - (FullOuter, - Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)) - ) - - testcases.foreach { case (jt, answer) => - val query = testRelation1 - .join(testRelation2.limit(0), joinType = jt, condition = Some('a.attr == 'b.attr)) - - val optimized = Optimize.execute(query.analyze) - val correctAnswer = - answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze)) - - comparePlans(optimized, correctAnswer) - } - } - - test("Limit 0: 3-way join") { - val testRelation3 = LocalRelation.fromExternalRows(Seq('c.int), data = Seq(Row(1))) - - val subJoinQuery = testRelation1 - .join(testRelation2, joinType = Inner, condition = Some('a.attr == 'b.attr)) - val query = subJoinQuery - .join(testRelation3.limit(0), joinType = Inner, condition = Some('a.attr == 'c.attr)) - - val optimized = Optimize.execute(query.analyze) - val correctAnswer = Some(LocalRelation('a.int, 'b.int, 'c.int)) - .getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze)) - - comparePlans(optimized, correctAnswer) - } - - test("Limit 0: intersect") { - val query = testRelation1 - .intersect(testRelation1.limit(0), isAll = false) - - val optimized = Optimize.execute(query.analyze) - val correctAnswer = Distinct(Some(LocalRelation('a.int)) - .getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze))) - - comparePlans(optimized, correctAnswer) - } } From ce3e9368c53eb362af1ee1f2fc99723ac3e68a8b Mon Sep 17 00:00:00 2001 From: Aayushmaan Jain Date: Thu, 4 Apr 2019 12:11:46 +0530 Subject: [PATCH 5/6] PR fixes --- .../sql/catalyst/optimizer/Optimizer.scala | 4 +-- .../optimizer/OptimizeLimitZeroSuite.scala | 33 +++++++++---------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 819f40aabe21..d0bb1ba1ca6b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -167,13 +167,13 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) // since the other rules might make two separate Unions operators adjacent. Batch("Union", Once, CombineUnions) :: + Batch("OptimizeLimitZero", Once, + OptimizeLimitZero) :: // Run this once earlier. This might simplify the plan and reduce cost of optimizer. // For example, a query such as Filter(LocalRelation) would go through all the heavy // optimizer rules that are triggered when there is a filter // (e.g. InferFiltersFromConstraints). If we run this batch earlier, the query becomes just // LocalRelation and does not trigger many rules. - Batch("OptimizeLimitZero", Once, - OptimizeLimitZero) :: Batch("LocalRelation early", fixedPoint, ConvertToLocalRelation, PropagateEmptyRelation) :: diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala index b8d8a19a4199..4b4b2d74b3db 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala @@ -39,7 +39,6 @@ class OptimizeLimitZeroSuite extends PlanTest { val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1))) val testRelation2 = LocalRelation.fromExternalRows(Seq('b.int), data = Seq(Row(1))) - test("Limit 0: return empty local relation") { val query = testRelation1.limit(0) @@ -67,23 +66,21 @@ class OptimizeLimitZeroSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - test("Limit 0: Joins") { - val testcases = Seq( - (Inner, LocalRelation('a.int, 'b.int)), - (LeftOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze), - (RightOuter, LocalRelation('a.int, 'b.int)), - (FullOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze) - ) - - testcases.foreach { case (jt, answer) => - val query = testRelation1 - .join(testRelation2.limit(0), joinType = jt, condition = Some('a.attr == 'b.attr)) - - val optimized = Optimize.execute(query.analyze) - val correctAnswer = answer - - comparePlans(optimized, correctAnswer) - } + Seq( + (Inner, LocalRelation('a.int, 'b.int)), + (LeftOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze), + (RightOuter, LocalRelation('a.int, 'b.int)), + (FullOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze) + ).foreach { case (jt, answer) => + test(s"Limit 0: for join type $jt") { + val query = testRelation1 + .join(testRelation2.limit(0), joinType = jt, condition = Some('a.attr == 'b.attr)) + + val optimized = Optimize.execute(query.analyze) + val correctAnswer = answer + + comparePlans(optimized, correctAnswer) + } } test("Limit 0: 3-way join") { From 1936ec6cd7a6ec95c29177b5c3f02019b1729d25 Mon Sep 17 00:00:00 2001 From: Aayushmaan Jain Date: Thu, 4 Apr 2019 14:55:45 +0530 Subject: [PATCH 6/6] PR fixes --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 6 +++--- .../sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d0bb1ba1ca6b..cca1d3c37d23 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1716,7 +1716,7 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] { /** * Replaces GlobalLimit 0 and LocalLimit 0 nodes (subtree) with empty Local Relation, as they don't - * return any rows. + * return any rows. */ object OptimizeLimitZero extends Rule[LogicalPlan] { // returns empty Local Relation corresponding to given plan @@ -1734,7 +1734,7 @@ object OptimizeLimitZero extends Rule[LogicalPlan] { // changes up the Logical Plan. // // Replace Global Limit 0 nodes with empty Local Relation - case gl @ GlobalLimit(IntegerLiteral(limit), _) if limit == 0 => + case gl @ GlobalLimit(IntegerLiteral(0), _) => empty(gl) // Note: For all SQL queries, if a LocalLimit 0 node exists in the Logical Plan, then a @@ -1743,7 +1743,7 @@ object OptimizeLimitZero extends Rule[LogicalPlan] { // then the following rule will handle that case as well. // // Replace Local Limit 0 nodes with empty Local Relation - case ll @ LocalLimit(IntegerLiteral(limit), _) if limit == 0 => + case ll @ LocalLimit(IntegerLiteral(0), _) => empty(ll) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala index 4b4b2d74b3db..cf875efc62c9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala @@ -71,13 +71,12 @@ class OptimizeLimitZeroSuite extends PlanTest { (LeftOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze), (RightOuter, LocalRelation('a.int, 'b.int)), (FullOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze) - ).foreach { case (jt, answer) => + ).foreach { case (jt, correctAnswer) => test(s"Limit 0: for join type $jt") { val query = testRelation1 .join(testRelation2.limit(0), joinType = jt, condition = Some('a.attr == 'b.attr)) val optimized = Optimize.execute(query.analyze) - val correctAnswer = answer comparePlans(optimized, correctAnswer) }