From 0afcdd3cde2a22f54df9840c21e2fecdb56dd9ea Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Fri, 14 Aug 2015 01:48:11 +0800 Subject: [PATCH 1/5] CNF transformation --- .../sql/catalyst/expressions/predicates.scala | 24 ++++ .../sql/catalyst/planning/patterns.scala | 3 +- .../expressions/CNFNormalizationSuite.scala | 120 ++++++++++++++++++ 3 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 304b438c84ba..9a1788e8a667 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -57,6 +57,30 @@ trait PredicateHelper { } } + /** + * Convert an expression into its conjunctive normal form (CNF), i.e. AND of ORs. + * For example, a && b || c is normalized to (a || c) && (b || c) by this method. + * + * Refer to https://en.wikipedia.org/wiki/Conjunctive_normal_form for more information + */ + protected def cnfNormalization(condition: Expression): Expression = { + condition.transformUp { + case or: Or => pushOrToBottom(or) + } + } + + private def pushOrToBottom(condition: Expression): Expression = { + condition match { + case Or(left, right) => (left, right) match { + case (And(ll, lr), r) => + And(pushOrToBottom(Or(ll, r)), pushOrToBottom(Or(lr, r))) + case (l, And(rl, rr)) => + And(pushOrToBottom(Or(l, rl)), pushOrToBottom(Or(l, rr))) + case (l, r) => Or(l, r) + } + } + } + protected def splitDisjunctivePredicates(condition: Expression): Seq[Expression] = { condition match { case Or(cond1, cond2) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index cd3f15cbe107..c884e648f915 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -62,7 +62,8 @@ object PhysicalOperation extends PredicateHelper { case Filter(condition, child) if condition.deterministic => val (fields, filters, other, aliases) = collectProjectsAndFilters(child) val substitutedCondition = substitute(aliases)(condition) - (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases) + (fields, filters ++ splitConjunctivePredicates(cnfNormalization(substitutedCondition)), + other, aliases) case other => (None, Nil, other, Map.empty) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala new file mode 100644 index 000000000000..3f16bfcf8012 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.dsl.expressions._ + +class CNFNormalizationSuite extends SparkFunSuite with PredicateHelper { + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.int, 'e.int) + + // Take in expression without And, normalize to its leftmost representation to be comparable + private def normalizeOrExpression(expression: Expression): Expression = { + val atoms = ArrayBuffer.empty[Expression] + expression.foreachUp { + case Or(l: Or, r: Or) => // do nothing + case Or(l, r: Or) => atoms += l + case Or(l: Or, r) => atoms += r + case Or(l, r) => atoms += l; atoms += r + case _ => // do nothing + } + if (!expression.isInstanceOf[Or]) { + atoms += expression + } + atoms.sortBy(_.toString).reduce(Or) + } + + private def checkCondition(input: Expression, expected: Expression): Unit = { + val actual = testRelation.where(input).analyze + val correctAnswer = testRelation.where(expected).analyze + + val resultFilterExpression = actual.collectFirst { case f: Filter => f.condition }.get + val expectedFilterExpression = correctAnswer.collectFirst { case f: Filter => f.condition }.get + + val exprs = splitConjunctivePredicates(cnfNormalization(resultFilterExpression)). + map(normalizeOrExpression).sortBy(_.toString) + val expectedExprs = splitConjunctivePredicates(cnfNormalization(expectedFilterExpression)). + map(normalizeOrExpression).sortBy(_.toString) + + assert(exprs === expectedExprs) + } + + private val a = Literal(1) < 'a + private val b = Literal(1) < 'b + private val c = Literal(1) < 'c + private val d = Literal(1) < 'd + private val e = Literal(1) < 'e + private val f = ! a + + test("a || b => a || b") { + checkCondition(a || b, a || b) + } + + test("a && b && c => a && b && c") { + checkCondition(a && b && c, a && b && c) + } + + test("a && b || c => (a || c) && (b || c)") { + checkCondition(a && b || c, (a || c) && (b || c)) + } + + test("a && b || f => (a || f) && (b || f)") { + checkCondition(a && b || f, (a || f) && (b || f)) + } + + test("(a && b) || (c && d) => (c || a) && (c || b) && ((d || a) && (d || b))") { + checkCondition((a && b) || (c && d), (a || c) && (b || c) && (a || d) && (b || d)) + } + + test("a || b || c && d => (a || b || c) && (a || b || d)") { + checkCondition(a || b || c && d, (a || b || c) && (a || b || d)) + } + + test("a || (b && c || d) => (a || b || d) && (a || c || d)") { + checkCondition(a || (b && c || d), (a || b || d) && (a || c || d)) + } + + test("a && (b && c || d && e) => a && (b || d) && (c || d) && (b || e) && (c || e)") { + val input = a && (b && c || d && e) + val expected = a && (b || d) && (c || d) && (b || e) && (c || e) + checkCondition(input, expected) + } + + test( + "a || (b && c || d && e) => (a || b || d) && (a || c || d) && (a || b || e) && (a || c || e)") { + val input = a || (b && c || d && e) + val expected = (a || b || d) && (a || c || d) && (a || b || e) && (a || c || e) + checkCondition(input, expected) + } + + test( + "a && b && c || d && e && f => " + + "(a || d) && (a || e) && (a || f) && (b || d) && " + + "(b || e) && (b || f) && (c || d) && (c || e) && (c || f)") { + val input = a && b && c || d && e && f + val expected = (a || d) && (a || e) && (a || f) && + (b || d) && (b || e) && (b || f) && + (c || d) && (c || e) && (c || f) + checkCondition(input, expected) + } +} From 322f58a0ccfe600fb7abcb35e9bdfa9958cfc4c4 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sat, 15 Aug 2015 02:01:59 +0800 Subject: [PATCH 2/5] address comments --- .../sql/catalyst/expressions/predicates.scala | 23 +++---- .../sql/catalyst/optimizer/Optimizer.scala | 4 ++ .../expressions/CNFNormalizationSuite.scala | 68 +++++++++++++++---- 3 files changed, 69 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 9a1788e8a667..7f6fea090cd6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -64,20 +64,19 @@ trait PredicateHelper { * Refer to https://en.wikipedia.org/wiki/Conjunctive_normal_form for more information */ protected def cnfNormalization(condition: Expression): Expression = { - condition.transformUp { - case or: Or => pushOrToBottom(or) + // reverse Or with its And child to eliminate (And under Or) occurrence + def normalize(condition: Expression): Expression = { + condition match { + case Or(And(innerLhs, innerRhs), rhs) => + And(normalize(Or(innerLhs, rhs)), normalize(Or(innerRhs, rhs))) + case Or(lhs, And(innerLhs, innerRhs)) => + And(normalize(Or(lhs, innerLhs)), normalize(Or(lhs, innerRhs))) + case _ => condition + } } - } - private def pushOrToBottom(condition: Expression): Expression = { - condition match { - case Or(left, right) => (left, right) match { - case (And(ll, lr), r) => - And(pushOrToBottom(Or(ll, r)), pushOrToBottom(Or(lr, r))) - case (l, And(rl, rr)) => - And(pushOrToBottom(Or(l, rl)), pushOrToBottom(Or(l, rr))) - case (l, r) => Or(l, r) - } + condition.transformUp { + case or: Or => normalize(or) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index f6088695a927..9b34d1bd487c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -573,6 +573,10 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper { case And(l, r) => Or(Not(l), Not(r)) // not(not(e)) => e case Not(e) => e + // De Morgan's law: !(a || b) => !a && !b + case Or(lhs, rhs) => And(Not(lhs), Not(rhs)) + // De Morgan's law: !(a && b) => !a || !b + case And(lhs, rhs) => Or(Not(lhs), Not(rhs)) case _ => not } // end of Not(exp) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala index 3f16bfcf8012..4f25b4e9f211 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala @@ -20,41 +20,54 @@ package org.apache.spark.sql.catalyst.expressions import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.optimizer._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.RuleExecutor class CNFNormalizationSuite extends SparkFunSuite with PredicateHelper { + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("AnalysisNodes", Once, + EliminateSubQueries) :: + Batch("Constant Folding", FixedPoint(50), + NullPropagation, + ConstantFolding, + BooleanSimplification, + SimplifyFilters) :: Nil + } + val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.int, 'e.int) // Take in expression without And, normalize to its leftmost representation to be comparable private def normalizeOrExpression(expression: Expression): Expression = { - val atoms = ArrayBuffer.empty[Expression] + val elements = ArrayBuffer.empty[Expression] expression.foreachUp { - case Or(l: Or, r: Or) => // do nothing - case Or(l, r: Or) => atoms += l - case Or(l: Or, r) => atoms += r - case Or(l, r) => atoms += l; atoms += r + case Or(lhs, rhs) => + if (!lhs.isInstanceOf[Or]) elements += lhs + if (!rhs.isInstanceOf[Or]) elements += rhs case _ => // do nothing } if (!expression.isInstanceOf[Or]) { - atoms += expression + elements += expression } - atoms.sortBy(_.toString).reduce(Or) + elements.sortBy(_.toString).reduce(Or) } private def checkCondition(input: Expression, expected: Expression): Unit = { - val actual = testRelation.where(input).analyze - val correctAnswer = testRelation.where(expected).analyze + val actual = Optimize.execute(testRelation.where(input).analyze) + val correctAnswer = Optimize.execute(testRelation.where(expected).analyze) val resultFilterExpression = actual.collectFirst { case f: Filter => f.condition }.get val expectedFilterExpression = correctAnswer.collectFirst { case f: Filter => f.condition }.get - val exprs = splitConjunctivePredicates(cnfNormalization(resultFilterExpression)). - map(normalizeOrExpression).sortBy(_.toString) - val expectedExprs = splitConjunctivePredicates(cnfNormalization(expectedFilterExpression)). - map(normalizeOrExpression).sortBy(_.toString) + val exprs = splitConjunctivePredicates(cnfNormalization(resultFilterExpression)) + .map(normalizeOrExpression).sortBy(_.toString) + val expectedExprs = splitConjunctivePredicates(cnfNormalization(expectedFilterExpression)) + .map(normalizeOrExpression).sortBy(_.toString) assert(exprs === expectedExprs) } @@ -74,6 +87,10 @@ class CNFNormalizationSuite extends SparkFunSuite with PredicateHelper { checkCondition(a && b && c, a && b && c) } + test("a && !(b || c) => a && !b && !c") { + checkCondition(a && !(b || c), a && !b && !c) + } + test("a && b || c => (a || c) && (b || c)") { checkCondition(a && b || c, (a || c) && (b || c)) } @@ -86,6 +103,10 @@ class CNFNormalizationSuite extends SparkFunSuite with PredicateHelper { checkCondition((a && b) || (c && d), (a || c) && (b || c) && (a || d) && (b || d)) } + test("(a && b) || !(c && d) => (a || !c || !d) && (b || !c || !d)") { + checkCondition((a && b) || !(c && d), (a || !c || !d) && (b || !c || !d)) + } + test("a || b || c && d => (a || b || c) && (a || b || d)") { checkCondition(a || b || c && d, (a || b || c) && (a || b || d)) } @@ -94,12 +115,20 @@ class CNFNormalizationSuite extends SparkFunSuite with PredicateHelper { checkCondition(a || (b && c || d), (a || b || d) && (a || c || d)) } + test("a || !(b && c || d) => (a || !b || !c) && (a || !d)") { + checkCondition(a || !(b && c || d), (a || !b || !c) && (a || !d)) + } + test("a && (b && c || d && e) => a && (b || d) && (c || d) && (b || e) && (c || e)") { val input = a && (b && c || d && e) val expected = a && (b || d) && (c || d) && (b || e) && (c || e) checkCondition(input, expected) } + test("a && !(b && c || d && e) => a && (!b || !c) && (!d || !e)") { + checkCondition(a && !(b && c || d && e), a && (!b || !c) && (!d || !e)) + } + test( "a || (b && c || d && e) => (a || b || d) && (a || c || d) && (a || b || e) && (a || c || e)") { val input = a || (b && c || d && e) @@ -107,6 +136,17 @@ class CNFNormalizationSuite extends SparkFunSuite with PredicateHelper { checkCondition(input, expected) } + test( + "a || !(b && c || d && e) => (a || !b || !c) && (a || !d || !e)") { + checkCondition(a || !(b && c || d && e), (a || !b || !c) && (a || !d || !e)) + } + + test("a && b && c || !(d && e) => (a || !d || !e) && (b || !d || !e) && (c || !d || !e)") { + val input = a && b && c || !(d && e) + val expected = (a || !d || !e) && (b || !d || !e) && (c || !d || !e) + checkCondition(input, expected) + } + test( "a && b && c || d && e && f => " + "(a || d) && (a || e) && (a || f) && (b || d) && " + From 125e97be4abb223d1f26629bc89f8c5436bf50d7 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Mon, 14 Sep 2015 19:08:27 +0800 Subject: [PATCH 3/5] merge apache-spark/master --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9b34d1bd487c..f6088695a927 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -573,10 +573,6 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper { case And(l, r) => Or(Not(l), Not(r)) // not(not(e)) => e case Not(e) => e - // De Morgan's law: !(a || b) => !a && !b - case Or(lhs, rhs) => And(Not(lhs), Not(rhs)) - // De Morgan's law: !(a && b) => !a || !b - case And(lhs, rhs) => Or(Not(lhs), Not(rhs)) case _ => not } // end of Not(exp) From 7eebf6d058d0cd9eda271d98380c5ad72e6b334b Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Mon, 21 Dec 2015 18:21:35 +0800 Subject: [PATCH 4/5] make CNFNormalizaiton as a general optimization --- .../sql/catalyst/expressions/predicates.scala | 23 ------------ .../sql/catalyst/optimizer/Optimizer.scala | 37 +++++++++++++++---- .../sql/catalyst/planning/patterns.scala | 3 +- .../expressions/CNFNormalizationSuite.scala | 5 ++- .../BooleanSimplificationSuite.scala | 15 -------- 5 files changed, 33 insertions(+), 50 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 7f6fea090cd6..304b438c84ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -57,29 +57,6 @@ trait PredicateHelper { } } - /** - * Convert an expression into its conjunctive normal form (CNF), i.e. AND of ORs. - * For example, a && b || c is normalized to (a || c) && (b || c) by this method. - * - * Refer to https://en.wikipedia.org/wiki/Conjunctive_normal_form for more information - */ - protected def cnfNormalization(condition: Expression): Expression = { - // reverse Or with its And child to eliminate (And under Or) occurrence - def normalize(condition: Expression): Expression = { - condition match { - case Or(And(innerLhs, innerRhs), rhs) => - And(normalize(Or(innerLhs, rhs)), normalize(Or(innerRhs, rhs))) - case Or(lhs, And(innerLhs, innerRhs)) => - And(normalize(Or(lhs, innerLhs)), normalize(Or(lhs, innerRhs))) - case _ => condition - } - } - - condition.transformUp { - case or: Or => normalize(or) - } - } - protected def splitDisjunctivePredicates(condition: Expression): Seq[Expression] = { condition match { case Or(cond1, cond2) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index f6088695a927..acd02e7de063 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -58,6 +58,7 @@ object DefaultOptimizer extends Optimizer { ConstantFolding, LikeSimplification, BooleanSimplification, + CNFNormalization, RemoveDispensableExpressions, SimplifyFilters, SimplifyCasts, @@ -463,6 +464,27 @@ object OptimizeIn extends Rule[LogicalPlan] { } } +/** + * Convert an expression into its conjunctive normal form (CNF), i.e. AND of ORs. + * For example, a && b || c is normalized to (a || c) && (b || c) by this method. + * + * Refer to https://en.wikipedia.org/wiki/Conjunctive_normal_form for more information + */ +object CNFNormalization extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case q: LogicalPlan => q transformExpressionsUp { + case or @ Or(left, right) => (left, right) match { + // reverse Or with its And child to eliminate (And under Or) occurrence + case (And(innerLhs, innerRhs), rhs) => + And(Or(innerLhs, rhs), Or(innerRhs, rhs)) + case (lhs, And(innerLhs, innerRhs)) => + And(Or(lhs, innerLhs), Or(lhs, innerRhs)) + case _ => or + } + } + } +} + /** * Simplifies boolean expressions: * 1. Simplifies expressions whose answer can be determined without evaluating both sides. @@ -489,13 +511,13 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper { case (l, Or(r, l1)) if (Not(l) == l1) => And(l, r) case (Or(l, l1), r) if (l1 == Not(r)) => And(l, r) case (Or(l1, l), r) if (l1 == Not(r)) => And(l, r) - // (a || b) && (a || c) => a || (b && c) + // (a || b) && (a || b || c) => a || b case _ => // 1. Split left and right to get the disjunctive predicates, - // i.e. lhs = (a, b), rhs = (a, c) - // 2. Find the common predict between lhsSet and rhsSet, i.e. common = (a) - // 3. Remove common predict from lhsSet and rhsSet, i.e. ldiff = (b), rdiff = (c) - // 4. Apply the formula, get the optimized predicate: common || (ldiff && rdiff) + // i.e. lhs = (a, b), rhs = (a, b, c) + // 2. Find the common predict between lhsSet and rhsSet, i.e. common = (a, b) + // 3. If lhsSet or rhsSet contains only the common part, i.e. ldiff = () + // apply the formula, get the optimized predicate: common val lhs = splitDisjunctivePredicates(left) val rhs = splitDisjunctivePredicates(right) val common = lhs.filter(e => rhs.exists(e.semanticEquals(_))) @@ -509,9 +531,8 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper { // (a || b || c || ...) && (a || b) => (a || b) common.reduce(Or) } else { - // (a || b || c || ...) && (a || b || d || ...) => - // ((c || ...) && (d || ...)) || a || b - (common :+ And(ldiff.reduce(Or), rdiff.reduce(Or))).reduce(Or) + // both hand sides contain remaining parts, we cannot do simplification here + and } } } // end of And(left, right) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index c884e648f915..cd3f15cbe107 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -62,8 +62,7 @@ object PhysicalOperation extends PredicateHelper { case Filter(condition, child) if condition.deterministic => val (fields, filters, other, aliases) = collectProjectsAndFilters(child) val substitutedCondition = substitute(aliases)(condition) - (fields, filters ++ splitConjunctivePredicates(cnfNormalization(substitutedCondition)), - other, aliases) + (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases) case other => (None, Nil, other, Map.empty) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala index 4f25b4e9f211..caab60c39228 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala @@ -37,6 +37,7 @@ class CNFNormalizationSuite extends SparkFunSuite with PredicateHelper { NullPropagation, ConstantFolding, BooleanSimplification, + CNFNormalization, SimplifyFilters) :: Nil } @@ -64,9 +65,9 @@ class CNFNormalizationSuite extends SparkFunSuite with PredicateHelper { val resultFilterExpression = actual.collectFirst { case f: Filter => f.condition }.get val expectedFilterExpression = correctAnswer.collectFirst { case f: Filter => f.condition }.get - val exprs = splitConjunctivePredicates(cnfNormalization(resultFilterExpression)) + val exprs = splitConjunctivePredicates(resultFilterExpression) .map(normalizeOrExpression).sortBy(_.toString) - val expectedExprs = splitConjunctivePredicates(cnfNormalization(expectedFilterExpression)) + val expectedExprs = splitConjunctivePredicates(expectedFilterExpression) .map(normalizeOrExpression).sortBy(_.toString) assert(exprs === expectedExprs) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index cde346e99eb1..e5132f3158bc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -81,12 +81,6 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { checkCondition(('a < 2 || 'a > 3 || 'b > 5) && 'a < 2, 'a < 2) checkCondition('a < 2 && ('a < 2 || 'a > 3 || 'b > 5) , 'a < 2) - - checkCondition(('a < 2 || 'b > 3) && ('a < 2 || 'c > 5), 'a < 2 || ('b > 3 && 'c > 5)) - - checkCondition( - ('a === 'b || 'b > 3) && ('a === 'b || 'a > 3) && ('a === 'b || 'a < 5), - ('a === 'b || 'b > 3 && 'a > 3 && 'a < 5)) } test("a && (!a || b)") { @@ -116,13 +110,4 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { testRelation.where('a > 2 && ('b > 3 || 'b < 5))) comparePlans(actual, expected) } - - test("(a || b) && (a || c) => a || (b && c) when case insensitive") { - val plan = caseInsensitiveAnalyzer.execute( - testRelation.where(('a > 2 || 'b > 3) && ('A > 2 || 'b < 5))) - val actual = Optimize.execute(plan) - val expected = caseInsensitiveAnalyzer.execute( - testRelation.where('a > 2 || ('b > 3 && 'b < 5))) - comparePlans(actual, expected) - } } From 91b2c26028696bd7a80045a9f7075a1d8d9561ff Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 22 Dec 2015 09:51:19 +0800 Subject: [PATCH 5/5] address Michael's comment --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index acd02e7de063..185878d251f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -473,14 +473,11 @@ object OptimizeIn extends Rule[LogicalPlan] { object CNFNormalization extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsUp { - case or @ Or(left, right) => (left, right) match { - // reverse Or with its And child to eliminate (And under Or) occurrence - case (And(innerLhs, innerRhs), rhs) => - And(Or(innerLhs, rhs), Or(innerRhs, rhs)) - case (lhs, And(innerLhs, innerRhs)) => - And(Or(lhs, innerLhs), Or(lhs, innerRhs)) - case _ => or - } + // reverse Or with its And child to eliminate (And under Or) occurrence + case Or(And(innerLhs, innerRhs), rhs) => + And(Or(innerLhs, rhs), Or(innerRhs, rhs)) + case Or(lhs, And(innerLhs, innerRhs)) => + And(Or(lhs, innerLhs), Or(lhs, innerRhs)) } } }