From 0bce426d75908ef1ebe0a8e78e9ee0c90aa04be2 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Mon, 9 Jun 2014 13:09:10 +0800
Subject: [PATCH 1/3] Pushdown the join filter & predicate for outer join

---
 .../sql/catalyst/optimizer/Optimizer.scala       | 79 +++++++++++++++----
 .../optimizer/FilterPushdownSuite.scala          |  2 +-
 2 files changed, 65 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 406ffd6801e98..06be590469964 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.FullOuter
+import org.apache.spark.sql.catalyst.plans.LeftOuter
+import org.apache.spark.sql.catalyst.plans.RightOuter
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types._
@@ -34,7 +37,7 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
     Batch("Filter Pushdown", FixedPoint(100),
       CombineFilters,
       PushPredicateThroughProject,
-      PushPredicateThroughInnerJoin,
+      PushPredicateThroughJoin,
       ColumnPruning) :: Nil
 }
 
@@ -254,28 +257,74 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
 
 /**
  * Pushes down [[catalyst.plans.logical.Filter Filter]] operators where the `condition` can be
- * evaluated using only the attributes of the left or right side of an inner join. Other
+ * evaluated using only the attributes of the left or right side of a join. Other
  * [[catalyst.plans.logical.Filter Filter]] conditions are moved into the `condition` of the
  * [[catalyst.plans.logical.Join Join]].
+ *
+ * The basic condition (JoinCondition & Filter) push down should conform to the rule:
+ * https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior
  */
-object PushPredicateThroughInnerJoin extends Rule[LogicalPlan] with PredicateHelper {
+object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
+  // split the condition expression into 3 parts,
+  // (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
+  private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
+    val (leftEvaluateCondition, rest) =
+      condition.partition(_.references subsetOf left.outputSet)
+    val (rightEvaluateCondition, commonCondition) =
+      rest.partition(_.references subsetOf right.outputSet)
+
+    (leftEvaluateCondition, rightEvaluateCondition, commonCondition)
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case f @ Filter(filterCondition, Join(left, right, Inner, joinCondition)) =>
-      val allConditions =
-        splitConjunctivePredicates(filterCondition) ++
-          joinCondition.map(splitConjunctivePredicates).getOrElse(Nil)
+    case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition)) =>
+      val splittedFilterCondition = splitConjunctivePredicates(filterCondition)
+      val splittedJoinCondition = joinCondition.map(splitConjunctivePredicates).getOrElse(Nil)
 
       // Split the predicates into those that can be evaluated on the left, right, and those that
       // must be evaluated after the join.
-      val (rightConditions, leftOrJoinConditions) =
-        allConditions.partition(_.references subsetOf right.outputSet)
-      val (leftConditions, joinConditions) =
-        leftOrJoinConditions.partition(_.references subsetOf left.outputSet)
+      val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =
+        split(splittedFilterCondition, left, right)
+      val (leftJoinConditions, rightJoinConditions, commonJoinCondition) =
+        split(splittedJoinCondition, left, right)
+
+      joinType match {
+        case Inner =>
+          // Treat the Condition / Filter in the same way
+          val newLeft = (leftFilterConditions ++ leftJoinConditions).
+            reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
+          val newRight = (rightFilterConditions ++ rightJoinConditions).
+            reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
+          val newJoinCond = (commonFilterCondition ++ commonJoinCondition).reduceLeftOption(And)
 
-      // Build the new left and right side, optionally with the pushed down filters.
-      val newLeft = leftConditions.reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
-      val newRight = rightConditions.reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-      Join(newLeft, newRight, Inner, joinConditions.reduceLeftOption(And))
+          Join(newLeft, newRight, Inner, newJoinCond)
+        case RightOuter =>
+          // Push Down the Right Only Filter & Push Down the Left Only Join Condition
+          val newLeft = leftJoinConditions.
+            reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
+          val newRight = rightFilterConditions.
+            reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
+          val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
+          val newJoin = Join(newLeft, newRight, RightOuter, newJoinCond)
+
+          (leftFilterConditions ++ commonFilterCondition).
+            reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
+        case LeftOuter =>
+          // Push Down the Left Only Filter & Push Down the Right Only Join Condition
+          val newLeft = leftFilterConditions.
+            reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
+          val newRight = rightJoinConditions.
+            reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
+          val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
+          val newJoin = Join(newLeft, newRight, LeftOuter, newJoinCond)
+
+          (rightFilterConditions ++ commonFilterCondition).
+            reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
+        case FullOuter =>
+          // DO Nothing for Full Outer Join
+
+          f
+      }
   }
 }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index ef47850455a37..4382b4ed24d7c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -35,7 +35,7 @@ class FilterPushdownSuite extends OptimizerTest {
     Batch("Filter Pushdown", Once,
       CombineFilters,
       PushPredicateThroughProject,
-      PushPredicateThroughInnerJoin) :: Nil
+      PushPredicateThroughJoin) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)

