Skip to content

Commit db0c038

Browse files
chenghao-intel authored and marmbrus committed
[SPARK-2076][SQL] Push down the join filter & predicate for outer join
Following the rules described in https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior, we can optimize SQL joins by pushing down the join predicates and the WHERE predicates.

Author: Cheng Hao <[email protected]>

Closes apache#1015 from chenghao-intel/join_predicate_push_down and squashes the following commits:

10feff9 [Cheng Hao] fix bug of changing the join type in PredicatePushDownThroughJoin
44c6700 [Cheng Hao] Add logical to support pushdown the join filter
0bce426 [Cheng Hao] Pushdown the join filter & predicate for outer join
1 parent 884ca71 commit db0c038

File tree

2 files changed

+277
-22
lines changed

2 files changed

+277
-22
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 93 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ package org.apache.spark.sql.catalyst.optimizer
1919

2020
import org.apache.spark.sql.catalyst.expressions._
2121
import org.apache.spark.sql.catalyst.plans.Inner
22+
import org.apache.spark.sql.catalyst.plans.FullOuter
23+
import org.apache.spark.sql.catalyst.plans.LeftOuter
24+
import org.apache.spark.sql.catalyst.plans.RightOuter
25+
import org.apache.spark.sql.catalyst.plans.LeftSemi
2226
import org.apache.spark.sql.catalyst.plans.logical._
2327
import org.apache.spark.sql.catalyst.rules._
2428
import org.apache.spark.sql.catalyst.types._
@@ -34,7 +38,7 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
3438
Batch("Filter Pushdown", FixedPoint(100),
3539
CombineFilters,
3640
PushPredicateThroughProject,
37-
PushPredicateThroughInnerJoin,
41+
PushPredicateThroughJoin,
3842
ColumnPruning) :: Nil
3943
}
4044

@@ -254,28 +258,98 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
254258

255259
/**
256260
* Pushes down [[catalyst.plans.logical.Filter Filter]] operators where the `condition` can be
257-
* evaluated using only the attributes of the left or right side of an inner join. Other
261+
* evaluated using only the attributes of the left or right side of a join. Other
258262
* [[catalyst.plans.logical.Filter Filter]] conditions are moved into the `condition` of the
259263
* [[catalyst.plans.logical.Join Join]].
264+
* Also pushes down the join filter where the `condition` can be evaluated using only the
265+
* attributes of the left or right side of the subquery, when applicable.
266+
*
267+
* Check https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior for more details
260268
*/
261-
object PushPredicateThroughInnerJoin extends Rule[LogicalPlan] with PredicateHelper {
269+
object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
270+
// Splits the conjunctive predicates of a condition into 3 groups:
271+
// (canEvaluateInLeftSide, canEvaluateInRightSide, haveToEvaluateWithBothSide)
272+
private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
273+
val (leftEvaluateCondition, rest) =
274+
condition.partition(_.references subsetOf left.outputSet)
275+
val (rightEvaluateCondition, commonCondition) =
276+
rest.partition(_.references subsetOf right.outputSet)
277+
278+
(leftEvaluateCondition, rightEvaluateCondition, commonCondition)
279+
}
280+
262281
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
263-
case f @ Filter(filterCondition, Join(left, right, Inner, joinCondition)) =>
264-
val allConditions =
265-
splitConjunctivePredicates(filterCondition) ++
266-
joinCondition.map(splitConjunctivePredicates).getOrElse(Nil)
267-
268-
// Split the predicates into those that can be evaluated on the left, right, and those that
269-
// must be evaluated after the join.
270-
val (rightConditions, leftOrJoinConditions) =
271-
allConditions.partition(_.references subsetOf right.outputSet)
272-
val (leftConditions, joinConditions) =
273-
leftOrJoinConditions.partition(_.references subsetOf left.outputSet)
274-
275-
// Build the new left and right side, optionally with the pushed down filters.
276-
val newLeft = leftConditions.reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
277-
val newRight = rightConditions.reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
278-
Join(newLeft, newRight, Inner, joinConditions.reduceLeftOption(And))
282+
// push the where condition down into join filter
283+
case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition)) =>
284+
val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =
285+
split(splitConjunctivePredicates(filterCondition), left, right)
286+
287+
joinType match {
288+
case Inner =>
289+
// push down the single side `where` condition into respective sides
290+
val newLeft = leftFilterConditions.
291+
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
292+
val newRight = rightFilterConditions.
293+
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
294+
val newJoinCond = (commonFilterCondition ++ joinCondition).reduceLeftOption(And)
295+
296+
Join(newLeft, newRight, Inner, newJoinCond)
297+
case RightOuter =>
298+
// push down the right side only `where` condition
299+
val newLeft = left
300+
val newRight = rightFilterConditions.
301+
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
302+
val newJoinCond = joinCondition
303+
val newJoin = Join(newLeft, newRight, RightOuter, newJoinCond)
304+
305+
(leftFilterConditions ++ commonFilterCondition).
306+
reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
307+
case _ @ (LeftOuter | LeftSemi) =>
308+
// push down the left side only `where` condition
309+
val newLeft = leftFilterConditions.
310+
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
311+
val newRight = right
312+
val newJoinCond = joinCondition
313+
val newJoin = Join(newLeft, newRight, joinType, newJoinCond)
314+
315+
(rightFilterConditions ++ commonFilterCondition).
316+
reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
317+
case FullOuter => f // DO Nothing for Full Outer Join
318+
}
319+
320+
// push down the join filter into sub query scanning if applicable
321+
case f @ Join(left, right, joinType, joinCondition) =>
322+
val (leftJoinConditions, rightJoinConditions, commonJoinCondition) =
323+
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
324+
325+
joinType match {
326+
case Inner =>
327+
// push down the single side only join filter for both sides sub queries
328+
val newLeft = leftJoinConditions.
329+
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
330+
val newRight = rightJoinConditions.
331+
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
332+
val newJoinCond = commonJoinCondition.reduceLeftOption(And)
333+
334+
Join(newLeft, newRight, Inner, newJoinCond)
335+
case RightOuter =>
336+
// push down the left side only join filter for left side sub query
337+
val newLeft = leftJoinConditions.
338+
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
339+
val newRight = right
340+
val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
341+
342+
Join(newLeft, newRight, RightOuter, newJoinCond)
343+
case _ @ (LeftOuter | LeftSemi) =>
344+
// push down the right side only join filter for right sub query
345+
val newLeft = left
346+
val newRight = rightJoinConditions.
347+
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
348+
val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
349+
350+
Join(newLeft, newRight, joinType, newJoinCond)
351+
case FullOuter => f
352+
}
279353
}
280354
}
281355

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala

