Skip to content

Commit f158045

Browse files
committed
[SPARK-18597][SQL] Do not push-down join conditions to the left side of a Left Anti join [BRANCH-2.0]
## What changes were proposed in this pull request? We currently push down join conditions of a Left Anti join to both sides of the join. This is similar to Inner, Left Semi and Existence (a specialized left semi) join. The problem is that this changes the semantics of the join; a left anti join filters out rows that match the join condition. This PR fixes this by only pushing down conditions to the right hand side of the join. This is similar to the behavior of left outer join. This PR is a backport of #16026 ## How was this patch tested? Added tests to `FilterPushdownSuite.scala` and created a SQLQueryTestSuite file for left anti joins with a regression test. Author: Herman van Hovell <[email protected]> Closes #16039 from hvanhovell/SPARK-18597-branch-2.0.
1 parent 759bd4a commit f158045

File tree

4 files changed

+72
-3
lines changed

4 files changed

+72
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1289,7 +1289,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
12891289
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
12901290

12911291
joinType match {
1292-
case Inner | LeftExistence(_) =>
1292+
case Inner | LeftSemi | ExistenceJoin(_) =>
12931293
// push down the single side only join filter for both sides sub queries
12941294
val newLeft = leftJoinConditions.
12951295
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -1306,14 +1306,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
13061306
val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
13071307

13081308
Join(newLeft, newRight, RightOuter, newJoinCond)
1309-
case LeftOuter =>
1309+
case LeftOuter | LeftAnti =>
13101310
// push down the right side only join filter for right sub query
13111311
val newLeft = left
13121312
val newRight = rightJoinConditions.
13131313
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
13141314
val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
13151315

1316-
Join(newLeft, newRight, LeftOuter, newJoinCond)
1316+
Join(newLeft, newRight, joinType, newJoinCond)
13171317
case FullOuter => j
13181318
case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
13191319
case UsingJoin(_, _) => sys.error("Untransformed Using join node")

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,39 @@ class FilterPushdownSuite extends PlanTest {
514514
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
515515
}
516516

517+
test("joins: push down where clause into left anti join") {
518+
val x = testRelation.subquery('x)
519+
val y = testRelation.subquery('y)
520+
val originalQuery =
521+
x.join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
522+
.where("x.a".attr > 10)
523+
.analyze
524+
val optimized = Optimize.execute(originalQuery)
525+
val correctAnswer =
526+
x.where("x.a".attr > 10)
527+
.join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
528+
.analyze
529+
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
530+
}
531+
532+
test("joins: only push down join conditions to the right of a left anti join") {
533+
val x = testRelation.subquery('x)
534+
val y = testRelation.subquery('y)
535+
val originalQuery =
536+
x.join(y,
537+
LeftAnti,
538+
Some("x.b".attr === "y.b".attr && "y.a".attr > 10 && "x.a".attr > 10)).analyze
539+
val optimized = Optimize.execute(originalQuery)
540+
val correctAnswer =
541+
x.join(
542+
y.where("y.a".attr > 10),
543+
LeftAnti,
544+
Some("x.b".attr === "y.b".attr && "x.a".attr > 10))
545+
.analyze
546+
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
547+
}
548+
549+
517550
val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
518551

519552
test("generate: predicate referenced no generated column") {
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
-- SPARK-18597: Do not push down predicates to left hand side in an anti-join
2+
CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2);
3+
CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1);
4+
5+
SELECT *
6+
FROM tbl_a
7+
LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2);
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
-- Automatically generated by SQLQueryTestSuite
2+
-- Number of queries: 3
3+
4+
5+
-- !query 0
6+
CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2)
7+
-- !query 0 schema
8+
struct<>
9+
-- !query 0 output
10+
11+
12+
13+
-- !query 1
14+
CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1)
15+
-- !query 1 schema
16+
struct<>
17+
-- !query 1 output
18+
19+
20+
21+
-- !query 2
22+
SELECT *
23+
FROM tbl_a
24+
LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2)
25+
-- !query 2 schema
26+
struct<c1:int,c2:int>
27+
-- !query 2 output
28+
2 1
29+
3 6

0 commit comments

Comments
 (0)