Skip to content

Commit 7d87244

Browse files
author
Davies Liu
committed
push filter throughout outer join
1 parent 5f37aad commit 7d87244

File tree

2 files changed

+55
-27
lines changed

2 files changed

+55
-27
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -947,14 +947,40 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
947947
(leftEvaluateCondition, rightEvaluateCondition, commonCondition)
948948
}
949949

950+
/**
 * Returns whether the expression returns null or false when all inputs are nulls.
 *
 * The check binds every attribute referenced by `e` to a slot of an empty row and evaluates
 * the bound expression once, while the plan is being optimized. A non-deterministic
 * expression is never reported as null-filtering: the single value observed here says
 * nothing about the values it would produce for actual rows at execution time, so using it
 * could wrongly turn an outer join into an inner join.
 *
 * @param e the predicate to test
 * @return true if `e` is deterministic and evaluates to null or false on the empty row
 */
private def canFilterOutNull(e: Expression): Boolean = {
  // Short-circuit before evaluating: a one-off sample of a non-deterministic predicate
  // must not drive a plan rewrite.
  e.deterministic && {
    val attributes = e.references.toSeq
    // One slot per referenced attribute; the row is left unpopulated.
    val emptyRow = new GenericInternalRow(attributes.length)
    val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
    v == null || v == false
  }
}
959+
960+
/**
 * Returns whether the join can be treated as an inner join.
 *
 * When a left (right) outer join is followed by a filter whose condition on the right (left)
 * side rejects rows in which that side is null, the outer join produces the same result as
 * an inner join, so it should be rewritten as one.
 *
 * @param joinType  the type of the join sitting below the filter
 * @param leftCond  filter conditions that only reference the left side of the join
 * @param rightCond filter conditions that only reference the right side of the join
 * @return true if the join is already inner, or is a left/right outer join whose
 *         null-supplying side is constrained by a null-rejecting condition
 */
private def isInnerJoin(
    joinType: JoinType,
    leftCond: Seq[Expression],
    rightCond: Seq[Expression]): Boolean = joinType match {
  case Inner => true
  // In a right outer join the left side supplies the nulls.
  case RightOuter => leftCond.exists(canFilterOutNull)
  // In a left outer join the right side supplies the nulls.
  case LeftOuter => rightCond.exists(canFilterOutNull)
  case _ => false
}
975+
950976
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
951977
// push the where condition down into join filter
952978
case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition)) =>
953979
val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =
954980
split(splitConjunctivePredicates(filterCondition), left, right)
955981