Lines changed: 184 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ package org.apache.spark.sql.catalyst.optimizer
2020
import org.apache.spark.sql.catalyst.analysis
2121
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
2222
import org.apache.spark.sql.catalyst.plans.logical._
23+
import org.apache.spark.sql.catalyst.plans.Inner
24+
import org.apache.spark.sql.catalyst.plans.FullOuter
25+
import org.apache.spark.sql.catalyst.plans.LeftOuter
26+
import org.apache.spark.sql.catalyst.plans.RightOuter
2327
import org.apache.spark.sql.catalyst.rules._
24-
25-
/* Implicit conversions */
2628
import org.apache.spark.sql.catalyst.dsl.plans._
2729
import org.apache.spark.sql.catalyst.dsl.expressions._
30+
import org.junit.Test
2831

2932
class FilterPushdownSuite extends OptimizerTest {
3033

@@ -35,7 +38,7 @@ class FilterPushdownSuite extends OptimizerTest {
3538
Batch("Filter Pushdown", Once,
3639
CombineFilters,
3740
PushPredicateThroughProject,
38-
PushPredicateThroughInnerJoin) :: Nil
41+
PushPredicateThroughJoin) :: Nil
3942
}
4043

4144
val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -161,6 +164,184 @@ class FilterPushdownSuite extends OptimizerTest {
161164

162165
comparePlans(optimized, correctAnswer)
163166
}
167+
168+
test("joins: push down left outer join #1") {
169+
val x = testRelation.subquery('x)
170+
val y = testRelation.subquery('y)
171+
172+
val originalQuery = {
173+
x.join(y, LeftOuter)
174+
.where("x.b".attr === 1 && "y.b".attr === 2)
175+
}
176+
177+
val optimized = Optimize(originalQuery.analyze)
178+
val left = testRelation.where('b === 1)
179+
val correctAnswer =
180+
left.join(y, LeftOuter).where("y.b".attr === 2).analyze
181+
182+
comparePlans(optimized, correctAnswer)
183+
}
184+
185+
test("joins: push down right outer join #1") {
186+
val x = testRelation.subquery('x)
187+
val y = testRelation.subquery('y)
188+
189+
val originalQuery = {
190+
x.join(y, RightOuter)
191+
.where("x.b".attr === 1 && "y.b".attr === 2)
192+
}
193+
194+
val optimized = Optimize(originalQuery.analyze)
195+
val right = testRelation.where('b === 2).subquery('d)
196+
val correctAnswer =
197+
x.join(right, RightOuter).where("x.b".attr === 1).analyze
198+
199+
comparePlans(optimized, correctAnswer)
200+
}
201+
202+
test("joins: push down left outer join #2") {
203+
val x = testRelation.subquery('x)
204+
val y = testRelation.subquery('y)
205+
206+
val originalQuery = {
207+
x.join(y, LeftOuter, Some("x.b".attr === 1))
208+
.where("x.b".attr === 2 && "y.b".attr === 2)
209+
}
210+
211+
val optimized = Optimize(originalQuery.analyze)
212+
val left = testRelation.where('b === 2).subquery('d)
213+
val correctAnswer =
214+
left.join(y, LeftOuter, Some("d.b".attr === 1)).where("y.b".attr === 2).analyze
215+
216+
comparePlans(optimized, correctAnswer)
217+
}
218+
219+
test("joins: push down right outer join #2") {
220+
val x = testRelation.subquery('x)
221+
val y = testRelation.subquery('y)
222+
223+
val originalQuery = {
224+
x.join(y, RightOuter, Some("y.b".attr === 1))
225+
.where("x.b".attr === 2 && "y.b".attr === 2)
226+
}
227+
228+
val optimized = Optimize(originalQuery.analyze)
229+
val right = testRelation.where('b === 2).subquery('d)
230+
val correctAnswer =
231+
x.join(right, RightOuter, Some("d.b".attr === 1)).where("x.b".attr === 2).analyze
232+
233+
comparePlans(optimized, correctAnswer)
234+
}
235+
236+
test("joins: push down left outer join #3") {
237+
val x = testRelation.subquery('x)
238+
val y = testRelation.subquery('y)
239+
240+
val originalQuery = {
241+
x.join(y, LeftOuter, Some("y.b".attr === 1))
242+
.where("x.b".attr === 2 && "y.b".attr === 2)
243+
}
244+
245+
val optimized = Optimize(originalQuery.analyze)
246+
val left = testRelation.where('b === 2).subquery('l)
247+
val right = testRelation.where('b === 1).subquery('r)
248+
val correctAnswer =
249+
left.join(right, LeftOuter).where("r.b".attr === 2).analyze
250+
251+
comparePlans(optimized, correctAnswer)
252+
}
253+
254+
test("joins: push down right outer join #3") {
255+
val x = testRelation.subquery('x)
256+
val y = testRelation.subquery('y)
257+
258+
val originalQuery = {
259+
x.join(y, RightOuter, Some("y.b".attr === 1))
260+
.where("x.b".attr === 2 && "y.b".attr === 2)
261+
}
262+
263+
val optimized = Optimize(originalQuery.analyze)
264+
val right = testRelation.where('b === 2).subquery('r)
265+
val correctAnswer =
266+
x.join(right, RightOuter, Some("r.b".attr === 1)).where("x.b".attr === 2).analyze
267+
268+
comparePlans(optimized, correctAnswer)
269+
}
270+
271+
test("joins: push down left outer join #4") {
272+
val x = testRelation.subquery('x)
273+
val y = testRelation.subquery('y)
274+
275+
val originalQuery = {
276+
x.join(y, LeftOuter, Some("y.b".attr === 1))
277+
.where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
278+
}
279+
280+
val optimized = Optimize(originalQuery.analyze)
281+
val left = testRelation.where('b === 2).subquery('l)
282+
val right = testRelation.where('b === 1).subquery('r)
283+
val correctAnswer =
284+
left.join(right, LeftOuter).where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
285+
286+
comparePlans(optimized, correctAnswer)
287+
}
288+
289+
test("joins: push down right outer join #4") {
290+
val x = testRelation.subquery('x)
291+
val y = testRelation.subquery('y)
292+
293+
val originalQuery = {
294+
x.join(y, RightOuter, Some("y.b".attr === 1))
295+
.where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
296+
}
297+
298+
val optimized = Optimize(originalQuery.analyze)
299+
val left = testRelation.subquery('l)
300+
val right = testRelation.where('b === 2).subquery('r)
301+
val correctAnswer =
302+
left.join(right, RightOuter, Some("r.b".attr === 1)).
303+
where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
304+
305+
comparePlans(optimized, correctAnswer)
306+
}
307+
308+
test("joins: push down left outer join #5") {
309+
val x = testRelation.subquery('x)
310+
val y = testRelation.subquery('y)
311+
312+
val originalQuery = {
313+
x.join(y, LeftOuter, Some("y.b".attr === 1 && "x.a".attr === 3))
314+
.where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
315+
}
316+
317+
val optimized = Optimize(originalQuery.analyze)
318+
val left = testRelation.where('b === 2).subquery('l)
319+
val right = testRelation.where('b === 1).subquery('r)
320+
val correctAnswer =
321+
left.join(right, LeftOuter, Some("l.a".attr===3)).
322+
where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
323+
324+
comparePlans(optimized, correctAnswer)
325+
}
326+
327+
test("joins: push down right outer join #5") {
328+
val x = testRelation.subquery('x)
329+
val y = testRelation.subquery('y)
330+
331+
val originalQuery = {
332+
x.join(y, RightOuter, Some("y.b".attr === 1 && "x.a".attr === 3))
333+
.where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
334+
}
335+
336+
val optimized = Optimize(originalQuery.analyze)
337+
val left = testRelation.where('a === 3).subquery('l)
338+
val right = testRelation.where('b === 2).subquery('r)
339+
val correctAnswer =
340+
left.join(right, RightOuter, Some("r.b".attr === 1)).
341+
where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
342+
343+
comparePlans(optimized, correctAnswer)
344+
}
164345

165346
test("joins: can't push down") {
166347
val x = testRelation.subquery('x)

0 commit comments

Comments
 (0)