Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
OptimizeWindowFunctions,
CollapseWindow,
CombineFilters,
CombineLimits,
EliminateLimits,
CombineUnions,
// Constant folding and strength reduction
TransposeWindow,
Expand Down Expand Up @@ -1451,11 +1451,20 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
}

/**
* Combines two adjacent [[Limit]] operators into one, merging the
* expressions into one single expression.
* This rule optimizes Limit operators by:
* 1. Eliminate [[Limit]] operators if it's child max row <= limit.
* 2. Combines two adjacent [[Limit]] operators into one, merging the
* expressions into one single expression.
*/
object CombineLimits extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
object EliminateLimits extends Rule[LogicalPlan] {
private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = {
limitExpr.foldable && child.maxRows.exists { _ <= limitExpr.eval().asInstanceOf[Int] }
}

def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case Limit(l, child) if canEliminate(l, child) =>
child

case GlobalLimit(le, GlobalLimit(ne, grandChild)) =>
GlobalLimit(Least(Seq(ne, le)), grandChild)
case LocalLimit(le, LocalLimit(ne, grandChild)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class CombiningLimitsSuite extends PlanTest {
Batch("Column Pruning", FixedPoint(100),
ColumnPruning,
RemoveNoopOperators) ::
Batch("Combine Limit", FixedPoint(10),
CombineLimits) ::
Batch("Eliminate Limit", FixedPoint(10),
EliminateLimits) ::
Batch("Constant Folding", FixedPoint(10),
NullPropagation,
ConstantFolding,
Expand Down Expand Up @@ -90,4 +90,31 @@ class CombiningLimitsSuite extends PlanTest {

comparePlans(optimized, correctAnswer)
}

test("SPARK-33442: Change Combine Limit to Eliminate limit using max row") {
// test child max row <= limit.
val query1 = testRelation.select().groupBy()(count(1)).limit(1).analyze
val optimized1 = Optimize.execute(query1)
val expected1 = testRelation.select().groupBy()(count(1)).analyze
comparePlans(optimized1, expected1)

// test child max row > limit.
val query2 = testRelation.select().groupBy()(count(1)).limit(0).analyze
val optimized2 = Optimize.execute(query2)
comparePlans(optimized2, query2)

// test child max row is none
val query3 = testRelation.select(Symbol("a")).limit(1).analyze
val optimized3 = Optimize.execute(query3)
comparePlans(optimized3, query3)

// test sort after limit
val query4 = testRelation.select().groupBy()(count(1))
.orderBy(count(1).asc).limit(1).analyze
val optimized4 = Optimize.execute(query4)
// the top project has been removed, so we need optimize expected too
val expected4 = Optimize.execute(
testRelation.select().groupBy()(count(1)).orderBy(count(1).asc).analyze)
comparePlans(optimized4, expected4)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class LimitPushdownSuite extends PlanTest {
EliminateSubqueryAliases) ::
Batch("Limit pushdown", FixedPoint(100),
LimitPushDown,
CombineLimits,
EliminateLimits,
ConstantFolding,
BooleanSimplification) :: Nil
}
Expand Down Expand Up @@ -74,7 +74,7 @@ class LimitPushdownSuite extends PlanTest {
Union(testRelation.limit(1), testRelation2.select('d, 'e, 'f).limit(1)).limit(2)
val unionOptimized = Optimize.execute(unionQuery.analyze)
val unionCorrectAnswer =
Limit(2, Union(testRelation.limit(1), testRelation2.select('d, 'e, 'f).limit(1))).analyze
Union(testRelation.limit(1), testRelation2.select('d, 'e, 'f).limit(1)).analyze
comparePlans(unionOptimized, unionCorrectAnswer)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
== Physical Plan ==
TakeOrderedAndProject (44)
* Sort (44)
+- * HashAggregate (43)
+- Exchange (42)
+- * HashAggregate (41)
Expand Down Expand Up @@ -244,7 +244,7 @@ Functions [3]: [sum(UnscaledValue(cs_ext_ship_cost#6)), sum(UnscaledValue(cs_net
Aggregate Attributes [3]: [sum(UnscaledValue(cs_ext_ship_cost#6))#23, sum(UnscaledValue(cs_net_profit#7))#24, count(cs_order_number#5)#27]
Results [3]: [count(cs_order_number#5)#27 AS order count #30, MakeDecimal(sum(UnscaledValue(cs_ext_ship_cost#6))#23,17,2) AS total shipping cost #31, MakeDecimal(sum(UnscaledValue(cs_net_profit#7))#24,17,2) AS total net profit #32]

(44) TakeOrderedAndProject
(44) Sort [codegen id : 12]
Input [3]: [order count #30, total shipping cost #31, total net profit #32]
Arguments: 100, [order count #30 ASC NULLS FIRST], [order count #30, total shipping cost #31, total net profit #32]
Arguments: [order count #30 ASC NULLS FIRST], true, 0

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
TakeOrderedAndProject [order count ,total shipping cost ,total net profit ]
WholeStageCodegen (12)
WholeStageCodegen (12)
Sort [order count ]
HashAggregate [sum,sum,count] [sum(UnscaledValue(cs_ext_ship_cost)),sum(UnscaledValue(cs_net_profit)),count(cs_order_number),order count ,total shipping cost ,total net profit ,sum,sum,count]
InputAdapter
Exchange #1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
== Physical Plan ==
TakeOrderedAndProject (41)
* Sort (41)
+- * HashAggregate (40)
+- Exchange (39)
+- * HashAggregate (38)
Expand Down Expand Up @@ -229,7 +229,7 @@ Functions [3]: [sum(UnscaledValue(cs_ext_ship_cost#6)), sum(UnscaledValue(cs_net
Aggregate Attributes [3]: [sum(UnscaledValue(cs_ext_ship_cost#6))#22, sum(UnscaledValue(cs_net_profit#7))#23, count(cs_order_number#5)#27]
Results [3]: [count(cs_order_number#5)#27 AS order count #30, MakeDecimal(sum(UnscaledValue(cs_ext_ship_cost#6))#22,17,2) AS total shipping cost #31, MakeDecimal(sum(UnscaledValue(cs_net_profit#7))#23,17,2) AS total net profit #32]

(41) TakeOrderedAndProject
(41) Sort [codegen id : 8]
Input [3]: [order count #30, total shipping cost #31, total net profit #32]
Arguments: 100, [order count #30 ASC NULLS FIRST], [order count #30, total shipping cost #31, total net profit #32]
Arguments: [order count #30 ASC NULLS FIRST], true, 0

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
TakeOrderedAndProject [order count ,total shipping cost ,total net profit ]
WholeStageCodegen (8)
WholeStageCodegen (8)
Sort [order count ]
HashAggregate [sum,sum,count] [sum(UnscaledValue(cs_ext_ship_cost)),sum(UnscaledValue(cs_net_profit)),count(cs_order_number),order count ,total shipping cost ,total net profit ,sum,sum,count]
InputAdapter
Exchange #1
Expand Down
Loading