956982
joinType match {
957-
case Inner =>
983+
case _ if isInnerJoin(joinType, leftFilterConditions, rightFilterConditions) =>
958984
// push down the single side `where` condition into respective sides
959985
val newLeft = leftFilterConditions.
960986
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -963,6 +989,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
963989
val newJoinCond = (commonFilterCondition ++ joinCondition).reduceLeftOption(And)
964990

965991
Join(newLeft, newRight, Inner, newJoinCond)
992+
966993
case RightOuter =>
967994
// push down the right side only `where` condition
968995
val newLeft = left
@@ -973,6 +1000,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
9731000

9741001
(leftFilterConditions ++ commonFilterCondition).
9751002
reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
1003+
9761004
case _ @ (LeftOuter | LeftSemi) =>
9771005
// push down the left side only `where` condition
9781006
val newLeft = leftFilterConditions.
@@ -1080,7 +1108,7 @@ object SimplifyCaseConversionExpressions extends Rule[LogicalPlan] {
10801108
* [[org.apache.spark.sql.catalyst.analysis.DecimalPrecision]].
10811109
*/
10821110
object DecimalAggregates extends Rule[LogicalPlan] {
1083-
import Decimal.MAX_LONG_DIGITS
1111+
import org.apache.spark.sql.types.Decimal.MAX_LONG_DIGITS
10841112

10851113
/** Maximum number of decimal digits representable precisely in a Double */
10861114
private val MAX_DOUBLE_DIGITS = 15

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
2222
import org.apache.spark.sql.catalyst.dsl.expressions._
2323
import org.apache.spark.sql.catalyst.dsl.plans._
2424
import org.apache.spark.sql.catalyst.expressions._
25-
import org.apache.spark.sql.catalyst.plans.{LeftOuter, LeftSemi, PlanTest, RightOuter}
25+
import org.apache.spark.sql.catalyst.plans._
2626
import org.apache.spark.sql.catalyst.plans.logical._
2727
import org.apache.spark.sql.catalyst.rules._
2828
import org.apache.spark.sql.types.IntegerType
@@ -300,13 +300,13 @@ class FilterPushdownSuite extends PlanTest {
300300

301301
val originalQuery = {
302302
x.join(y, LeftOuter)
303-
.where("x.b".attr === 1 && "y.b".attr === 2)
303+
.where("x.b".attr === 1 && "y.b".attr.isNull)
304304
}
305305

306306
val optimized = Optimize.execute(originalQuery.analyze)
307307
val left = testRelation.where('b === 1)
308308
val correctAnswer =
309-
left.join(y, LeftOuter).where("y.b".attr === 2).analyze
309+
left.join(y, LeftOuter).where("y.b".attr.isNull).analyze
310310

311311
comparePlans(optimized, correctAnswer)
312312
}
@@ -317,13 +317,13 @@ class FilterPushdownSuite extends PlanTest {
317317

318318
val originalQuery = {
319319
x.join(y, RightOuter)
320-
.where("x.b".attr === 1 && "y.b".attr === 2)
320+
.where("x.b".attr.isNull && "y.b".attr === 2)
321321
}
322322

323323
val optimized = Optimize.execute(originalQuery.analyze)
324324
val right = testRelation.where('b === 2).subquery('d)
325325
val correctAnswer =
326-
x.join(right, RightOuter).where("x.b".attr === 1).analyze
326+
x.join(right, RightOuter).where("x.b".attr.isNull).analyze
327327

328328
comparePlans(optimized, correctAnswer)
329329
}
@@ -334,13 +334,13 @@ class FilterPushdownSuite extends PlanTest {
334334

335335
val originalQuery = {
336336
x.join(y, LeftOuter, Some("x.b".attr === 1))
337-
.where("x.b".attr === 2 && "y.b".attr === 2)
337+
.where("x.b".attr === 2 && "y.b".attr.isNull)
338338
}
339339

340340
val optimized = Optimize.execute(originalQuery.analyze)
341341
val left = testRelation.where('b === 2).subquery('d)
342342
val correctAnswer =
343-
left.join(y, LeftOuter, Some("d.b".attr === 1)).where("y.b".attr === 2).analyze
343+
left.join(y, LeftOuter, Some("d.b".attr === 1)).where("y.b".attr.isNull).analyze
344344

345345
comparePlans(optimized, correctAnswer)
346346
}
@@ -351,13 +351,13 @@ class FilterPushdownSuite extends PlanTest {
351351

352352
val originalQuery = {
353353
x.join(y, RightOuter, Some("y.b".attr === 1))
354-
.where("x.b".attr === 2 && "y.b".attr === 2)
354+
.where("x.b".attr.isNull && "y.b".attr === 2)
355355
}
356356

357357
val optimized = Optimize.execute(originalQuery.analyze)
358358
val right = testRelation.where('b === 2).subquery('d)
359359
val correctAnswer =
360-
x.join(right, RightOuter, Some("d.b".attr === 1)).where("x.b".attr === 2).analyze
360+
x.join(right, RightOuter, Some("d.b".attr === 1)).where("x.b".attr.isNull).analyze
361361

362362
comparePlans(optimized, correctAnswer)
363363
}
@@ -368,14 +368,14 @@ class FilterPushdownSuite extends PlanTest {
368368

369369
val originalQuery = {
370370
x.join(y, LeftOuter, Some("y.b".attr === 1))
371-
.where("x.b".attr === 2 && "y.b".attr === 2)
371+
.where("x.b".attr === 2 && "y.b".attr.isNull)
372372
}
373373

374374
val optimized = Optimize.execute(originalQuery.analyze)
375375
val left = testRelation.where('b === 2).subquery('l)
376376
val right = testRelation.where('b === 1).subquery('r)
377377
val correctAnswer =
378-
left.join(right, LeftOuter).where("r.b".attr === 2).analyze
378+
left.join(right, LeftOuter).where("r.b".attr.isNull).analyze
379379

380380
comparePlans(optimized, correctAnswer)
381381
}
@@ -386,13 +386,13 @@ class FilterPushdownSuite extends PlanTest {
386386

387387
val originalQuery = {
388388
x.join(y, RightOuter, Some("y.b".attr === 1))
389-
.where("x.b".attr === 2 && "y.b".attr === 2)
389+
.where("x.b".attr.isNull && "y.b".attr === 2)
390390
}
391391

392392
val optimized = Optimize.execute(originalQuery.analyze)
393393
val right = testRelation.where('b === 2).subquery('r)
394394
val correctAnswer =
395-
x.join(right, RightOuter, Some("r.b".attr === 1)).where("x.b".attr === 2).analyze
395+
x.join(right, RightOuter, Some("r.b".attr === 1)).where("x.b".attr.isNull).analyze
396396

397397
comparePlans(optimized, correctAnswer)
398398
}
@@ -403,14 +403,14 @@ class FilterPushdownSuite extends PlanTest {
403403

404404
val originalQuery = {
405405
x.join(y, LeftOuter, Some("y.b".attr === 1))
406-
.where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
406+
.where("x.b".attr === 2 && "y.b".attr.isNull && "x.c".attr === "y.c".attr)
407407
}
408408

409409
val optimized = Optimize.execute(originalQuery.analyze)
410410
val left = testRelation.where('b === 2).subquery('l)
411411
val right = testRelation.where('b === 1).subquery('r)
412412
val correctAnswer =
413-
left.join(right, LeftOuter).where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
413+
left.join(right, LeftOuter).where("r.b".attr.isNull && "l.c".attr === "r.c".attr).analyze
414414

415415
comparePlans(optimized, correctAnswer)
416416
}
@@ -421,15 +421,15 @@ class FilterPushdownSuite extends PlanTest {
421421

422422
val originalQuery = {
423423
x.join(y, RightOuter, Some("y.b".attr === 1))
424-
.where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
424+
.where("x.b".attr.isNull && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
425425
}
426426

427427
val optimized = Optimize.execute(originalQuery.analyze)
428428
val left = testRelation.subquery('l)
429429
val right = testRelation.where('b === 2).subquery('r)
430430
val correctAnswer =
431431
left.join(right, RightOuter, Some("r.b".attr === 1)).
432-
where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
432+
where("l.b".attr.isNull && "l.c".attr === "r.c".attr).analyze
433433

434434
comparePlans(optimized, correctAnswer)
435435
}
@@ -439,16 +439,16 @@ class FilterPushdownSuite extends PlanTest {
439439
val y = testRelation.subquery('y)
440440

441441
val originalQuery = {
442-
x.join(y, LeftOuter, Some("y.b".attr === 1 && "x.a".attr === 3))
442+
x.join(y, LeftOuter, Some("y.a".attr === 1 && "x.a".attr === 3))
443443
.where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
444444
}
445445

446446
val optimized = Optimize.execute(originalQuery.analyze)
447447
val left = testRelation.where('b === 2).subquery('l)
448-
val right = testRelation.where('b === 1).subquery('r)
448+
val right = testRelation.where('b === 2).subquery('r)
449449
val correctAnswer =
450-
left.join(right, LeftOuter, Some("l.a".attr===3)).
451-
where("r.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
450+
left.join(right, Inner,
451+
Some("l.c".attr === "r.c".attr && ("r.a".attr === 1 && "l.a".attr === 3))).analyze
452452

453453
comparePlans(optimized, correctAnswer)
454454
}
@@ -458,16 +458,16 @@ class FilterPushdownSuite extends PlanTest {
458458
val y = testRelation.subquery('y)
459459

460460
val originalQuery = {
461-
x.join(y, RightOuter, Some("y.b".attr === 1 && "x.a".attr === 3))
461+
x.join(y, RightOuter, Some("y.a".attr === 1 && "x.a".attr === 3))
462462
.where("x.b".attr === 2 && "y.b".attr === 2 && "x.c".attr === "y.c".attr)
463463
}
464464

465465
val optimized = Optimize.execute(originalQuery.analyze)
466-
val left = testRelation.where('a === 3).subquery('l)
466+
val left = testRelation.where('b === 2).subquery('l)
467467
val right = testRelation.where('b === 2).subquery('r)
468468
val correctAnswer =
469-
left.join(right, RightOuter, Some("r.b".attr === 1)).
470-
where("l.b".attr === 2 && "l.c".attr === "r.c".attr).analyze
469+
left.join(right, Inner,
470+
Some("l.c".attr === "r.c".attr && ("r.a".attr === 1 && "l.a".attr === 3))).analyze
471471

472472
comparePlans(optimized, correctAnswer)
473473
}

0 commit comments

Comments
 (0)