From 44c6700fad0a702f5f278d251cc31168ba72ea9b Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Tue, 10 Jun 2014 14:55:12 +0800
Subject: [PATCH 2/3] Add logical to support pushdown the join filter

---
 .../sql/catalyst/optimizer/Optimizer.scala       |  76 ++++---
 .../optimizer/FilterPushdownSuite.scala          | 185 +++++++++++++++++-
 2 files changed, 233 insertions(+), 28 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 06be590469964..6a4b140281790 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -260,13 +260,14 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
  * evaluated using only the attributes of the left or right side of a join. Other
  * [[catalyst.plans.logical.Filter Filter]] conditions are moved into the `condition` of the
  * [[catalyst.plans.logical.Join Join]].
+ * And also Pushes down the join filter, where the `condition` can be evaluated using only the
+ * attributes of the left or right side of sub query when applicable.
  *
- * The basic condition (JoinCondition & Filter) push down should conform to the rule:
- * https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior
+ * Check https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior for more details
  */
 object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
   // split the condition expression into 3 parts,
-  // (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
+  // (canEvaluateInLeftSide, canEvaluateInRightSide, haveToEvaluateWithBothSide)
   private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
     val (leftEvaluateCondition, rest) =
       condition.partition(_.references subsetOf left.outputSet)
@@ -277,53 +278,76 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // push the where condition down into join filter
     case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition)) =>
-      val splittedFilterCondition = splitConjunctivePredicates(filterCondition)
-      val splittedJoinCondition = joinCondition.map(splitConjunctivePredicates).getOrElse(Nil)
-
-      // Split the predicates into those that can be evaluated on the left, right, and those that
-      // must be evaluated after the join.
       val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =
