@@ -19,6 +19,9 @@ package org.apache.spark.sql.catalyst.optimizer
1919
2020import org .apache .spark .sql .catalyst .expressions ._
2121import org .apache .spark .sql .catalyst .plans .Inner
22+ import org .apache .spark .sql .catalyst .plans .FullOuter
23+ import org .apache .spark .sql .catalyst .plans .LeftOuter
24+ import org .apache .spark .sql .catalyst .plans .RightOuter
2225import org .apache .spark .sql .catalyst .plans .logical ._
2326import org .apache .spark .sql .catalyst .rules ._
2427import org .apache .spark .sql .catalyst .types ._
@@ -34,7 +37,7 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
3437 Batch (" Filter Pushdown" , FixedPoint (100 ),
3538 CombineFilters ,
3639 PushPredicateThroughProject ,
37- PushPredicateThroughInnerJoin ,
40+ PushPredicateThroughJoin ,
3841 ColumnPruning ) :: Nil
3942}
4043
@@ -254,28 +257,74 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
254257
255258/**
256259 * Pushes down [[catalyst.plans.logical.Filter Filter ]] operators where the `condition` can be
257- * evaluated using only the attributes of the left or right side of an inner join. Other
260+ * evaluated using only the attributes of the left or right side of a join. Other
258261 * [[catalyst.plans.logical.Filter Filter ]] conditions are moved into the `condition` of the
259262 * [[catalyst.plans.logical.Join Join ]].
263+ *
264+ * The basic condition (JoinCondition & Filter) push down should conform to the rule:
265+ * https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior
260266 */
261- object PushPredicateThroughInnerJoin extends Rule [LogicalPlan ] with PredicateHelper {
267+ object PushPredicateThroughJoin extends Rule [LogicalPlan ] with PredicateHelper {
268+ // split the condition expression into 3 parts,
269+ // (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
270+ private def split (condition : Seq [Expression ], left : LogicalPlan , right : LogicalPlan ) = {
271+ val (leftEvaluateCondition, rest) =
272+ condition.partition(_.references subsetOf left.outputSet)
273+ val (rightEvaluateCondition, commonCondition) =
274+ rest.partition(_.references subsetOf right.outputSet)
275+
276+ (leftEvaluateCondition, rightEvaluateCondition, commonCondition)
277+ }
278+
262279 def apply (plan : LogicalPlan ): LogicalPlan = plan transform {
263- case f @ Filter (filterCondition, Join (left, right, Inner , joinCondition)) =>
264- val allConditions =
265- splitConjunctivePredicates(filterCondition) ++
266- joinCondition.map(splitConjunctivePredicates).getOrElse(Nil )
280+ case f @ Filter (filterCondition, Join (left, right, joinType, joinCondition)) =>
281+ val splittedFilterCondition = splitConjunctivePredicates(filterCondition)
282+ val splittedJoinCondition = joinCondition.map(splitConjunctivePredicates).getOrElse(Nil )
267283
268284 // Split the predicates into those that can be evaluated on the left, right, and those that
269285 // must be evaluated after the join.
270- val (rightConditions, leftOrJoinConditions) =
271- allConditions.partition(_.references subsetOf right.outputSet)
272- val (leftConditions, joinConditions) =
273- leftOrJoinConditions.partition(_.references subsetOf left.outputSet)
286+ val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =
287+ split(splittedFilterCondition, left, right)
288+ val (leftJoinConditions, rightJoinConditions, commonJoinCondition) =
289+ split(splittedJoinCondition, left, right)
290+
291+ joinType match {
292+ case Inner =>
293+ // Treat the Condition / Filter in the same way
294+ val newLeft = (leftFilterConditions ++ leftJoinConditions).
295+ reduceLeftOption(And ).map(Filter (_, left)).getOrElse(left)
296+ val newRight = (rightFilterConditions ++ rightJoinConditions).
297+ reduceLeftOption(And ).map(Filter (_, right)).getOrElse(right)
298+ val newJoinCond = (commonFilterCondition ++ commonJoinCondition).reduceLeftOption(And )
274299
275- // Build the new left and right side, optionally with the pushed down filters.
276- val newLeft = leftConditions.reduceLeftOption(And ).map(Filter (_, left)).getOrElse(left)
277- val newRight = rightConditions.reduceLeftOption(And ).map(Filter (_, right)).getOrElse(right)
278- Join (newLeft, newRight, Inner , joinConditions.reduceLeftOption(And ))
300+ Join (newLeft, newRight, Inner , newJoinCond)
301+ case RightOuter =>
302+ // Push Down the Right Only Filter & Push Down the Left Only Join Condition
303+ val newLeft = leftJoinConditions.
304+ reduceLeftOption(And ).map(Filter (_, left)).getOrElse(left)
305+ val newRight = rightFilterConditions.
306+ reduceLeftOption(And ).map(Filter (_, right)).getOrElse(right)
307+ val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And )
308+ val newJoin = Join (newLeft, newRight, RightOuter , newJoinCond)
309+
310+ (leftFilterConditions ++ commonFilterCondition).
311+ reduceLeftOption(And ).map(Filter (_, newJoin)).getOrElse(newJoin)
312+ case LeftOuter =>
313+ // Push Down the Left Only Filter & Push Down the Right Only Join Condition
314+ val newLeft = leftFilterConditions.
315+ reduceLeftOption(And ).map(Filter (_, left)).getOrElse(left)
316+ val newRight = rightJoinConditions.
317+ reduceLeftOption(And ).map(Filter (_, right)).getOrElse(right)
318+ val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And )
319+ val newJoin = Join (newLeft, newRight, LeftOuter , newJoinCond)
320+
321+ (rightFilterConditions ++ commonFilterCondition).
322+ reduceLeftOption(And ).map(Filter (_, newJoin)).getOrElse(newJoin)
323+ case FullOuter =>
324+ // DO Nothing for Full Outer Join
325+
326+ f
327+ }
279328 }
280329}
281330
0 commit comments