From 37c47d9b7f41fec9ca8f298bba01e9ef415f2082 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Thu, 7 Nov 2019 19:07:03 +0800 Subject: [PATCH 1/4] Support exists in join condition --- .../sql/catalyst/optimizer/Optimizer.scala | 39 ++++++++++- .../org/apache/spark/sql/SubquerySuite.scala | 65 +++++++++++++++++++ 2 files changed, 102 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index b78bdf082f33..461f0f7e7f23 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1296,6 +1296,35 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) } + private def split(condition: Seq[Expression], + joinType: JoinType, + left: LogicalPlan, + right: LogicalPlan) = { + val (pushDownCandidates, nonDeterministic) = condition.partition(_.deterministic) + + joinType match { + case _: InnerLike | LeftSemi | RightOuter => + val (leftEvaluateCondition, rest) = + pushDownCandidates.partition(_.references.subsetOf(left.outputSet)) + val (rightEvaluateCondition, commonCondition) = + rest.partition(expr => expr.references.subsetOf(right.outputSet)) + (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) + case LeftOuter | LeftAnti | ExistenceJoin(_) => + val (rightEvaluateCondition, rest) = + pushDownCandidates.partition(_.references.subsetOf(right.outputSet)) + val (leftEvaluateCondition, commonCondition) = + rest.partition(expr => expr.references.subsetOf(left.outputSet)) + (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) + case FullOuter => + val (bothEvaluateCondition, commonCondition) = + pushDownCandidates.partition(cond => + cond.references.subsetOf(left.outputSet) && cond.references.subsetOf(right.outputSet)) + (bothEvaluateCondition, bothEvaluateCondition, commonCondition) + case NaturalJoin(_) => (null, null, null) + case UsingJoin(_, _) => (null, null, null) + } + } + def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = { @@ -1348,7 +1377,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { // push down the join filter into sub query scanning if applicable case j @ Join(left, right, joinType, joinCondition, hint) => val (leftJoinConditions, rightJoinConditions, commonJoinCondition) = - split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) + split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), joinType, left, right) joinType match { case _: InnerLike | LeftSemi => @@ -1376,7 +1405,13 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And) Join(newLeft, newRight, joinType, newJoinCond, hint) - case FullOuter => j + case FullOuter => + val newLeft = leftJoinConditions. + reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) + val newRight = rightJoinConditions. + reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) + val newJoinCond = commonJoinCondition.reduceLeftOption(And) + Join(newLeft, newRight, FullOuter, newJoinCond, hint) case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node") case UsingJoin(_, _) => sys.error("Untransformed Using join node") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index e05af08dfb74..fad7f42694fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -352,6 +352,71 @@ class SubquerySuite extends QueryTest with SharedSparkSession { } } + + test("SPARK-29769: JOIN Condition use EXISTS/NOT EXISTS") { + withTempView("s1", "s2", "s3") { + Seq(1, 3, 5, 7, 9).toDF("id").createOrReplaceTempView("s1") + Seq(1, 3, 4, 6, 9).toDF("id").createOrReplaceTempView("s2") + Seq(3, 4, 6, 9).toDF("id").createOrReplaceTempView("s3") + + checkAnswer( + sql( + """ + | SELECT s1.id FROM s1 + | JOIN s2 ON s1.id = s2.id + | AND EXISTS (SELECT * from s3 where s3.id > 6) + """.stripMargin), + Row(1) :: Row(3) :: Row(9) :: Nil) + + checkAnswer( + sql( + """ + | SELECT s1.id, s2.id as id2 FROM s1 + | RIGHT OUTER JOIN s2 ON s1.id = s2.id + | AND EXISTS (SELECT * from s3 where s3.id > 6) + """.stripMargin), + Row(1, 1) :: Row(3, 3) :: Row(null, 4) :: Row(null, 6) :: Row(9, 9) :: Nil) + + checkAnswer( + sql( + """ + | SELECT s1.id FROM s1 + | LEFT SEMI JOIN s2 ON s1.id = s2.id + | AND EXISTS (SELECT * from s3 where s3.id > 6) + """.stripMargin), + Row(1) :: Row(3) :: Row(9) :: Nil) + + checkAnswer( + sql( + """ + | SELECT s1.id FROM s1 + | LEFT ANTI JOIN s2 ON s1.id = s2.id + | AND EXISTS (SELECT * from s3 where s3.id > 6) + """.stripMargin), + Row(5) :: Row(7) :: Nil) + + + checkAnswer( + sql( + """ + | SELECT s1.id, s2.id as id2 FROM s1 + | LEFT OUTER JOIN s2 ON s1.id = s2.id + | AND EXISTS (SELECT * from s3 where s3.id > 6) + """.stripMargin), + Row(1, 1) :: Row(3, 3) :: Row(5, null) :: Row(7, null) :: Row(9, 9) :: Nil) + + + checkAnswer( + sql( + """ + | SELECT s1.id, s2.id as id2 FROM s1 + | FULL OUTER JOIN s2 ON s1.id = s2.id + | AND EXISTS (SELECT * from s3 where s3.id > 6) + """.stripMargin), + Row(1, 1) :: Row(3, 3) :: Row(5, null) :: Row(7, null) :: Row(null, 4) :: Row(null, 6) :: Row(9, 9) :: Nil) + } + } + test("SPARK-14791: scalar subquery inside broadcast join") { val df = sql("select a, sum(b) as s from l group by a having a > (select avg(a) from l)") val expected = Row(3, 2.0, 3, 3.0) :: Row(6, null, 6, null) :: Nil From 802d34de6d470081430217eefc9b12575330d03a Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 8 Nov 2019 09:01:30 +0800 Subject: [PATCH 2/4] fix scalastyle --- .../test/scala/org/apache/spark/sql/SubquerySuite.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index fad7f42694fc..f3e5f6e3149a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -394,8 +394,7 @@ class SubquerySuite extends QueryTest with SharedSparkSession { | AND EXISTS (SELECT * from s3 where s3.id > 6) """.stripMargin), Row(5) :: Row(7) :: Nil) - - + checkAnswer( sql( """ @@ -405,7 +404,6 @@ class SubquerySuite extends QueryTest with SharedSparkSession { """.stripMargin), Row(1, 1) :: Row(3, 3) :: Row(5, null) :: Row(7, null) :: Row(9, 9) :: Nil) - checkAnswer( sql( """ @@ -413,7 +411,8 @@ class SubquerySuite extends QueryTest with SharedSparkSession { | FULL OUTER JOIN s2 ON s1.id = s2.id | AND EXISTS (SELECT * from s3 where s3.id > 6) """.stripMargin), - Row(1, 1) :: Row(3, 3) :: Row(5, null) :: Row(7, null) :: Row(null, 4) :: Row(null, 6) :: Row(9, 9) :: Nil) + Row(1, 1) :: Row(3, 3) :: Row(5, null) :: Row(7, null) :: + Row(null, 4) :: Row(null, 6) :: Row(9, 9) :: Nil) } } From 9b510c499119ef3962791e501d65c89fa45de785 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 8 Nov 2019 09:33:36 +0800 Subject: [PATCH 3/4] Update SubquerySuite.scala --- .../scala/org/apache/spark/sql/SubquerySuite.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index f3e5f6e3149a..7b7a01e85484 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -378,12 +378,12 @@ class SubquerySuite extends QueryTest with SharedSparkSession { Row(1, 1) :: Row(3, 3) :: Row(null, 4) :: Row(null, 6) :: Row(9, 9) :: Nil) checkAnswer( - sql( - """ - | SELECT s1.id FROM s1 - | LEFT SEMI JOIN s2 ON s1.id = s2.id - | AND EXISTS (SELECT * from s3 where s3.id > 6) - """.stripMargin), + sql( + """ + | SELECT s1.id FROM s1 + | LEFT SEMI JOIN s2 ON s1.id = s2.id + | AND EXISTS (SELECT * from s3 where s3.id > 6) + """.stripMargin), Row(1) :: Row(3) :: Row(9) :: Nil) checkAnswer( @@ -394,7 +394,7 @@ class SubquerySuite extends QueryTest with SharedSparkSession { | AND EXISTS (SELECT * from s3 where s3.id > 6) """.stripMargin), Row(5) :: Row(7) :: Nil) - + checkAnswer( sql( """ From 17dccd0e5f9284dcc903147947b98471976a4df8 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 8 Nov 2019 11:43:56 +0800 Subject: [PATCH 4/4] remove support for full outer join --- .../sql/catalyst/optimizer/Optimizer.scala | 13 ++----- .../org/apache/spark/sql/SubquerySuite.scala | 34 ++++++++++++++++--- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 461f0f7e7f23..e1bb36a4b638 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1315,11 +1315,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { val (leftEvaluateCondition, commonCondition) = rest.partition(expr => expr.references.subsetOf(left.outputSet)) (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) - case FullOuter => - val (bothEvaluateCondition, commonCondition) = - pushDownCandidates.partition(cond => - cond.references.subsetOf(left.outputSet) && cond.references.subsetOf(right.outputSet)) - (bothEvaluateCondition, bothEvaluateCondition, commonCondition) + case FullOuter => (null, null, null) case NaturalJoin(_) => (null, null, null) case UsingJoin(_, _) => (null, null, null) } @@ -1406,12 +1402,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { Join(newLeft, newRight, joinType, newJoinCond, hint) case FullOuter => - val newLeft = leftJoinConditions. - reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) - val newRight = rightJoinConditions. - reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - Join(newLeft, newRight, FullOuter, newJoinCond, hint) + j case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node") case UsingJoin(_, _) => sys.error("Untransformed Using join node") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 7b7a01e85484..d07fadb8a109 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -377,6 +377,15 @@ class SubquerySuite extends QueryTest with SharedSparkSession { """.stripMargin), Row(1, 1) :: Row(3, 3) :: Row(null, 4) :: Row(null, 6) :: Row(9, 9) :: Nil) + checkAnswer( + sql( + """ + | SELECT s1.id, s2.id as id2 FROM s1 + | RIGHT OUTER JOIN s2 ON s1.id = s2.id + | AND NOT EXISTS (SELECT * from s3 where s3.id > 6) + """.stripMargin), + Row(null, 1) :: Row(null, 3) :: Row(null, 4) :: Row(null, 6) :: Row(null, 9) :: Nil) + checkAnswer( sql( """ @@ -386,6 +395,15 @@ class SubquerySuite extends QueryTest with SharedSparkSession { """.stripMargin), Row(1) :: Row(3) :: Row(9) :: Nil) + checkAnswer( + sql( + """ + | SELECT s1.id FROM s1 + | LEFT SEMI JOIN s2 ON s1.id = s2.id + | AND NOT EXISTS (SELECT * from s3 where s3.id > 6) + """.stripMargin), + Nil) + checkAnswer( sql( """ @@ -395,6 +413,15 @@ class SubquerySuite extends QueryTest with SharedSparkSession { """.stripMargin), Row(5) :: Row(7) :: Nil) + checkAnswer( + sql( + """ + | SELECT s1.id FROM s1 + | LEFT ANTI JOIN s2 ON s1.id = s2.id + | AND NOT EXISTS (SELECT * from s3 where s3.id > 6) + """.stripMargin), + Row(1) :: Row(3):: Row(5) :: Row(7) :: Row(9) :: Nil) + checkAnswer( sql( """ @@ -408,11 +435,10 @@ class SubquerySuite extends QueryTest with SharedSparkSession { sql( """ | SELECT s1.id, s2.id as id2 FROM s1 - | FULL OUTER JOIN s2 ON s1.id = s2.id - | AND EXISTS (SELECT * from s3 where s3.id > 6) + | LEFT OUTER JOIN s2 ON s1.id = s2.id + | AND NOT EXISTS (SELECT * from s3 where s3.id > 6) """.stripMargin), - Row(1, 1) :: Row(3, 3) :: Row(5, null) :: Row(7, null) :: - Row(null, 4) :: Row(null, 6) :: Row(9, 9) :: Nil) + Row(1, null) :: Row(3, null) :: Row(5, null) :: Row(7, null) :: Row(9, null) :: Nil) } }