-        split(splittedFilterCondition, left, right)
-      val (leftJoinConditions, rightJoinConditions, commonJoinCondition) =
-        split(splittedJoinCondition, left, right)
+        split(splitConjunctivePredicates(filterCondition), left, right)
 
       joinType match {
         case Inner =>
-          // Treat the Condition / Filter in the same way
-          val newLeft = (leftFilterConditions ++ leftJoinConditions).
+          // push down the single side `where` condition into respective sides
+          val newLeft = leftFilterConditions.
             reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
-          val newRight = (rightFilterConditions ++ rightJoinConditions).
+          val newRight = rightFilterConditions.
             reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-          val newJoinCond = (commonFilterCondition ++ commonJoinCondition).reduceLeftOption(And)
+          val newJoinCond = (commonFilterCondition ++ joinCondition).reduceLeftOption(And)
 
           Join(newLeft, newRight, Inner, newJoinCond)
         case RightOuter =>
-          // Push Down the Right Only Filter & Push Down the Left Only Join Condition
-          val newLeft = leftJoinConditions.
-            reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
+          // push down the right side only `where` condition
+          val newLeft = left
           val newRight = rightFilterConditions.
             reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-          val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
+          val newJoinCond = joinCondition
           val newJoin = Join(newLeft, newRight, RightOuter, newJoinCond)
 
           (leftFilterConditions ++ commonFilterCondition).
             reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
         case LeftOuter =>
-          // Push Down the Left Only Filter & Push Down the Right Only Join Condition
+          // push down the left side only `where` condition
          val newLeft = leftFilterConditions.
             reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
-          val newRight = rightJoinConditions.
-            reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-          val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
+          val newRight = right
+          val newJoinCond = joinCondition
           val newJoin = Join(newLeft, newRight, LeftOuter, newJoinCond)
 
           (rightFilterConditions ++ commonFilterCondition).
             reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
-        case FullOuter =>
-          // DO Nothing for Full Outer Join
+        case FullOuter => f // DO Nothing for Full Outer Join
+      }
+
+    // push down the join filter into sub query scanning if applicable
+    case f @ Join(left, right, joinType, joinCondition) =>
+      val (leftJoinConditions, rightJoinConditions, commonJoinCondition) =
+        split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
+
+      joinType match {
+        case Inner =>
+          // push down the single side only join filter for both sides sub queries
+          val newLeft = leftJoinConditions.
+            reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
+          val newRight = rightJoinConditions.
+            reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
+          val newJoinCond = commonJoinCondition.reduceLeftOption(And)
+
+          Join(newLeft, newRight, Inner, newJoinCond)
+        case RightOuter =>
+          // push down the left side only join filter for left side sub query
+          val newLeft = leftJoinConditions.
+            reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
+          val newRight = right
+          val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
+
+          Join(newLeft, newRight, RightOuter, newJoinCond)
+        case LeftOuter =>
+          // push down the right side only join filter for right sub query
+          val newLeft = left
+          val newRight = rightJoinConditions.
+            reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
+          val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
 
-          f
+          Join(newLeft, newRight, LeftOuter, newJoinCond)
+        case FullOuter => f
       }
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 4382b4ed24d7c..02cc665f8a8c7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -20,11 +20,14 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.FullOuter
+import org.apache.spark.sql.catalyst.plans.LeftOuter
+import org.apache.spark.sql.catalyst.plans.RightOuter
 import org.apache.spark.sql.catalyst.rules._
