From adef5c913fc1981bd0fc2fc2b55cd3350859d551 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Fri, 4 Feb 2022 11:17:20 -0800 Subject: [PATCH 1/2] [SPARK-36665][SQL][FOLLOWUP] Avoid Optimizing Not(InSubquery) --- .../sql/catalyst/optimizer/expressions.scala | 21 +++++--- .../optimizer/NotPropagationSuite.scala | 49 +++++++++++++++++++ 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index eda4217cd957..3503f77f9389 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -466,9 +466,18 @@ object NotPropagation extends Rule[LogicalPlan] { case _ => false } + private def hasInSubquery(x: Expression): Boolean = x match { + case _: InSubquery => true + case Not(e) => hasInSubquery(e) + case _ => false + } + def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning( _.containsPattern(NOT), ruleId) { case q: LogicalPlan => q.transformExpressionsDownWithPruning(_.containsPattern(NOT), ruleId) { + // [SPARK-36665][SPARK-38085] Do not simplify Not(InSubquery) + case e @ Equality(a, b) if hasInSubquery(a) || hasInSubquery(b) => e + // Move `Not` from one side of `EqualTo`/`EqualNullSafe` to the other side if it's beneficial. // E.g. `EqualTo(Not(a), b)` where `b = Not(c)`, it will become // `EqualTo(a, Not(b))` => `EqualTo(a, Not(Not(c)))` => `EqualTo(a, c)` @@ -483,12 +492,12 @@ object NotPropagation extends Rule[LogicalPlan] { // Push `Not` to one side of `EqualTo`/`EqualNullSafe` if it's beneficial. // E.g. Not(EqualTo(x, false)) => EqualTo(x, true) - case Not(EqualTo(a, b)) if canSimplifyNot(b) => EqualTo(a, Not(b)) - case Not(EqualTo(a, b)) if canSimplifyNot(a) => EqualTo(Not(a), b) - case Not(EqualNullSafe(a, b)) if !a.nullable && !b.nullable && canSimplifyNot(b) => - EqualNullSafe(a, Not(b)) - case Not(EqualNullSafe(a, b)) if !a.nullable && !b.nullable && canSimplifyNot(a) => - EqualNullSafe(Not(a), b) + case Not(EqualTo(a, b)) if canSimplifyNot(b) && !hasInSubquery(b) => EqualTo(a, Not(b)) + case Not(EqualTo(a, b)) if canSimplifyNot(a) && !hasInSubquery(a) => EqualTo(Not(a), b) + case Not(EqualNullSafe(a, b)) if !a.nullable && !b.nullable && canSimplifyNot(b) && + !hasInSubquery(b) => EqualNullSafe(a, Not(b)) + case Not(EqualNullSafe(a, b)) if !a.nullable && !b.nullable && canSimplifyNot(a) && + !hasInSubquery(a) => EqualNullSafe(Not(a), b) } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NotPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NotPropagationSuite.scala index d9506098b1d0..30d5dfa8b8a1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NotPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NotPropagationSuite.scala @@ -173,4 +173,53 @@ class NotPropagationSuite extends PlanTest with ExpressionEvalHelper { checkCondition(('a === 'b) =!= ('a === 'c), ('a === 'b) =!= ('a === 'c)) checkCondition(('a === 'b) =!= ('c in(1, 2, 3)), ('a === 'b) =!= ('c in(1, 2, 3))) } + + test("[SPARK-36665] Do not simplify Not(InSubquery)") { + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = Batch("AnalysisNodes", Once, EliminateSubqueryAliases) :: + Batch("Not Propagation", FixedPoint(50), NotPropagation, BooleanSimplification) :: Nil + } + + // As checkCondition() changes the exprId of sub-queries, defining a stable checker here + def shouldNotOptimize(input: Expression): Unit = { + val plan = testRelation.where(input).analyze + val actual = Optimize.execute(plan) + assert(actual == plan) + } + + // Check whether the input becomes a BinaryComparison and its one side matches the expected + def checkOneSide(input: Expression, expected: Expression): Unit = { + val plan = testRelation.where(input).analyze + val actual = Optimize.execute(plan) + val comp = actual.asInstanceOf[Filter].condition.asInstanceOf[BinaryComparison] + val side = testRelation.where(expected).analyze.asInstanceOf[Filter].condition + assert(comp.left == side || comp.right == side) + } + + val inSubquery = InSubquery(Seq('a), ListQuery(testRelation.select("a"))) + shouldNotOptimize(Not(inSubquery) === Literal(true)) + shouldNotOptimize(Literal(true) === Not(inSubquery)) + shouldNotOptimize(Not(inSubquery) <=> Literal(true)) + shouldNotOptimize(Literal(true) <=> Not(inSubquery)) + + shouldNotOptimize(Not(inSubquery) === Not('a === 'b)) + shouldNotOptimize(Not('a === 'b) === Not(inSubquery)) + shouldNotOptimize(Not(inSubquery) <=> Not('a === 'b)) + shouldNotOptimize(Not('a === 'b) <=> Not(inSubquery)) + + shouldNotOptimize(Not(Not(inSubquery) === ('a === 'b))) + shouldNotOptimize(Not(('a === 'b) === Not(inSubquery))) + shouldNotOptimize(Not(Not(inSubquery) <=> ('a === 'b))) + shouldNotOptimize(Not(('a === 'b) <=> Not(inSubquery))) + + checkOneSide(Not(Not(inSubquery) === Literal(true)), Literal(false)) + checkOneSide(Not(Literal(true) === Not(inSubquery)), Literal(false)) + shouldNotOptimize(Not(Not(inSubquery) <=> Literal(true))) + shouldNotOptimize(Not(Literal(true) <=> Not(inSubquery))) + + checkOneSide(Not(Not(inSubquery)) === Not('a === 'b), Not('a === 'b)) + checkOneSide(Not('a === 'b) === Not(Not(inSubquery)), Not('a === 'b)) + checkOneSide(Not(Not(inSubquery)) <=> Not('a === 'b), Not('a === 'b)) + checkOneSide(Not('a === 'b) <=> Not(Not(inSubquery)), Not('a === 'b)) + } } From 26da7fd657bc2fa016710553601fcab92a4d4655 Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Sat, 5 Feb 2022 00:02:56 -0800 Subject: [PATCH 2/2] address review comments --- .../optimizer/NotPropagationSuite.scala | 2 +- .../sql/NotPropagationEndToEndSuite.scala | 52 +++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/NotPropagationEndToEndSuite.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NotPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NotPropagationSuite.scala index 30d5dfa8b8a1..595e01f0261b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NotPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NotPropagationSuite.scala @@ -174,7 +174,7 @@ class NotPropagationSuite extends PlanTest with ExpressionEvalHelper { checkCondition(('a === 'b) =!= ('c in(1, 2, 3)), ('a === 'b) =!= ('c in(1, 2, 3))) } - test("[SPARK-36665] Do not simplify Not(InSubquery)") { + test("SPARK-36665: Do not simplify Not(InSubquery)") { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("AnalysisNodes", Once, EliminateSubqueryAliases) :: Batch("Not Propagation", FixedPoint(50), NotPropagation, BooleanSimplification) :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/NotPropagationEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/NotPropagationEndToEndSuite.scala new file mode 100644 index 000000000000..9e4211435444 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/NotPropagationEndToEndSuite.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.sql.test.SharedSparkSession + +class NotPropagationEndToEndSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + val t = "test_table" + + test("SPARK-36665: Do not simplify Not(InSubquery)") { + withTable(t) { + Seq[(Integer, Integer)]( + (1, 1), + (2, 2), + (3, 3), + (4, null), + (null, 0)) + .toDF("c1", "c2").write.saveAsTable(t) + val df = spark.table(t) + + checkAnswer(df.where(s"(c1 NOT IN (SELECT c2 FROM $t)) = true"), Seq.empty) + checkAnswer(df.where(s"(c1 NOT IN (SELECT c2 FROM $t WHERE c2 IS NOT NULL)) = true"), + Row(4, null) :: Nil) + checkAnswer(df.where(s"(c1 NOT IN (SELECT c2 FROM $t)) <=> true"), Seq.empty) + checkAnswer(df.where(s"(c1 NOT IN (SELECT c2 FROM $t WHERE c2 IS NOT NULL)) <=> true"), + Row(4, null) :: Nil) + checkAnswer(df.where(s"(c1 NOT IN (SELECT c2 FROM $t)) != false"), Seq.empty) + checkAnswer(df.where(s"(c1 NOT IN (SELECT c2 FROM $t WHERE c2 IS NOT NULL)) != false"), + Row(4, null) :: Nil) + checkAnswer(df.where(s"NOT((c1 NOT IN (SELECT c2 FROM $t)) <=> false)"), Seq.empty) + checkAnswer(df.where(s"NOT((c1 NOT IN (SELECT c2 FROM $t WHERE c2 IS NOT NULL)) <=> false)"), + Row(4, null) :: Nil) + } + } +}