From c2db998cb7bdd4d9fcf9d594c98385324e5b0aaa Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Sat, 30 Mar 2019 14:42:58 -0700 Subject: [PATCH 1/5] [SPARK-19712] Don't do partial pushdown when pushing down LeftAnti joins below Aggregate or window operators --- .../optimizer/PushDownLeftSemiAntiJoin.scala | 36 +++++++++++++------ .../LeftSemiAntiJoinPushDownSuite.scala | 32 +++++++++++++++-- 2 files changed, 56 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala index bc868df3dbb0..f2fca8fbf7ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala @@ -70,10 +70,12 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper { } // Check if the remaining predicates do not contain columns from the right - // hand side of the join. Since the remaining predicates will be kept - // as a filter over aggregate, this check is necessary after the left semi - // or left anti join is moved below aggregate. The reason is, for this kind - // of join, we only output from the left leg of the join. + // hand side of the join. In case of LeftSemi join, since remaining predicates + // will be kept as a filter over aggregate, this check is necessary after the left semi join + // is moved below aggregate. The reason is, for this kind of join, we only output from the + // left leg of the join. In case of left anti join, the join is pushed down when + // the entire join condition is eligible to be pushdown to preserve the semantics of + // left anti join. val rightOpColumns = AttributeSet(stayUp.toSet).intersect(rightOp.outputSet) if (pushDown.nonEmpty && rightOpColumns.isEmpty) { @@ -82,7 +84,14 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper { val newAgg = agg.copy(child = Join(agg.child, rightOp, joinType, Option(replaced), hint)) // If there is no more filter to stay up, just return the Aggregate over Join. // Otherwise, create "Filter(stayUp) <- Aggregate <- Join(pushDownPredicate)". - if (stayUp.isEmpty) newAgg else Filter(stayUp.reduce(And), newAgg) + if (stayUp.isEmpty) { + newAgg + } else { + joinType match { + case LeftSemi => Filter(stayUp.reduce(And), newAgg) + case _ => join + } + } } else { // The join condition is not a subset of the Aggregate's GROUP BY columns, // no push down. @@ -105,16 +114,23 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper { } // Check if the remaining predicates do not contain columns from the right - // hand side of the join. Since the remaining predicates will be kept - // as a filter over window, this check is necessary after the left semi - // or left anti join is moved below window. The reason is, for this kind - // of join, we only output from the left leg of the join. + // hand side of the join. In case of LeftSemi join, since remaining predicates + // will be kept as a filter over aggregate, this check is necessary after the left semi join + // is moved below aggregate. The reason is, for this kind of join, we only output from the + // left leg of the join. In case of left anti join, the join is pushed down when + // the entire join condition is eligible to be pushdown to preserve the semantics of + // left anti join. val rightOpColumns = AttributeSet(stayUp.toSet).intersect(rightOp.outputSet) if (pushDown.nonEmpty && rightOpColumns.isEmpty) { val predicate = pushDown.reduce(And) val newPlan = w.copy(child = Join(w.child, rightOp, joinType, Option(predicate), hint)) - if (stayUp.isEmpty) newPlan else Filter(stayUp.reduce(And), newPlan) + if (stayUp.isEmpty) newPlan else { + joinType match { + case LeftSemi => Filter(stayUp.reduce(And), newPlan) + case _ => join + } + } } else { // The join condition is not a subset of the Window's PARTITION BY clause, // no push down. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala index 1a0231ed2d99..6105771697ba 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala @@ -117,7 +117,7 @@ class LeftSemiPushdownSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } - test("Aggregate: LeftSemiAnti join partial pushdown") { + test("Aggregate: LeftSemi join partial pushdown") { val originalQuery = testRelation .groupBy('b)('b, sum('c).as('sum)) .join(testRelation1, joinType = LeftSemi, condition = Some('b === 'd && 'sum === 10)) @@ -132,6 +132,15 @@ class LeftSemiPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("Aggregate: LeftAnti join no pushdown") { + val originalQuery = testRelation + .groupBy('b)('b, sum('c).as('sum)) + .join(testRelation1, joinType = LeftAnti, condition = Some('b === 'd && 'sum === 10)) + + val optimized = Optimize.execute(originalQuery.analyze) + comparePlans(optimized, originalQuery.analyze) + } + test("LeftSemiAnti join over aggregate - no pushdown") { val originalQuery = testRelation .groupBy('b)('b, sum('c).as('sum)) @@ -174,7 +183,7 @@ class LeftSemiPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - test("Window: LeftSemiAnti partial pushdown") { + test("Window: LeftSemi partial pushdown") { // Attributes from join condition which does not refer to the window partition spec // are kept up in the plan as a Filter operator above Window. val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame)) @@ -195,6 +204,25 @@ class LeftSemiPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("Window: LeftAnti no pushdown") { + // Attributes from join condition which does not refer to the window partition spec + // are kept up in the plan as a Filter operator above Window. + val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame)) + + val originalQuery = testRelation + .select('a, 'b, 'c, winExpr.as('window)) + .join(testRelation1, joinType = LeftAnti, condition = Some('a === 'd && 'b > 5)) + + val optimized = Optimize.execute(originalQuery.analyze) + + val correctAnswer = testRelation + .select('a, 'b, 'c) + .window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil) + .join(testRelation1, joinType = LeftAnti, condition = Some('a === 'd && 'b > 5)) + .select('a, 'b, 'c, 'window).analyze + comparePlans(optimized, correctAnswer) + } + test("Union: LeftSemiAnti join pushdown") { val testRelation2 = LocalRelation('x.int, 'y.int, 'z.int) From 84a5d5daf9e4ec2ab3882d34caa4ec40e578c23a Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 1 Apr 2019 00:19:12 -0700 Subject: [PATCH 2/5] Code review --- .../optimizer/PushDownLeftSemiAntiJoin.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala index f2fca8fbf7ce..1b52823832f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala @@ -73,9 +73,7 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper { // hand side of the join. In case of LeftSemi join, since remaining predicates // will be kept as a filter over aggregate, this check is necessary after the left semi join // is moved below aggregate. The reason is, for this kind of join, we only output from the - // left leg of the join. In case of left anti join, the join is pushed down when - // the entire join condition is eligible to be pushdown to preserve the semantics of - // left anti join. + // left leg of the join. val rightOpColumns = AttributeSet(stayUp.toSet).intersect(rightOp.outputSet) if (pushDown.nonEmpty && rightOpColumns.isEmpty) { @@ -87,6 +85,8 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper { if (stayUp.isEmpty) { newAgg } else { + // In case of left anti join, the join is pushed down when the entire join condition + // is eligible to be pushed down to preserve the semantics of left anti join. joinType match { case LeftSemi => Filter(stayUp.reduce(And), newAgg) case _ => join @@ -117,15 +117,17 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper { // hand side of the join. In case of LeftSemi join, since remaining predicates // will be kept as a filter over aggregate, this check is necessary after the left semi join // is moved below aggregate. The reason is, for this kind of join, we only output from the - // left leg of the join. In case of left anti join, the join is pushed down when - // the entire join condition is eligible to be pushdown to preserve the semantics of - // left anti join. + // left leg of the join. val rightOpColumns = AttributeSet(stayUp.toSet).intersect(rightOp.outputSet) if (pushDown.nonEmpty && rightOpColumns.isEmpty) { val predicate = pushDown.reduce(And) val newPlan = w.copy(child = Join(w.child, rightOp, joinType, Option(predicate), hint)) - if (stayUp.isEmpty) newPlan else { + if (stayUp.isEmpty) { + newPlan + } else { + // In case of left anti join, the join is pushed down when the entire join condition + // is eligible to be pushed down to preserve the semantics of left anti join. joinType match { case LeftSemi => Filter(stayUp.reduce(And), newPlan) case _ => join From 80b345a10c53f21d695069ec868eed58e8d7d4d9 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 1 Apr 2019 00:50:29 -0700 Subject: [PATCH 3/5] code review --- .../optimizer/PushDownLeftSemiAntiJoin.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala index 1b52823832f1..74b3d38b0965 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala @@ -85,10 +85,12 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper { if (stayUp.isEmpty) { newAgg } else { - // In case of left anti join, the join is pushed down when the entire join condition - // is eligible to be pushed down to preserve the semantics of left anti join. joinType match { + // In case of Left semi join, the part of the join condition which does not refer to + // to child attributes of the aggregate operator are kept as a Filter over window. case LeftSemi => Filter(stayUp.reduce(And), newAgg) + // In case of left anti join, the join is pushed down when the entire join condition + // is eligible to be pushed down to preserve the semantics of left anti join. case _ => join } } @@ -115,8 +117,8 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper { // Check if the remaining predicates do not contain columns from the right // hand side of the join. In case of LeftSemi join, since remaining predicates - // will be kept as a filter over aggregate, this check is necessary after the left semi join - // is moved below aggregate. The reason is, for this kind of join, we only output from the + // will be kept as a filter over window, this check is necessary after the left semi join + // is moved below window. The reason is, for this kind of join, we only output from the // left leg of the join. val rightOpColumns = AttributeSet(stayUp.toSet).intersect(rightOp.outputSet) @@ -126,10 +128,12 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper { if (stayUp.isEmpty) { newPlan } else { - // In case of left anti join, the join is pushed down when the entire join condition - // is eligible to be pushed down to preserve the semantics of left anti join. joinType match { + // In case of Left semi join, the part of the join condition which does not refer to + // to partition attributes of the window operator are kept as a Filter over window. case LeftSemi => Filter(stayUp.reduce(And), newPlan) + // In case of left anti join, the join is pushed down when the entire join condition + // is eligible to be pushed down to preserve the semantics of left anti join. case _ => join } } From 69121b16a3e0a4bc82335e0f50a03373713591c2 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 1 Apr 2019 07:57:58 -0700 Subject: [PATCH 4/5] Code review --- .../optimizer/PushDownLeftSemiAntiJoin.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala index 74b3d38b0965..87a4975051fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala @@ -70,10 +70,10 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper { } // Check if the remaining predicates do not contain columns from the right - // hand side of the join. In case of LeftSemi join, since remaining predicates - // will be kept as a filter over aggregate, this check is necessary after the left semi join - // is moved below aggregate. The reason is, for this kind of join, we only output from the - // left leg of the join. + // hand side of the join. Since the remaining predicates will be kept + // as a filter over aggregate, this check is necessary after the left semi + // or left anti join is moved below aggregate. The reason is, for this kind + // of join, we only output from the left leg of the join. val rightOpColumns = AttributeSet(stayUp.toSet).intersect(rightOp.outputSet) if (pushDown.nonEmpty && rightOpColumns.isEmpty) { @@ -116,10 +116,10 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper { } // Check if the remaining predicates do not contain columns from the right - // hand side of the join. In case of LeftSemi join, since remaining predicates - // will be kept as a filter over window, this check is necessary after the left semi join - // is moved below window. The reason is, for this kind of join, we only output from the - // left leg of the join. + // hand side of the join. Since the remaining predicates will be kept + // as a filter over window, this check is necessary after the left semi + // or left anti join is moved below window. The reason is, for this kind + // of join, we only output from the left leg of the join. val rightOpColumns = AttributeSet(stayUp.toSet).intersect(rightOp.outputSet) if (pushDown.nonEmpty && rightOpColumns.isEmpty) { From 72a45521c72018e0165f982161a00422f7982263 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 1 Apr 2019 20:43:12 -0700 Subject: [PATCH 5/5] Code review --- .../optimizer/PushDownLeftSemiAntiJoin.scala | 9 ++++++++- .../optimizer/LeftSemiAntiJoinPushDownSuite.scala | 12 +++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala index 87a4975051fd..afe2cfa81ffe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala @@ -206,7 +206,14 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper { if (pushDown.nonEmpty && rightOpColumns.isEmpty) { val newChild = insertJoin(Option(pushDown.reduceLeft(And))) if (stayUp.nonEmpty) { - Filter(stayUp.reduceLeft(And), newChild) + join.joinType match { + // In case of Left semi join, the part of the join condition which does not refer to + // to attributes of the grandchild are kept as a Filter over window. + case LeftSemi => Filter(stayUp.reduce(And), newChild) + // In case of left anti join, the join is pushed down when the entire join condition + // is eligible to be pushed down to preserve the semantics of left anti join. + case _ => join + } } else { newChild } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala index 6105771697ba..185568d334ce 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala @@ -279,7 +279,7 @@ class LeftSemiPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - test("Unary: LeftSemiAnti join pushdown - partial pushdown") { + test("Unary: LeftSemi join pushdown - partial pushdown") { val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType)) val originalQuery = testRelationWithArrayType .generate(Explode('c_arr), alias = Some("arr"), outputNames = Seq("out_col")) @@ -295,6 +295,16 @@ class LeftSemiPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("Unary: LeftAnti join pushdown - no pushdown") { + val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType)) + val originalQuery = testRelationWithArrayType + .generate(Explode('c_arr), alias = Some("arr"), outputNames = Seq("out_col")) + .join(testRelation1, joinType = LeftAnti, condition = Some('b === 'd && 'b === 'out_col)) + + val optimized = Optimize.execute(originalQuery.analyze) + comparePlans(optimized, originalQuery.analyze) + } + test("Unary: LeftSemiAnti join pushdown - no pushdown") { val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType)) val originalQuery = testRelationWithArrayType