-
-/* Implicit conversions */
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.junit.Test
 
 class FilterPushdownSuite extends OptimizerTest {
 
@@ -161,6 +164,184 @@ class FilterPushdownSuite extends OptimizerTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("joins: push down left outer join #1") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = {
+      x.join(y, LeftOuter)
+        .where("x.b".attr === 1 && "y.b".attr === 2)
+    }
+
+    val optimized = Optimize(originalQuery.analyze)
+    val left = testRelation.where('b === 1)
+    val correctAnswer =
+      left.join(y, LeftOuter).where("y.b".attr === 2).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: push down right outer join #1") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = {
+      x.join(y, RightOuter)
+        .where("x.b".attr === 1 && "y.b".attr === 2)
+    }
+
+    val optimized = Optimize(originalQuery.analyze)
+    val right = testRelation.where('b === 2).subquery('d)
+    val correctAnswer =
+      x.join(right, RightOuter).where("x.b".attr === 1).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: push down left outer join #2") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = {
+      x.join(y, LeftOuter, Some("x.b".attr === 1))
+        .where("x.b".attr === 2 && "y.b".attr === 2)
+    }
+
+    val optimized = Optimize(originalQuery.analyze)
+    val left = testRelation.where('b === 2).subquery('d)
+    val correctAnswer =
+      left.join(y, LeftOuter, Some("d.b".attr === 1)).where("y.b".attr === 2).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: push down right outer join #2") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = {
+      x.join(y, RightOuter, Some("y.b".attr === 1))
+        .where("x.b".attr === 2 && "y.b".attr === 2)
+    }
+
+    val optimized = Optimize(originalQuery.analyze)
+    val right = testRelation.where('b === 2).subquery('d)
+    val correctAnswer =
+      x.join(right, RightOuter, Some("d.b".attr === 1)).where("x.b".attr === 2).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: push down left outer join #3") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = {
+      x.join(y, LeftOuter, Some("y.b".attr === 1))
+        .where("x.b".attr === 2 && "y.b".attr === 2)
+    }
+
+    val optimized = Optimize(originalQuery.analyze)
+    val left = testRelation.where('b === 2).subquery('l)
+    val right = testRelation.where('b === 1).subquery('r)
+    val correctAnswer =
+      left.join(right, LeftOuter).where("r.b".attr === 2).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: push down right outer join #3") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = {
+      x.join(y, RightOuter, Some("y.b".attr === 1))
+        .where("x.b".attr === 2 && "y.b".attr === 2)
+    }
+
+    val optimized = Optimize(originalQuery.analyze)
+    val right = testRelation.where('b === 2).subquery('r)
+    val correctAnswer =
+      x.join(right, RightOuter, Some("r.b".attr === 1)).where("x.b".attr === 2).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: push down left outer join #4") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = {
+      x.join(y, LeftOuter, Some("y.b".attr === 1))
+        .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
+    }
+
+    val optimized = Optimize(originalQuery.analyze)
+    val left = testRelation.where('b === 2).subquery('l)
+    val right = testRelation.where('b === 1).subquery('r)
+    val correctAnswer =
+      left.join(right, LeftOuter).where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: push down right outer join #4") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = {
+      x.join(y, RightOuter, Some("y.b".attr === 1))
+        .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
+    }
+
+    val optimized = Optimize(originalQuery.analyze)
+    val left = testRelation.subquery('l)
+    val right = testRelation.where('b === 2).subquery('r)
+    val correctAnswer =
+      left.join(right, RightOuter, Some("r.b".attr === 1)).
+        where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: push down left outer join #5") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+
+    val originalQuery = {
+      x.join(y, LeftOuter, Some("y.b".attr === 1 && "x.a".attr === 3))
+        .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
+    }
+
+    val optimized = Optimize(originalQuery.analyze)
+    val left = testRelation.where('b === 2).subquery('l)
+    val right = testRelation.where('b === 1).subquery('r)
+    val correctAnswer =
+      left.join(right, LeftOuter, Some("l.a".attr===3)).
+ where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: push down right outer join #5") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val originalQuery = { + x.join(y, RightOuter, Some("y.b".attr === 1 && "x.a".attr === 3)) + .where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr) + } + + val optimized = Optimize(originalQuery.analyze) + val left = testRelation.where('a === 3).subquery('l) + val right = testRelation.where('b === 2).subquery('r) + val correctAnswer = + left.join(right, RightOuter, Some("r.b".attr === 1)). + where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze + + comparePlans(optimized, correctAnswer) + } test("joins: can't push down") { val x = testRelation.subquery('x) From 10feff9b36f14484558b23c1f74bb34333ab2608 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Tue, 10 Jun 2014 15:37:33 +0800 Subject: [PATCH 3/3] fix bug of changing the join type in PredicatePushDownThroughJoin --- .../apache/spark/sql/catalyst/optimizer/Optimizer.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 6a4b140281790..ccb8245cc2e7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.FullOuter import org.apache.spark.sql.catalyst.plans.LeftOuter import org.apache.spark.sql.catalyst.plans.RightOuter +import org.apache.spark.sql.catalyst.plans.LeftSemi import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.types._ @@ -303,13 +304,13 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { (leftFilterConditions ++ commonFilterCondition). reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin) - case LeftOuter => + case _ @ (LeftOuter | LeftSemi) => // push down the left side only `where` condition val newLeft = leftFilterConditions. reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = right val newJoinCond = joinCondition - val newJoin = Join(newLeft, newRight, LeftOuter, newJoinCond) + val newJoin = Join(newLeft, newRight, joinType, newJoinCond) (rightFilterConditions ++ commonFilterCondition). reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin) @@ -339,14 +340,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And) Join(newLeft, newRight, RightOuter, newJoinCond) - case LeftOuter => + case _ @ (LeftOuter | LeftSemi) => // push down the right side only join filter for right sub query val newLeft = left val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And) - Join(newLeft, newRight, LeftOuter, newJoinCond) + Join(newLeft, newRight, joinType, newJoinCond) case FullOuter => f } }