Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.{RuleExecutor, Rule}
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
Expand All @@ -47,6 +48,34 @@ trait Predicate extends Expression {
override def dataType: DataType = BooleanType
}

object Predicate extends PredicateHelper {
def toCNF(predicate: Expression, maybeThreshold: Option[Double] = None): Expression = {
val cnf = new CNFExecutor(predicate).execute(predicate)
val threshold = maybeThreshold.map(predicate.size * _).getOrElse(Double.MaxValue)
if (cnf.size > threshold) predicate else cnf
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the threshold is exceeded, the original predicate rather than the intermediate converted predicate is returned. This because the intermediate result may not be in CNF, thus:

  1. It doesn't bring much benefit for filter push-down, and
  2. It's much larger than the original predicate and brings extra evaluation cost.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree with 1. I don't see why it matters if it is all CNF or none. I think the heuristic we want is something like "maximize the number of simple predicates that are in CNF form". Simple here means contains just 1 attribute or binary predicate between two. These are candidates for benefiting from further optimization.

We could try cost basing this or just stopping the expansion after some amount.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maximizing the number of simple predicates sounds reasonable. We may do the conversion in a depth-first manner, i.e. always convert the left branch of an And and then its right branch, until either no more predicates can be converted or we reach the size limit. In this way the intermediate result is still useful.

BTW, searched for CNF conversion in Hive and found HIVE-9166, which also tries to put an upper limit for ORC SARG CNF conversion. @nongli Any clues about how Impala does this?

}

private class CNFNormalization(input: Expression)
extends Rule[Expression] {

override def apply(tree: Expression): Expression = {
import org.apache.spark.sql.catalyst.dsl.expressions._

tree transformDown {
case Not(Not(e)) => e
case Not(a And b) => !a || !b
case Not(a Or b) => !a && !b
case a Or (b And c) => (a || b) && (a || c)
case (a And b) Or c => (a || c) && (b || c)
}
}
}

private class CNFExecutor(input: Expression) extends RuleExecutor[Expression] {
override protected val batches: Seq[Batch] =
Batch("CNFNormalization", FixedPoint.Unlimited, new CNFNormalization(input)) :: Nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FixedPoint.Unlimited is used here to guarantee that we can really reach CNF when no expansion threshold is provided. This should be safe since all the rules defined here converge.

}
}

trait PredicateHelper {
protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ object DefaultOptimizer extends Optimizer {
ConstantFolding,
LikeSimplification,
BooleanSimplification,
CNFNormalization,
RemoveDispensableExpressions,
SimplifyFilters,
SimplifyCasts,
Expand Down Expand Up @@ -583,6 +584,12 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
}
}

object CNFNormalization extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case f @ Filter(condition, _) => f.copy(condition = Predicate.toCNF(condition, Some(10)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently, the expansion threshold can be made a configuration option.

}
}

/**
* Combines two adjacent [[Filter]] operators into one, merging the
* conditions into one conjunctive predicate.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
/** A strategy that runs until fix point or maxIterations times, whichever comes first. */
case class FixedPoint(maxIterations: Int) extends Strategy

object FixedPoint {
val Unlimited: FixedPoint = FixedPoint(-1)
}

/** A batch of rules. */
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

Expand Down Expand Up @@ -95,7 +99,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
result
}
iteration += 1
if (iteration > batch.strategy.maxIterations) {
if (batch.strategy.maxIterations > 0 && iteration > batch.strategy.maxIterations) {
// Only log if this is a rule that is supposed to run more than once.
if (iteration != 2) {
logInfo(s"Max iterations (${iteration - 1}) reached for batch ${batch.name}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
}
case _ => JNull
}

/** Returns total number of tree nodes in this tree. */
def size: Int = 1 + children.map(_.size).sum
}

object TreeNode {
Expand Down