From baac6327b5a9c1a234e34da538a72d8ef87a9e35 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 6 Oct 2016 14:47:34 +0000 Subject: [PATCH 1/4] Convert filter predicate to CNF in Optimizer. --- .../spark/sql/catalyst/CatalystConf.scala | 5 + .../sql/catalyst/optimizer/Optimizer.scala | 1 + .../sql/catalyst/optimizer/expressions.scala | 30 +++ .../expressions/CNFNormalizationSuite.scala | 175 ++++++++++++++++++ .../optimizer/FilterPushdownSuite.scala | 44 +++++ .../apache/spark/sql/internal/SQLConf.scala | 16 ++ 6 files changed, 271 insertions(+) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala index 75ae588c18ec6..cc216cfc40d0f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala @@ -32,6 +32,9 @@ trait CatalystConf { def optimizerInSetConversionThreshold: Int def maxCaseBranchesForCodegen: Int + def maxDepthForCNFNormalization: Int + def maxPredicateNumberForCNFNormalization: Int + def runSQLonFile: Boolean def warehousePath: String @@ -60,6 +63,8 @@ case class SimpleCatalystConf( optimizerMaxIterations: Int = 100, optimizerInSetConversionThreshold: Int = 10, maxCaseBranchesForCodegen: Int = 20, + maxDepthForCNFNormalization: Int = 10, + maxPredicateNumberForCNFNormalization: Int = 20, runSQLonFile: Boolean = true, crossJoinEnabled: Boolean = false, warehousePath: String = "/user/hive/warehouse") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e5e2cd7d27d15..02ed6115b4605 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -100,6 +100,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) ReorderAssociativeOperator, LikeSimplification, BooleanSimplification, + CNFNormalization(conf), SimplifyConditionals, RemoveDispensableExpressions, SimplifyBinaryComparison, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index b7458910da13e..5744c48f33f02 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.immutable.HashSet +import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.CatalystConf import org.apache.spark.sql.catalyst.analysis._ @@ -132,6 +133,35 @@ case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { } } +/** + * Convert the predicates of [[Filter]] operators to CNF form. 
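+ *
+ * For example, the filter condition `(a && b) || c` is rewritten to the equivalent
+ * CNF form `(a || c) && (b || c)`, so that each conjunct can be considered for
+ * predicate pushdown independently.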
+ */ +case class CNFNormalization(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper { + private def toCNF(predicate: Expression, depth: Int = 0): Expression = { + if (depth > conf.maxDepthForCNFNormalization) { + return predicate + } + val disjunctives = splitDisjunctivePredicates(predicate) + var finalPredicates = splitConjunctivePredicates(disjunctives.head) + disjunctives.tail.foreach { cond => + val predicates = new ArrayBuffer[Expression]() + splitConjunctivePredicates(cond).map { p => + predicates ++= finalPredicates.map(Or(_, p)) + } + finalPredicates = predicates.toSeq + } + val cnf = finalPredicates.map(toCNF(_, depth + 1)) + if (depth == 0 && cnf.length > conf.maxPredicateNumberForCNFNormalization) { + return predicate + } else { + cnf.reduce(And) + } + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case f @ Filter(condition, _) => f.copy(condition = toCNF(condition)) + } +} /** * Simplifies boolean expressions: diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala new file mode 100644 index 0000000000000..0dcbe88c2b9a1 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.optimizer._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.SimpleCatalystConf + +class CNFNormalizationSuite extends SparkFunSuite with PredicateHelper { + + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("AnalysisNodes", Once, + EliminateSubqueryAliases) :: + Batch("Constant Folding", FixedPoint(50), + NullPropagation, + ConstantFolding, + BooleanSimplification, + CNFNormalization(SimpleCatalystConf(true)), + PruneFilters) :: Nil + } + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.int, 'e.int) + + // Change the predicate orders in [[And]] and [[Or]] so we can compare them consistently. 
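+  // For example, Or(a, b) and Or(b, a) are semantically equal but not structurally
+  // equal; ordering the children by hash code gives both the same canonical shape.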
+ private def normalizationPredicate(predicate: Expression): Expression = { + predicate transformUp { + case Or(a, b) => + if (a.hashCode() > b.hashCode) { + Or(b, a) + } else { + Or(a, b) + } + case And(a, b) => + if (a.hashCode() > b.hashCode) { + And(b, a) + } else { + And(a, b) + } + } + } + + private def checkCondition(input: Expression, expected: Expression): Unit = { + val actual = Optimize.execute(testRelation.where(input).analyze) + val correctAnswer = Optimize.execute(testRelation.where(expected).analyze) + + val resultFilterExpression = actual.collectFirst { case f: Filter => f.condition }.get + val expectedFilterExpression = correctAnswer.collectFirst { case f: Filter => f.condition }.get + + val normalizedResult = splitConjunctivePredicates(resultFilterExpression) + .map(normalizationPredicate).sortBy(_.toString) + val normalizedExpected = splitConjunctivePredicates(expectedFilterExpression) + .map(normalizationPredicate).sortBy(_.toString) + + assert(normalizedResult == normalizedExpected) + } + + private val a = Literal(1) < 'a + private val b = Literal(1) < 'b + private val c = Literal(1) < 'c + private val d = Literal(1) < 'd + private val e = Literal(1) < 'e + private val f = ! a + + test("a || b => a || b") { + checkCondition(a || b, a || b) + } + + test("a && b && c => a && b && c") { + checkCondition(a && b && c, a && b && c) + } + + test("a && !(b || c) => a && !b && !c") { + checkCondition(a && !(b || c), a && !b && !c) + } + + test("a && b || c => (a || c) && (b || c)") { + checkCondition(a && b || c, (a || c) && (b || c)) + } + + test("a && b || f => (a || f) && (b || f)") { + checkCondition(a && b || f, b || f) + } + + test("(a && b) || (c && d) => (c || a) && (c || b) && ((d || a) && (d || b))") { + checkCondition((a && b) || (c && d), (a || c) && (b || c) && (a || d) && (b || d)) + } + + test("(a && b) || !(c && d) => (a || !c || !d) && (b || !c || !d)") { + checkCondition((a && b) || !(c && d), (a || !c || !d) && (b || !c || !d)) + } + + test("a || b || c && d => (a || b || c) && (a || b || d)") { + checkCondition(a || b || c && d, (a || b || c) && (a || b || d)) + } + + test("a || (b && c || d) => (a || b || d) && (a || c || d)") { + checkCondition(a || (b && c || d), (a || b || d) && (a || c || d)) + } + + test("a || !(b && c || d) => (a || !b || !c) && (a || !d)") { + checkCondition(a || !(b && c || d), (a || !b || !c) && (a || !d)) + } + + test("a && (b && c || d && e) => a && (b || d) && (c || d) && (b || e) && (c || e)") { + val input = a && (b && c || d && e) + val expected = a && (b || d) && (c || d) && (b || e) && (c || e) + checkCondition(input, expected) + } + + test("a && !(b && c || d && e) => a && (!b || !c) && (!d || !e)") { + checkCondition(a && !(b && c || d && e), a && (!b || !c) && (!d || !e)) + } + + test( + "a || (b && c || d && e) => (a || b || d) && (a || c || d) && (a || b || e) && (a || c || e)") { + val input = a || (b && c || d && e) + val expected = (a || b || d) && (a || c || d) && (a || b || e) && (a || c || e) + checkCondition(input, expected) + } + + test( + "a || !(b && c || d && e) => (a || !b || !c) && (a || !d || !e)") { + checkCondition(a || !(b && c || d && e), (a || !b || !c) && (a || !d || !e)) + } + + test("a && b && c || !(d && e) => (a || !d || !e) && (b || !d || !e) && (c || !d || !e)") { + val input = a && b && c || !(d && e) + val expected = (a || !d || !e) && (b || !d || !e) && (c || !d || !e) + checkCondition(input, expected) + } + + test( + "a && b && c || d && e && f => " + + "(a || d) && (a || e) && (a || f) && 
(b || d) && " +
+      "(b || e) && (b || f) && (c || d) && (c || e) && (c || f)") {
+    val input = (a && b && c) || (d && e && f)
+    val expected = (a || d) && (a || e) && (a || f) &&
+      (b || d) && (b || e) && (b || f) &&
+      (c || d) && (c || e) && (c || f)
+    checkCondition(input, expected)
+  }
+
+  test("CNF normalization exceeds max predicate numbers") {
+    val input = (1 to 100).map(i => Literal(i) < 'c).reduce(And) ||
+      (1 to 10).map(i => Literal(i) < 'a).reduce(And)
+    val analyzed = testRelation.where(input).analyze
+    val optimized = Optimize.execute(analyzed)
+    val resultFilterExpression = optimized.collectFirst { case f: Filter => f.condition }.get
+    val expectedFilterExpression = analyzed.collectFirst { case f: Filter => f.condition }.get
+    assert(resultFilterExpression.semanticEquals(expectedFilterExpression))
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 019f132d94cb2..9adb85501d76d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.types.IntegerType

 class FilterPushdownSuite extends PlanTest {
@@ -37,6 +38,7 @@ class FilterPushdownSuite extends PlanTest {
        CombineFilters,
        PushDownPredicate,
        BooleanSimplification,
+       CNFNormalization(SimpleCatalystConf(true)),
        PushPredicateThroughJoin,
        CollapseProject) :: Nil
   }
@@ -1018,4 +1020,46 @@ class FilterPushdownSuite extends PlanTest {

     comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze)
   }
+
+  test("push down filters that can no longer be pushed down after simplification") {
+    // The following predicate ('a === 2 || 'a === 3) && ('c > 10 || 'a === 2)
+    // will be simplified to ('a === 2) || ('c > 10 && 'a === 3).
+    // In its original form, ('a === 2 || 'a === 3) can be pushed down,
+    // but the simplified form can't.
+    val originalQuery = testRelation
+      .select('a, 'b, ('c + 1) as 'cc)
+      .groupBy('a)('a, count('cc) as 'c)
+      .where('c > 10) // this predicate can't be pushed down.
+      .where(('a === 2 || 'a === 3) && ('c > 10 || 'a === 2))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      testRelation
+        .where('a === 2 || 'a === 3)
+        .select('a, 'b, ('c + 1) as 'cc)
+        .groupBy('a)('a, count('cc) as 'c)
+        .where('c > 10).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("disjunctive predicates that can be pushed down should be pushed down after conversion") {
+    // ('a === 2) || ('c > 10 && 'a === 3) can't be pushed down as a whole due to its
+    // disjunctive form. However, part of its conjunctive normal form can be pushed down.
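+    // For example, ('a === 2) || ('c > 10 && 'a === 3) converts to
+    // ('a === 2 || 'c > 10) && ('a === 2 || 'a === 3), and the conjunct
+    // ('a === 2 || 'a === 3) references only the grouping column 'a, so it can be
+    // pushed below the aggregate.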
+    val originalQuery = testRelation
+      .select('a, 'b, ('c + 1) as 'cc)
+      .groupBy('a)('a, count('cc) as 'c)
+      .where('c > 10)
+      .where(('a === 2) || ('c > 10 && 'a === 3))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      testRelation
+        .where('a === 2 || 'a === 3)
+        .select('a, 'b, ('c + 1) as 'cc)
+        .groupBy('a)('a, count('cc) as 'c)
+        .where('c > 10).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fecdf792fd14a..65058f6a0f370 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -454,6 +454,18 @@ object SQLConf {
     .intConf
     .createWithDefault(20)

+  val MAX_DEPTH_CNF_PREDICATE = SQLConfigBuilder("spark.sql.expression.cnf.maxDepth")
+    .internal()
+    .doc("The maximum recursion depth used when converting filter predicates to CNF.")
+    .intConf
+    .createWithDefault(10)
+
+  val MAX_PREDICATE_NUMBER_CNF_PREDICATE = SQLConfigBuilder("spark.sql.expression.cnf.maxNumber")
+    .internal()
+    .doc("The maximum number of predicates allowed in the CNF conversion of filter predicates.")
+    .intConf
+    .createWithDefault(20)
+
 val FILES_MAX_PARTITION_BYTES = SQLConfigBuilder("spark.sql.files.maxPartitionBytes")
   .doc("The maximum number of bytes to pack into a single partition when reading files.")
   .longConf
@@ -685,6 +697,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

 def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)

+  def maxDepthForCNFNormalization: Int = getConf(MAX_DEPTH_CNF_PREDICATE)
+
+  def maxPredicateNumberForCNFNormalization: Int = getConf(MAX_PREDICATE_NUMBER_CNF_PREDICATE)
+
 def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)

 def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)

From c0637b26808aed386c4d937ebca44958e9f89c09 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 7 Oct 2016 02:49:35 +0000
Subject: [PATCH 2/4] Improve test.
--- .../catalyst/expressions/Canonicalize.scala | 3 ++ .../sql/catalyst/optimizer/expressions.scala | 8 ++++- .../expressions/CNFNormalizationSuite.scala | 35 ++++++------------- .../expressions/ExpressionSetSuite.scala | 18 ++++++++++ 4 files changed, 39 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala index 07ba7d5e4a849..7ef9d1e9c881b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala @@ -62,6 +62,9 @@ object Canonicalize extends { case a: Add => orderCommutative(a, { case Add(l, r) => Seq(l, r) }).reduce(Add) case m: Multiply => orderCommutative(m, { case Multiply(l, r) => Seq(l, r) }).reduce(Multiply) + case o: Or => orderCommutative(o, { case Or(l, r) => Seq(l, r) }).reduce(Or) + case a: And => orderCommutative(a, { case And(l, r) => Seq(l, r)}).reduce(And) + case EqualTo(l, r) if l.hashCode() > r.hashCode() => EqualTo(r, l) case EqualNullSafe(l, r) if l.hashCode() > r.hashCode() => EqualNullSafe(r, l) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 5744c48f33f02..df6ed026e8e1d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -150,7 +150,13 @@ case class CNFNormalization(conf: CatalystConf) extends Rule[LogicalPlan] with P } finalPredicates = predicates.toSeq } - val cnf = finalPredicates.map(toCNF(_, depth + 1)) + val cnf = finalPredicates.map { p => + if (p.semanticEquals(predicate)) { + p + } else { + toCNF(p, depth + 1) + } + } if (depth == 0 && cnf.length > conf.maxPredicateNumberForCNFNormalization) { return predicate } else { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala index 0dcbe88c2b9a1..adc66226ea15a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala @@ -44,24 +44,6 @@ class CNFNormalizationSuite extends SparkFunSuite with PredicateHelper { val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.int, 'e.int) - // Change the predicate orders in [[And]] and [[Or]] so we can compare them consistently. 
- private def normalizationPredicate(predicate: Expression): Expression = { - predicate transformUp { - case Or(a, b) => - if (a.hashCode() > b.hashCode) { - Or(b, a) - } else { - Or(a, b) - } - case And(a, b) => - if (a.hashCode() > b.hashCode) { - And(b, a) - } else { - And(a, b) - } - } - } - private def checkCondition(input: Expression, expected: Expression): Unit = { val actual = Optimize.execute(testRelation.where(input).analyze) val correctAnswer = Optimize.execute(testRelation.where(expected).analyze) @@ -69,12 +51,7 @@ class CNFNormalizationSuite extends SparkFunSuite with PredicateHelper { val resultFilterExpression = actual.collectFirst { case f: Filter => f.condition }.get val expectedFilterExpression = correctAnswer.collectFirst { case f: Filter => f.condition }.get - val normalizedResult = splitConjunctivePredicates(resultFilterExpression) - .map(normalizationPredicate).sortBy(_.toString) - val normalizedExpected = splitConjunctivePredicates(expectedFilterExpression) - .map(normalizationPredicate).sortBy(_.toString) - - assert(normalizedResult == normalizedExpected) + assert(resultFilterExpression.semanticEquals(expectedFilterExpression)) } private val a = Literal(1) < 'a @@ -163,6 +140,16 @@ class CNFNormalizationSuite extends SparkFunSuite with PredicateHelper { checkCondition(input, expected) } + test("((a && b) || (c && d)) || e") { + val input = ((a && b) || (c && d)) || e + val expected = ((a || c) || e) && ((a || d) || e) && ((b || c) || e) && ((b || d) || e) + checkCondition(input, expected) + val analyzed = testRelation.where(input).analyze + val optimized = Optimize.execute(analyzed) + val resultFilterExpression = optimized.collectFirst { case f: Filter => f.condition }.get + println(s"resultFilterExpression: $resultFilterExpression") + } + test("CNF normalization exceeds max predicate numbers") { val input = (1 to 100).map(i => Literal(i) < 'c).reduce(And) || (1 to 10).map(i => Literal(i) < 'a).reduce(And) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala index 60939ee0eda5d..55ee4f7aed0f1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSetSuite.scala @@ -80,6 +80,24 @@ class ExpressionSetSuite extends SparkFunSuite { setTest(1, Not(aUpper >= 1), aUpper < 1, Not(Literal(1) <= aUpper), Literal(1) > aUpper) setTest(1, Not(aUpper <= 1), aUpper > 1, Not(Literal(1) >= aUpper), Literal(1) < aUpper) + setTest(1, aUpper > bUpper && aUpper <= 10, aUpper <= 10 && aUpper > bUpper) + setTest(1, + aUpper > bUpper && + bUpper > 100 && + aUpper <= 10, + bUpper > 100 && + aUpper <= 10 && + aUpper > bUpper) + + setTest(1, aUpper > bUpper || aUpper <= 10, aUpper <= 10 || aUpper > bUpper) + setTest(1, + aUpper > bUpper || + bUpper > 100 || + aUpper <= 10, + bUpper > 100 || + aUpper <= 10 || + aUpper > bUpper) + test("add to / remove from set") { val initialSet = ExpressionSet(aUpper + 1 :: Nil) From f0872fe8b208ddda6e2cb335f9c6a58a195a0960 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 7 Oct 2016 02:50:08 +0000 Subject: [PATCH 3/4] improve test. 
---
 .../sql/catalyst/expressions/CNFNormalizationSuite.scala | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala
index adc66226ea15a..2068032a94eeb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CNFNormalizationSuite.scala
@@ -144,10 +144,6 @@ class CNFNormalizationSuite extends SparkFunSuite with PredicateHelper {
     val input = ((a && b) || (c && d)) || e
     val expected = ((a || c) || e) && ((a || d) || e) && ((b || c) || e) && ((b || d) || e)
     checkCondition(input, expected)
-    val analyzed = testRelation.where(input).analyze
-    val optimized = Optimize.execute(analyzed)
-    val resultFilterExpression = optimized.collectFirst { case f: Filter => f.condition }.get
-    println(s"resultFilterExpression: $resultFilterExpression")
   }

   test("CNF normalization exceeds max predicate numbers") {

From 5343947cfeb287e1f0e02e472cc2ada441c671a4 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 19 Oct 2016 15:36:53 +0000
Subject: [PATCH 4/4] Add comments.

---
 .../sql/catalyst/optimizer/expressions.scala | 20 +++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index df6ed026e8e1d..a7ae7eef35c84 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -134,13 +134,27 @@ case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] {
 }

 /**
- * Convert the predicates of [[Filter]] operators to CNF form.
+ * Converts the predicates of [[Filter]] operators to CNF form.
  */
 case class CNFNormalization(conf: CatalystConf) extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Converts a predicate expression to its CNF form. The `depth` parameter bounds the
+   * recursion so that deeply nested predicates are returned unchanged.
+   */
   private def toCNF(predicate: Expression, depth: Int = 0): Expression = {
     if (depth > conf.maxDepthForCNFNormalization) {
       return predicate
     }
+    // For a predicate like (A && B) || (C && D) || E,
+    // the conversion proceeds as follows:
+    // 1. Split the disjuncts: (A && B) || (C && D) || E => (A && B), (C && D), E
+    // 2. Split the first disjunct into conjuncts: (A && B) => A, B
+    // 3. For each remaining disjunct, i.e. (C && D) and E:
+    // 3.a. generate (A || C), (B || C), (A || D), (B || D)
+    // 3.b. generate ((A || C) || E), ((B || C) || E), ((A || D) || E), ((B || D) || E)
+    // 4. Recursively apply toCNF to each resulting predicate with an increased depth.
+    // 5. Concatenate the results with `And`:
+    //    ((A || C) || E) && ((B || C) || E) && ((A || D) || E) && ((B || D) || E)
     val disjunctives = splitDisjunctivePredicates(predicate)
     var finalPredicates = splitConjunctivePredicates(disjunctives.head)
     disjunctives.tail.foreach { cond =>
@@ -157,7 +171,9 @@ case class CNFNormalization(conf: CatalystConf) extends Rule[LogicalPlan] with P
         toCNF(p, depth + 1)
       }
     }
-    if (depth == 0 && cnf.length > conf.maxPredicateNumberForCNFNormalization) {
+    // To prevent the expression explosion problem in CNF conversion, we throw away the
+    // CNF result if it contains more predicates than a threshold.
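+    // For example, without this cap, a 100-predicate conjunction OR'd with a 10-predicate
+    // conjunction would expand into 100 * 10 = 1000 clauses (see the "exceeds max predicate
+    // numbers" test in CNFNormalizationSuite).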
+    if (cnf.length > conf.maxPredicateNumberForCNFNormalization) {
       return predicate
     } else {
       cnf.reduce(And)