From 004a92509ca01639c0edd6bc0737fc004d551f49 Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Fri, 4 Sep 2020 13:30:12 +0300 Subject: [PATCH 1/9] Infer filters from EqualNullSafe --- .../plans/logical/QueryPlanConstraints.scala | 6 +-- .../InferFiltersFromConstraintsSuite.scala | 38 ++++++++++++++----- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala index 4c4ec000d0930..7187720ca0cd9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala @@ -65,13 +65,13 @@ trait ConstraintHelper { // IsNotNull should be constructed by `constructIsNotNullConstraints`. val predicates = constraints.filterNot(_.isInstanceOf[IsNotNull]) predicates.foreach { - case eq @ EqualTo(l: Attribute, r: Attribute) => + case eq @ Equality(l: Attribute, r: Attribute) => val candidateConstraints = predicates - eq inferredConstraints ++= replaceConstraints(candidateConstraints, l, r) inferredConstraints ++= replaceConstraints(candidateConstraints, r, l) - case eq @ EqualTo(l @ Cast(_: Attribute, _, _), r: Attribute) => + case eq @ Equality(l @ Cast(_: Attribute, _, _), r: Attribute) => inferredConstraints ++= replaceConstraints(predicates - eq, r, l) - case eq @ EqualTo(l: Attribute, r @ Cast(_: Attribute, _, _)) => + case eq @ Equality(l: Attribute, r @ Cast(_: Attribute, _, _)) => inferredConstraints ++= replaceConstraints(predicates - eq, l, r) case _ => // No inference } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index 
79bd573f1d84a..f408d6e504bb1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -29,15 +29,21 @@ import org.apache.spark.sql.types.{IntegerType, LongType} class InferFiltersFromConstraintsSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { - val batches = - Batch("InferAndPushDownFilters", FixedPoint(100), - PushPredicateThroughJoin, - PushPredicateThroughNonJoin, - InferFiltersFromConstraints, - CombineFilters, - SimplifyBinaryComparison, - BooleanSimplification, - PruneFilters) :: Nil + val operatorOptimizationRuleSet = Seq( + PushPredicateThroughJoin, + PushPredicateThroughNonJoin, + CombineFilters, + SimplifyBinaryComparison, + BooleanSimplification, + PruneFilters + ) + + val batches = Batch("Operator Optimization before Inferring Filters", FixedPoint(100), + operatorOptimizationRuleSet: _*) :: + Batch("Infer Filters", Once, + InferFiltersFromConstraints) :: + Batch("Operator Optimization after Inferring Filters", FixedPoint(100), + operatorOptimizationRuleSet: _*) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) @@ -316,4 +322,18 @@ class InferFiltersFromConstraintsSuite extends PlanTest { condition) } } + + test("SPARK-xxxxx: single inner join with EqualNullSafe condition: " + + "filter out values on either side on equi-join keys") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val originalQuery = x.join(y, + condition = Some(("x.a".attr <=> "y.a".attr) && ("x.a".attr > 5))) + .analyze + val left = x.where(IsNotNull('a) && "x.a".attr > 5) + val right = y.where(IsNotNull('a) && "y.a".attr > 5) + val correctAnswer = left.join(right, condition = Some("x.a".attr <=> "y.a".attr)).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } } From 
ec52adde00a4b151b9b11fe24faddfa602c9c45c Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Fri, 4 Sep 2020 13:50:39 +0300 Subject: [PATCH 2/9] Typo --- .../catalyst/optimizer/InferFiltersFromConstraintsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index f408d6e504bb1..acee8be70207e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -323,7 +323,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest { } } - test("SPARK-xxxxx: single inner join with EqualNullSafe condition: " + + test("SPARK-32801: single inner join with EqualNullSafe condition: " + "filter out values on either side on equi-join keys") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) From eef545d4b8501b4d37df54815adceb1f11366b8f Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Fri, 4 Sep 2020 19:46:13 +0300 Subject: [PATCH 3/9] Fix idempotence error --- .../sql/catalyst/optimizer/Optimizer.scala | 2 +- .../plans/logical/QueryPlanConstraints.scala | 28 +++++++++++-------- .../InferFiltersFromConstraintsSuite.scala | 15 ++++++++++ 3 files changed, 32 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index bcdc5cd942e35..9c62ee340451c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -882,7 +882,7 @@ object InferFiltersFromConstraints extends 
Rule[LogicalPlan] } } - private def inferFilters(plan: LogicalPlan): LogicalPlan = plan transform { + private def inferFilters(plan: LogicalPlan): LogicalPlan = plan transformUp { case filter @ Filter(condition, child) => val newFilters = filter.constraints -- (child.constraints ++ splitConjunctivePredicates(condition)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala index 7187720ca0cd9..dd9d20ba10462 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala @@ -62,18 +62,22 @@ trait ConstraintHelper { */ def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = { var inferredConstraints = Set.empty[Expression] - // IsNotNull should be constructed by `constructIsNotNullConstraints`. - val predicates = constraints.filterNot(_.isInstanceOf[IsNotNull]) - predicates.foreach { - case eq @ Equality(l: Attribute, r: Attribute) => - val candidateConstraints = predicates - eq - inferredConstraints ++= replaceConstraints(candidateConstraints, l, r) - inferredConstraints ++= replaceConstraints(candidateConstraints, r, l) - case eq @ Equality(l @ Cast(_: Attribute, _, _), r: Attribute) => - inferredConstraints ++= replaceConstraints(predicates - eq, r, l) - case eq @ Equality(l: Attribute, r @ Cast(_: Attribute, _, _)) => - inferredConstraints ++= replaceConstraints(predicates - eq, l, r) - case _ => // No inference + var prevSize = -1 + while (inferredConstraints.size > prevSize) { + prevSize = inferredConstraints.size + // IsNotNull should be constructed by `constructIsNotNullConstraints`. 
+ val predicates = (constraints ++ inferredConstraints).filterNot(_.isInstanceOf[IsNotNull]) + predicates.foreach { + case eq @ Equality(l: Attribute, r: Attribute) => + val candidateConstraints = predicates - eq + inferredConstraints ++= replaceConstraints(candidateConstraints, l, r) + inferredConstraints ++= replaceConstraints(candidateConstraints, r, l) + case eq @ Equality(l @ Cast(_: Attribute, _, _), r: Attribute) => + inferredConstraints ++= replaceConstraints(predicates - eq, r, l) + case eq @ Equality(l: Attribute, r @ Cast(_: Attribute, _, _)) => + inferredConstraints ++= replaceConstraints(predicates - eq, l, r) + case _ => // No inference + } } inferredConstraints -- constraints } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index acee8be70207e..4bcf3593c5ecb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -336,4 +336,19 @@ class InferFiltersFromConstraintsSuite extends PlanTest { val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) } + + test("SPARK-32801: Infer all constraints from a chain of filters") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val originalQuery = x.where("x.a".attr === "x.b".attr).join(y, + condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)) + .analyze + val left = x.where(IsNotNull('a) && IsNotNull('b) && "x.a".attr === "x.b".attr) + val right = y.where(IsNotNull('a) && IsNotNull('b) && "y.a".attr === "y.b".attr) + val correctAnswer = left.join(right, + condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)) + .analyze + val optimized = 
Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } } From d2ed6839cb58b3103b6bbafaa71483f459479d6a Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Sat, 5 Sep 2020 02:11:41 +0300 Subject: [PATCH 4/9] Add constraints to left semi join --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../sql/catalyst/plans/logical/basicLogicalOperators.scala | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 209bb5aa2e6b3..296fe86e834e5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -882,7 +882,7 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] } } - private def inferFilters(plan: LogicalPlan): LogicalPlan = plan transformUp { + private def inferFilters(plan: LogicalPlan): LogicalPlan = plan transform { case filter @ Filter(condition, child) => val newFilters = filter.constraints -- (child.constraints ++ splitConjunctivePredicates(condition)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 223ef652d2f80..b9031a083b290 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -354,6 +354,7 @@ case class Join( .union(ExpressionSet(splitConjunctivePredicates(condition.get))) case LeftSemi if condition.isDefined => left.constraints + .union(right.constraints) .union(ExpressionSet(splitConjunctivePredicates(condition.get))) case j: ExistenceJoin => left.constraints 
From 04db815365b9450a36f0ea288af529c921a8c6b0 Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Sat, 5 Sep 2020 08:13:20 +0300 Subject: [PATCH 5/9] Exclude non-deterministic filters --- .../sql/catalyst/plans/logical/QueryPlanConstraints.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala index 695d6710de82a..8919d19fc972c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala @@ -63,8 +63,11 @@ trait ConstraintHelper { var prevSize = -1 while (inferredConstraints.size > prevSize) { prevSize = inferredConstraints.size - // IsNotNull should be constructed by `constructIsNotNullConstraints`. - val predicates = (constraints ++ inferredConstraints).filterNot(_.isInstanceOf[IsNotNull]) + val predicates = (constraints ++ inferredConstraints) + // IsNotNull should be constructed by `constructIsNotNullConstraints`. 
+ .filterNot(_.isInstanceOf[IsNotNull]) + // Non-deterministic expressions are never equal to one another, so inferring across them could cause OOM + .filter(_.deterministic) predicates.foreach { case eq @ Equality(l: Attribute, r: Attribute) => val candidateConstraints = predicates - eq From 198d15d5fe3b12eaf34191a435153df83752e8ff Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Sat, 5 Sep 2020 10:49:03 +0300 Subject: [PATCH 6/9] UT for left semi join --- .../InferFiltersFromConstraintsSuite.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index 4bcf3593c5ecb..d69f0c17a2167 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -351,4 +351,23 @@ class InferFiltersFromConstraintsSuite extends PlanTest { val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) } + + test("SPARK-32801: Infer from right side of left semi join") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) val z = testRelation.subquery('z) val originalQuery = x.join(y.join(z.where("z.a".attr > 1), condition = Some("y.a".attr === "z.a".attr), joinType = LeftSemi), condition = Some("x.a".attr === "y.a".attr)) .analyze val correctX = x.where(IsNotNull('a) && "x.a".attr > 1) val correctY = y.where(IsNotNull('a) && "y.a".attr > 1) val correctZ = z.where(IsNotNull('a) && "z.a".attr > 1) val correctAnswer = correctX.join(correctY.join(correctZ, condition = Some("y.a".attr === "z.a".attr), joinType = LeftSemi), condition = Some("x.a".attr === "y.a".attr)) .analyze val optimized = Optimize.execute(originalQuery) comparePlans(optimized,
correctAnswer) + } } From c9d3dc1ff0339bd7326a30cef0f90f6e6d188d2a Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Sat, 5 Sep 2020 16:35:44 +0300 Subject: [PATCH 7/9] scalafmt --- .../InferFiltersFromConstraintsSuite.scala | 68 +++++++++++-------- 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index d69f0c17a2167..dbf5817fddb70 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -35,14 +35,16 @@ class InferFiltersFromConstraintsSuite extends PlanTest { CombineFilters, SimplifyBinaryComparison, BooleanSimplification, - PruneFilters - ) - - val batches = Batch("Operator Optimization before Inferring Filters", FixedPoint(100), - operatorOptimizationRuleSet: _*) :: - Batch("Infer Filters", Once, - InferFiltersFromConstraints) :: - Batch("Operator Optimization after Inferring Filters", FixedPoint(100), + PruneFilters) + + val batches = Batch( + "Operator Optimization before Inferring Filters", + FixedPoint(100), + operatorOptimizationRuleSet: _*) :: + Batch("Infer Filters", Once, InferFiltersFromConstraints) :: + Batch( + "Operator Optimization after Inferring Filters", + FixedPoint(100), operatorOptimizationRuleSet: _*) :: Nil } @@ -323,30 +325,31 @@ class InferFiltersFromConstraintsSuite extends PlanTest { } } - test("SPARK-32801: single inner join with EqualNullSafe condition: " + - "filter out values on either side on equi-join keys") { - val x = testRelation.subquery('x) - val y = testRelation.subquery('y) - val originalQuery = x.join(y, - condition = Some(("x.a".attr <=> "y.a".attr) && ("x.a".attr > 5))) - .analyze - val left = x.where(IsNotNull('a) && 
"x.a".attr > 5) - val right = y.where(IsNotNull('a) && "y.a".attr > 5) - val correctAnswer = left.join(right, condition = Some("x.a".attr <=> "y.a".attr)).analyze - val optimized = Optimize.execute(originalQuery) - comparePlans(optimized, correctAnswer) + test( + "SPARK-32801: single inner join with EqualNullSafe condition: " + + "filter out values on either side on equi-join keys") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val originalQuery = + x.join(y, condition = Some(("x.a".attr <=> "y.a".attr) && ("x.a".attr > 5))).analyze + val left = x.where(IsNotNull('a) && "x.a".attr > 5) + val right = y.where(IsNotNull('a) && "y.a".attr > 5) + val correctAnswer = left.join(right, condition = Some("x.a".attr <=> "y.a".attr)).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) } test("SPARK-32801: Infer all constraints from a chain of filters") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) - val originalQuery = x.where("x.a".attr === "x.b".attr).join(y, - condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)) + val originalQuery = x + .where("x.a".attr === "x.b".attr) + .join(y, condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)) .analyze val left = x.where(IsNotNull('a) && IsNotNull('b) && "x.a".attr === "x.b".attr) val right = y.where(IsNotNull('a) && IsNotNull('b) && "y.a".attr === "y.b".attr) - val correctAnswer = left.join(right, - condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)) + val correctAnswer = left + .join(right, condition = Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr)) .analyze val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) @@ -356,16 +359,21 @@ class InferFiltersFromConstraintsSuite extends PlanTest { val x = testRelation.subquery('x) val y = testRelation.subquery('y) val z = testRelation.subquery('z) - val originalQuery = 
x.join(y.join(z.where("z.a".attr > 1), - condition = Some("y.a".attr === "z.a".attr), joinType = LeftSemi), - condition = Some("x.a".attr === "y.a".attr)) + val originalQuery = x + .join( + y.join( + z.where("z.a".attr > 1), + condition = Some("y.a".attr === "z.a".attr), + joinType = LeftSemi), + condition = Some("x.a".attr === "y.a".attr)) .analyze val correctX = x.where(IsNotNull('a) && "x.a".attr > 1) val correctY = y.where(IsNotNull('a) && "y.a".attr > 1) val correctZ = z.where(IsNotNull('a) && "z.a".attr > 1) - val correctAnswer = correctX.join(correctY.join(correctZ, - condition = Some("y.a".attr === "z.a".attr), joinType = LeftSemi), - condition = Some("x.a".attr === "y.a".attr)) + val correctAnswer = correctX + .join( + correctY.join(correctZ, condition = Some("y.a".attr === "z.a".attr), joinType = LeftSemi), + condition = Some("x.a".attr === "y.a".attr)) .analyze val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) From 0148cd10bcf50cc417c865b0295363e62c11c3fc Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sat, 5 Sep 2020 23:54:14 +0300 Subject: [PATCH 8/9] Simplify optimizer --- .../optimizer/InferFiltersFromConstraintsSuite.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index dbf5817fddb70..3fc0cc9d1b42e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -30,11 +30,9 @@ class InferFiltersFromConstraintsSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val operatorOptimizationRuleSet = Seq( - PushPredicateThroughJoin, - PushPredicateThroughNonJoin, - 
CombineFilters, - SimplifyBinaryComparison, + PushDownPredicates, BooleanSimplification, + SimplifyBinaryComparison, PruneFilters) val batches = Batch( From 896be089e239ccd2d5232e66b3dc6d1a931765e3 Mon Sep 17 00:00:00 2001 From: "tanel.kiis@gmail.com" Date: Sun, 6 Sep 2020 18:39:53 +0300 Subject: [PATCH 9/9] UT for non-deterministic filters --- .../InferFiltersFromConstraintsSuite.scala | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index 3fc0cc9d1b42e..4265d34074e9c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -324,7 +324,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest { } test( - "SPARK-32801: single inner join with EqualNullSafe condition: " + + "SPARK-32801: Single inner join with EqualNullSafe condition: " + "filter out values on either side on equi-join keys") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) @@ -376,4 +376,21 @@ class InferFiltersFromConstraintsSuite extends PlanTest { val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) } + + test("SPARK-32801: Non-deterministic filters do not introduce an infinite loop") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val originalQuery = x + .join(y, condition = Some("x.a".attr === "y.a".attr)) + .where(rand(0) === "x.a".attr) + .analyze + val left = x.where(IsNotNull('a)) + val right = y.where(IsNotNull('a)) + val correctAnswer = left + .join(right, condition = Some("x.a".attr === "y.a".attr)) + .where(rand(0) === "x.a".attr) + .analyze + val optimized = 
Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } }