Skip to content

Commit 44c6700

Browse files
Add logical to support pushdown the join filter
1 parent 0bce426 commit 44c6700

File tree

2 files changed

+233
-28
lines changed

2 files changed

+233
-28
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -260,13 +260,14 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
260260
* evaluated using only the attributes of the left or right side of a join. Other
261261
* [[catalyst.plans.logical.Filter Filter]] conditions are moved into the `condition` of the
262262
* [[catalyst.plans.logical.Join Join]].
263+
* And also Pushes down the join filter, where the `condition` can be evaluated using only the
264+
* attributes of the left or right side of sub query when applicable.
263265
*
264-
* The basic condition (JoinCondition & Filter) push down should conform to the rule:
265-
* https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior
266+
* Check https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior for more details
266267
*/
267268
object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
268269
// split the condition expression into 3 parts,
269-
// (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
270+
// (canEvaluateInLeftSide, canEvaluateInRightSide, haveToEvaluateWithBothSide)
270271
private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
271272
val (leftEvaluateCondition, rest) =
272273
condition.partition(_.references subsetOf left.outputSet)
@@ -277,53 +278,76 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
277278
}
278279

279280
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
281+
// push the where condition down into join filter
280282
case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition)) =>
281-
val splittedFilterCondition = splitConjunctivePredicates(filterCondition)
282-
val splittedJoinCondition = joinCondition.map(splitConjunctivePredicates).getOrElse(Nil)
283-
284-
// Split the predicates into those that can be evaluated on the left, right, and those that
285-
// must be evaluated after the join.
286283
val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =
287-
split(splittedFilterCondition, left, right)
288-
val (leftJoinConditions, rightJoinConditions, commonJoinCondition) =
289-
split(splittedJoinCondition, left, right)
284+
split(splitConjunctivePredicates(filterCondition), left, right)
290285

291286
joinType match {
292287
case Inner =>
293-
// Treat the Condition / Filter in the same way
294-
val newLeft = (leftFilterConditions ++ leftJoinConditions).
288+
// push down the single side `where` condition into respective sides
289+
val newLeft = leftFilterConditions.
295290
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
296-
val newRight = (rightFilterConditions ++ rightJoinConditions).
291+
val newRight = rightFilterConditions.
297292
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
298-
val newJoinCond = (commonFilterCondition ++ commonJoinCondition).reduceLeftOption(And)
293+
val newJoinCond = (commonFilterCondition ++ joinCondition).reduceLeftOption(And)
299294

300295
Join(newLeft, newRight, Inner, newJoinCond)
301296
case RightOuter =>
302-
// Push Down the Right Only Filter & Push Down the Left Only Join Condition
303-
val newLeft = leftJoinConditions.
304-
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
297+
// push down the right side only `where` condition
298+
val newLeft = left
305299
val newRight = rightFilterConditions.
306300
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
307-
val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
301+
val newJoinCond = joinCondition
308302
val newJoin = Join(newLeft, newRight, RightOuter, newJoinCond)
309303

310304
(leftFilterConditions ++ commonFilterCondition).
311305
reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
312306
case LeftOuter =>
313-
// Push Down the Left Only Filter & Push Down the Right Only Join Condition
307+
// push down the left side only `where` condition
314308
val newLeft = leftFilterConditions.
315309
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
316-
val newRight = rightJoinConditions.
317-
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
318-
val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
310+
val newRight = right
311+
val newJoinCond = joinCondition
319312
val newJoin = Join(newLeft, newRight, LeftOuter, newJoinCond)
320313

321314
(rightFilterConditions ++ commonFilterCondition).
322315
reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
323-
case FullOuter =>
324-
// DO Nothing for Full Outer Join
316+
case FullOuter => f // DO Nothing for Full Outer Join
317+
}
318+
319+
// push down the join filter into sub query scanning if applicable
320+
case f @ Join(left, right, joinType, joinCondition) =>
321+
val (leftJoinConditions, rightJoinConditions, commonJoinCondition) =
322+
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
323+
324+
joinType match {
325+
case Inner =>
326+
// push down the single side only join filter for both sides sub queries
327+
val newLeft = leftJoinConditions.
328+
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
329+
val newRight = rightJoinConditions.
330+
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
331+
val newJoinCond = commonJoinCondition.reduceLeftOption(And)
332+
333+
Join(newLeft, newRight, Inner, newJoinCond)
334+
case RightOuter =>
335+
// push down the left side only join filter for left side sub query
336+
val newLeft = leftJoinConditions.
337+
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
338+
val newRight = right
339+
val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
340+
341+
Join(newLeft, newRight, RightOuter, newJoinCond)
342+
case LeftOuter =>
343+
// push down the right side only join filter for right sub query
344+
val newLeft = left
345+
val newRight = rightJoinConditions.
346+
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
347+
val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
325348

326-
f
349+
Join(newLeft, newRight, LeftOuter, newJoinCond)
350+
case FullOuter => f
327351
}
328352
}
329353
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala

Lines changed: 183 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ package org.apache.spark.sql.catalyst.optimizer
2020
import org.apache.spark.sql.catalyst.analysis
2121
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
2222
import org.apache.spark.sql.catalyst.plans.logical._
23+
import org.apache.spark.sql.catalyst.plans.Inner
24+
import org.apache.spark.sql.catalyst.plans.FullOuter
25+
import org.apache.spark.sql.catalyst.plans.LeftOuter
26+
import org.apache.spark.sql.catalyst.plans.RightOuter
2327
import org.apache.spark.sql.catalyst.rules._
24-
25-
/* Implicit conversions */
2628
import org.apache.spark.sql.catalyst.dsl.plans._
2729
import org.apache.spark.sql.catalyst.dsl.expressions._
30+
import org.junit.Test
2831

2932
class FilterPushdownSuite extends OptimizerTest {
3033

@@ -161,6 +164,184 @@ class FilterPushdownSuite extends OptimizerTest {
161164

162165
comparePlans(optimized, correctAnswer)
163166
}
167+
168+
test("joins: push down left outer join #1") {
169+
val x = testRelation.subquery('x)
170+
val y = testRelation.subquery('y)
171+
172+
val originalQuery = {
173+
x.join(y, LeftOuter)
174+
.where("x.b".attr === 1 && "y.b".attr === 2)
175+
}
176+
177+
val optimized = Optimize(originalQuery.analyze)
178+
val left = testRelation.where('b === 1)
179+
val correctAnswer =
180+
left.join(y, LeftOuter).where("y.b".attr === 2).analyze
181+
182+
comparePlans(optimized, correctAnswer)
183+
}
184+
185+
test("joins: push down right outer join #1") {
186+
val x = testRelation.subquery('x)
187+
val y = testRelation.subquery('y)
188+
189+
val originalQuery = {
190+
x.join(y, RightOuter)
191+
.where("x.b".attr === 1 && "y.b".attr === 2)
192+
}
193+
194+
val optimized = Optimize(originalQuery.analyze)
195+
val right = testRelation.where('b === 2).subquery('d)
196+
val correctAnswer =
197+
x.join(right, RightOuter).where("x.b".attr === 1).analyze
198+
199+
comparePlans(optimized, correctAnswer)
200+
}
201+
202+
test("joins: push down left outer join #2") {
203+
val x = testRelation.subquery('x)
204+
val y = testRelation.subquery('y)
205+
206+
val originalQuery = {
207+
x.join(y, LeftOuter, Some("x.b".attr === 1))
208+
.where("x.b".attr === 2 && "y.b".attr === 2)
209+
}
210+
211+
val optimized = Optimize(originalQuery.analyze)
212+
val left = testRelation.where('b === 2).subquery('d)
213+
val correctAnswer =
214+
left.join(y, LeftOuter, Some("d.b".attr === 1)).where("y.b".attr === 2).analyze
215+
216+
comparePlans(optimized, correctAnswer)
217+
}
218+
219+
test("joins: push down right outer join #2") {
220+
val x = testRelation.subquery('x)
221+
val y = testRelation.subquery('y)
222+
223+
val originalQuery = {
224+
x.join(y, RightOuter, Some("y.b".attr === 1))
225+
.where("x.b".attr === 2 && "y.b".attr === 2)
226+
}
227+
228+
val optimized = Optimize(originalQuery.analyze)
229+
val right = testRelation.where('b === 2).subquery('d)
230+
val correctAnswer =
231+
x.join(right, RightOuter, Some("d.b".attr === 1)).where("x.b".attr === 2).analyze
232+
233+
comparePlans(optimized, correctAnswer)
234+
}
235+
236+
test("joins: push down left outer join #3") {
237+
val x = testRelation.subquery('x)
238+
val y = testRelation.subquery('y)
239+
240+
val originalQuery = {
241+
x.join(y, LeftOuter, Some("y.b".attr === 1))
242+
.where("x.b".attr === 2 && "y.b".attr === 2)
243+
}
244+
245+
val optimized = Optimize(originalQuery.analyze)
246+
val left = testRelation.where('b === 2).subquery('l)
247+
val right = testRelation.where('b === 1).subquery('r)
248+
val correctAnswer =
249+
left.join(right, LeftOuter).where("r.b".attr === 2).analyze
250+
251+
comparePlans(optimized, correctAnswer)
252+
}
253+
254+
test("joins: push down right outer join #3") {
255+
val x = testRelation.subquery('x)
256+
val y = testRelation.subquery('y)
257+
258+
val originalQuery = {
259+
x.join(y, RightOuter, Some("y.b".attr === 1))
260+
.where("x.b".attr === 2 && "y.b".attr === 2)
261+
}
262+
263+
val optimized = Optimize(originalQuery.analyze)
264+
val right = testRelation.where('b === 2).subquery('r)
265+
val correctAnswer =
266+
x.join(right, RightOuter, Some("r.b".attr === 1)).where("x.b".attr === 2).analyze
267+
268+
comparePlans(optimized, correctAnswer)
269+
}
270+
271+
test("joins: push down left outer join #4") {
272+
val x = testRelation.subquery('x)
273+
val y = testRelation.subquery('y)
274+
275+
val originalQuery = {
276+
x.join(y, LeftOuter, Some("y.b".attr === 1))
277+
.where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
278+
}
279+
280+
val optimized = Optimize(originalQuery.analyze)
281+
val left = testRelation.where('b === 2).subquery('l)
282+
val right = testRelation.where('b === 1).subquery('r)
283+
val correctAnswer =
284+
left.join(right, LeftOuter).where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
285+
286+
comparePlans(optimized, correctAnswer)
287+
}
288+
289+
test("joins: push down right outer join #4") {
290+
val x = testRelation.subquery('x)
291+
val y = testRelation.subquery('y)
292+
293+
val originalQuery = {
294+
x.join(y, RightOuter, Some("y.b".attr === 1))
295+
.where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
296+
}
297+
298+
val optimized = Optimize(originalQuery.analyze)
299+
val left = testRelation.subquery('l)
300+
val right = testRelation.where('b === 2).subquery('r)
301+
val correctAnswer =
302+
left.join(right, RightOuter, Some("r.b".attr === 1)).
303+
where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
304+
305+
comparePlans(optimized, correctAnswer)
306+
}
307+
308+
test("joins: push down left outer join #5") {
309+
val x = testRelation.subquery('x)
310+
val y = testRelation.subquery('y)
311+
312+
val originalQuery = {
313+
x.join(y, LeftOuter, Some("y.b".attr === 1 && "x.a".attr === 3))
314+
.where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
315+
}
316+
317+
val optimized = Optimize(originalQuery.analyze)
318+
val left = testRelation.where('b === 2).subquery('l)
319+
val right = testRelation.where('b === 1).subquery('r)
320+
val correctAnswer =
321+
left.join(right, LeftOuter, Some("l.a".attr===3)).
322+
where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
323+
324+
comparePlans(optimized, correctAnswer)
325+
}
326+
327+
test("joins: push down right outer join #5") {
328+
val x = testRelation.subquery('x)
329+
val y = testRelation.subquery('y)
330+
331+
val originalQuery = {
332+
x.join(y, RightOuter, Some("y.b".attr === 1 && "x.a".attr === 3))
333+
.where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
334+
}
335+
336+
val optimized = Optimize(originalQuery.analyze)
337+
val left = testRelation.where('a === 3).subquery('l)
338+
val right = testRelation.where('b === 2).subquery('r)
339+
val correctAnswer =
340+
left.join(right, RightOuter, Some("r.b".attr === 1)).
341+
where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
342+
343+
comparePlans(optimized, correctAnswer)
344+
}
164345

165346
test("joins: can't push down") {
166347
val x = testRelation.subquery('x)

0 commit comments

Comments
 (0)