Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
// since the other rules might make two separate Unions operators adjacent.
Batch("Union", Once,
CombineUnions) ::
Batch("OptimizeLimitZero", Once,
OptimizeLimitZero) ::
// Run this once earlier. This might simplify the plan and reduce cost of optimizer.
// For example, a query such as Filter(LocalRelation) would go through all the heavy
// optimizer rules that are triggered when there is a filter
Expand Down Expand Up @@ -1711,3 +1713,37 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
}
}
}

/**
 * Rewrites every subtree rooted at a `GlobalLimit 0` or `LocalLimit 0` node into an empty
 * [[LocalRelation]], because such a subtree cannot return any rows.
 */
object OptimizeLimitZero extends Rule[LogicalPlan] {
  // Builds an empty LocalRelation that keeps the output attributes and the streaming flag of
  // the plan node it stands in for.
  private def emptyRelationFor(node: LogicalPlan): LocalRelation =
    LocalRelation(node.output, data = Seq.empty, isStreaming = node.isStreaming)

  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    // A subtree whose root is a (Global/Local) Limit with a literal limit of zero is
    // semantically equivalent to an empty relation, so everything below it can be pruned.
    // Swapping the whole limit subtree for an empty LocalRelation lets the effect of Limit 0
    // travel further up the plan through the PropagateEmptyRelation rules.
    //
    // Note: for SQL queries a LocalLimit 0 node is always accompanied by a GlobalLimit 0 node,
    // so the GlobalLimit alternative covers almost every case. The LocalLimit alternative is
    // kept for logical plans that a user builds by hand with a bare LocalLimit 0 node.
    case zeroLimit @ (GlobalLimit(IntegerLiteral(0), _) | LocalLimit(IntegerLiteral(0), _)) =>
      emptyRelationFor(zeroLimit)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Distinct, GlobalLimit, LocalLimit, LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.types.IntegerType

// Test class to verify correct functioning of OptimizeLimitZero rule in various scenarios
class OptimizeLimitZeroSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("OptimizeLimitZero", Once,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can have two batches here, just the same as Optimizer.

Copy link
Contributor Author

@aayushmaanjain aayushmaanjain Apr 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gengliangwang Is there a specific reason for having it that way? I modelled the test suite on the one for PropagateEmptyRelation, which also has all of its rules in the same batch.

ReplaceIntersectWithSemiJoin,
OptimizeLimitZero,
PropagateEmptyRelation) :: Nil
}

// Shared input fixtures: two single-column int relations (`a` and `b`), each holding one row
// with the value 1. Tests apply `limit(0)` / Limit nodes on top of them.
val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1)))
val testRelation2 = LocalRelation.fromExternalRows(Seq('b.int), data = Seq(Row(1)))

// `limit(0)` applied to a non-empty relation must optimize to an empty LocalRelation that
// still exposes column `a`.
test("Limit 0: return empty local relation") {
  val analyzed = testRelation1.limit(0).analyze
  val actual = Optimize.execute(analyzed)
  val expected = LocalRelation('a.int)

  comparePlans(actual, expected)
}

// A hand-built LocalLimit 0 node (with no GlobalLimit above it) must also be replaced by an
// empty LocalRelation.
test("Limit 0: individual LocalLimit 0 node") {
  val analyzed = LocalLimit(0, testRelation1).analyze
  val actual = Optimize.execute(analyzed)
  val expected = LocalRelation('a.int)

  comparePlans(actual, expected)
}

// A hand-built GlobalLimit 0 node (with no LocalLimit below it) must be replaced by an empty
// LocalRelation.
test("Limit 0: individual GlobalLimit 0 node") {
  val analyzed = GlobalLimit(0, testRelation1).analyze
  val actual = Optimize.execute(analyzed)
  val expected = LocalRelation('a.int)

  comparePlans(actual, expected)
}

// Registers one test per join type. In each test the right side of the join is
// `testRelation2.limit(0)`; the tuple's second element is the plan the optimizer is expected
// to produce for that join type (an empty relation, or the left side projected with a null
// `b` column for the outer joins that keep left rows).
Seq(
  (Inner, LocalRelation('a.int, 'b.int)),
  (LeftOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze),
  (RightOuter, LocalRelation('a.int, 'b.int)),
  (FullOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)
).foreach { case (jt, correctAnswer) =>
  test(s"Limit 0: for join type $jt") {
    // Use the DSL's `===` so the join condition is a Catalyst equality expression on a and b.
    // Scala's `==` would compare the two attribute objects and yield a constant Boolean
    // instead of a join predicate.
    val query = testRelation1
      .join(testRelation2.limit(0), joinType = jt, condition = Some('a.attr === 'b.attr))

    val optimized = Optimize.execute(query.analyze)

    comparePlans(optimized, correctAnswer)
  }
}

// Joins (testRelation1 JOIN testRelation2) with a third relation limited to 0 rows; the whole
// inner-join tree is expected to collapse to an empty relation exposing columns a, b and c.
test("Limit 0: 3-way join") {
  val testRelation3 = LocalRelation.fromExternalRows(Seq('c.int), data = Seq(Row(1)))

  // Use the DSL's `===` so both conditions are Catalyst equality expressions. Scala's `==`
  // would compare the attribute objects and yield a constant Boolean instead of a predicate.
  val subJoinQuery = testRelation1
    .join(testRelation2, joinType = Inner, condition = Some('a.attr === 'b.attr))
  val query = subJoinQuery
    .join(testRelation3.limit(0), joinType = Inner, condition = Some('a.attr === 'c.attr))

  val optimized = Optimize.execute(query.analyze)
  val correctAnswer = LocalRelation('a.int, 'b.int, 'c.int)

  comparePlans(optimized, correctAnswer)
}

// Intersecting a relation with a zero-limited copy of itself is expected to optimize to a
// Distinct over an empty LocalRelation.
test("Limit 0: intersect") {
  val emptySide = testRelation1.limit(0)
  val analyzed = testRelation1.intersect(emptySide, isAll = false).analyze

  val actual = Optimize.execute(analyzed)
  val expected = Distinct(LocalRelation('a.int))

  comparePlans(actual, expected)
}
}