From f4809c19f35c9e18a610920d619d35ed32ca9ec7 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Tue, 15 Apr 2014 11:16:49 -0700
Subject: [PATCH 1/4] Move common functions to PredicateHelper.

---
 .../sql/catalyst/expressions/predicates.scala | 16 ++++++++++++----
 .../spark/sql/execution/SparkStrategies.scala |  7 -------
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index da5b2cf5b0362..b526803174a69 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.catalyst.trees
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
-import org.apache.spark.sql.catalyst.types.{BooleanType, StringType, TimestampType}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.catalyst.types.BooleanType
+
 
 object InterpretedPredicate {
   def apply(expression: Expression): (Row => Boolean) = {
@@ -37,10 +38,17 @@ trait Predicate extends Expression {
 }
 
 trait PredicateHelper {
-  def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
+  protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
     case And(cond1, cond2) => splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
     case other => other :: Nil
   }
+
+  private def combineConjunctivePredicates(predicates: Seq[Expression]) =
+    predicates.reduceLeft(And)
+
+  /** Returns true if `expr` can be evaluated using only the output of `plan`. */
+  protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
+    expr.references subsetOf plan.outputSet
 }
 
 abstract class BinaryPredicate extends BinaryExpression with Predicate {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index fe8bd5a508820..2d83d19e796e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -66,13 +66,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       }
       case _ => Nil
     }
-
-    private def combineConjunctivePredicates(predicates: Seq[Expression]) =
-      predicates.reduceLeft(And)
-
-    /** Returns true if `expr` can be evaluated using only the output of `plan`. */
-    protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
-      expr.references subsetOf plan.outputSet
   }
 
   object PartialAggregation extends Strategy {
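To see what the two relocated helpers do in isolation, here is a minimal, self-contained sketch. The `Expr`, `Attr`, and `Plan` types below are toy stand-ins invented for illustration, not Catalyst's actual classes:

```scala
// Toy model of Catalyst expressions and plans, for illustration only.
sealed trait Expr { def references: Set[String] }
case class Attr(name: String) extends Expr { val references = Set(name) }
case class Eq(l: Expr, r: Expr) extends Expr { val references = l.references ++ r.references }
case class And(l: Expr, r: Expr) extends Expr { val references = l.references ++ r.references }

case class Plan(outputSet: Set[String])

// Flatten a tree of ANDs into its conjuncts, as splitConjunctivePredicates does.
def splitConjunctivePredicates(condition: Expr): Seq[Expr] = condition match {
  case And(c1, c2) => splitConjunctivePredicates(c1) ++ splitConjunctivePredicates(c2)
  case other => other :: Nil
}

// A predicate can be evaluated against a plan when every attribute it
// references is produced by that plan, mirroring canEvaluate.
def canEvaluate(expr: Expr, plan: Plan): Boolean = expr.references.subsetOf(plan.outputSet)

val r = Plan(Set("a", "b")) // a relation R(a, b)
val pred = And(Eq(Attr("a"), Attr("b")), Eq(Attr("a"), Attr("c")))
assert(splitConjunctivePredicates(pred).length == 2) // two conjuncts
assert(canEvaluate(Eq(Attr("a"), Attr("b")), r))     // a and b both come from R
assert(!canEvaluate(Eq(Attr("a"), Attr("c")), r))    // c does not come from R
```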
From 14560eb84e9969e89d9ec8e3b93ba130b92f4a39 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Tue, 15 Apr 2014 11:17:24 -0700
Subject: [PATCH 2/4] Generalize pattern for planning hash joins.

---
 .../sql/catalyst/planning/patterns.scala      | 52 +++++++++++++++++++
 .../spark/sql/execution/SparkStrategies.scala | 40 ++-------------
 2 files changed, 57 insertions(+), 35 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 6dd816aa91dd1..add4c20702c56 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -19,7 +19,10 @@ package org.apache.spark.sql.catalyst.planning
 
 import scala.annotation.tailrec
 
+import org.apache.spark.sql.Logging
+
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 
 /**
@@ -101,6 +104,55 @@ object PhysicalOperation extends PredicateHelper {
   }
 }
 
+/**
+ * A pattern that finds joins with equality conditions that can be evaluated using hashing
+ * techniques. For inner joins, any filters on top of the join operator are also matched.
+ */
+object HashFilteredJoin extends Logging with PredicateHelper {
+  /** (joinType, leftKeys, rightKeys, condition, left, right) */
+  type ReturnType =
+    (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
+    // For an inner join, all predicates can be used (i.e., those in both the ON
+    // clause and the WHERE clause).
+    case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
+      logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
+      splitPredicates(predicates ++ condition, join)
+    case join @ Join(left, right, joinType, condition) =>
+      logger.debug(s"Considering hash join on: $condition")
+      splitPredicates(condition.toSeq, join)
+    case _ => None
+  }
+
+  // Find equi-join predicates that can be evaluated before the join, and thus can be used
+  // as join keys.
+  def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
+    val Join(left, right, joinType, _) = join
+    val (joinPredicates, otherPredicates) = allPredicates.partition {
+      case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
+        (canEvaluate(l, right) && canEvaluate(r, left)) => true
+      case _ => false
+    }
+
+    val joinKeys = joinPredicates.map {
+      case Equals(l,r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
+      case Equals(l,r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
+    }
+
+    // Do not consider this strategy if there are no join keys.
+    if (joinKeys.nonEmpty) {
+      val leftKeys = joinKeys.map(_._1)
+      val rightKeys = joinKeys.map(_._2)
+
+      Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
+    } else {
+      logger.debug(s"Avoiding hash join with no join keys.")
+      None
+    }
+  }
+}
+
 /**
  * A pattern that collects all adjacent unions and returns their children as a Seq.
  */
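The extractor added above leans on Scala's extractor-object idiom: any object whose `unapply` returns an `Option` can appear in a pattern match, so fairly involved analysis (splitting predicates, classifying join keys) hides behind a single `case HashFilteredJoin(...)`. A self-contained sketch of the idiom, with toy plan nodes assumed purely for illustration:

```scala
// Toy logical-plan nodes, invented for this sketch.
sealed trait Plan
case class Join(left: Plan, right: Plan, condition: Option[String]) extends Plan
case class Filter(predicate: String, child: Plan) extends Plan
case class Relation(name: String) extends Plan

// Like HashFilteredJoin, this extractor folds a Filter sitting on top of a Join
// into the match, returning all predicates along with the join's two children.
object FilteredJoin {
  def unapply(plan: Plan): Option[(Seq[String], Plan, Plan)] = plan match {
    case Filter(pred, Join(l, r, cond)) => Some((pred +: cond.toSeq, l, r))
    case Join(l, r, cond) => Some((cond.toSeq, l, r))
    case _ => None
  }
}

val query: Plan = Filter("t1.z > 10", Join(Relation("t1"), Relation("t2"), Some("t1.x = t2.y")))
query match {
  case FilteredJoin(predicates, left, right) =>
    println(s"$predicates on $left / $right") // List(t1.z > 10, t1.x = t2.y) ...
  case _ => println("no match")
}
```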
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 2d83d19e796e6..bafd1708740b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -28,42 +28,12 @@
 abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>
 
-  object HashJoin extends Strategy {
+  object HashJoin extends Strategy with PredicateHelper {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case FilteredOperation(predicates, logical.Join(left, right, Inner, condition)) =>
-        logger.debug(s"Considering join: ${predicates ++ condition}")
-        // Find equi-join predicates that can be evaluated before the join, and thus can be used
-        // as join keys. Note we can only mix in the conditions with other predicates because the
-        // match above ensures that this is an Inner join.
-        val (joinPredicates, otherPredicates) = (predicates ++ condition).partition {
-          case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
-            (canEvaluate(l, right) && canEvaluate(r, left)) => true
-          case _ => false
-        }
-
-        val joinKeys = joinPredicates.map {
-          case Equals(l,r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
-          case Equals(l,r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
-        }
-
-        // Do not consider this strategy if there are no join keys.
-        if (joinKeys.nonEmpty) {
-          val leftKeys = joinKeys.map(_._1)
-          val rightKeys = joinKeys.map(_._2)
-
-          val joinOp = execution.HashJoin(
-            leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
-
-          // Make sure other conditions are met if present.
-          if (otherPredicates.nonEmpty) {
-            execution.Filter(combineConjunctivePredicates(otherPredicates), joinOp) :: Nil
-          } else {
-            joinOp :: Nil
-          }
-        } else {
-          logger.debug(s"Avoiding spark join with no join keys.")
-          Nil
-        }
+      case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
+        val hashJoin =
+          execution.HashJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
+        condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
       case _ => Nil
     }
   }
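One detail of this rewrite worth noting: the old explicit if/else over `otherPredicates` becomes `condition.map(Filter(_, hashJoin)).getOrElse(hashJoin)`, since the leftover predicates now arrive pre-combined as an `Option[Expression]` from the pattern. A toy sketch of that Option idiom, with invented operator types standing in for the physical plan nodes:

```scala
// Invented stand-ins for physical operators, for illustration only.
sealed trait PhysicalOp
case class HashJoinOp(keys: String) extends PhysicalOp
case class FilterOp(predicate: String, child: PhysicalOp) extends PhysicalOp

// Some(pred): put a Filter on top of the join; None: the bare join suffices.
def planJoin(leftover: Option[String]): PhysicalOp = {
  val hashJoin = HashJoinOp("t1.x = t2.y")
  leftover.map(FilterOp(_, hashJoin)).getOrElse(hashJoin)
}

assert(planJoin(None) == HashJoinOp("t1.x = t2.y"))
assert(planJoin(Some("t1.z > 10")) == FilterOp("t1.z > 10", HashJoinOp("t1.x = t2.y")))
```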
From 366b6d9f0df1e60a4242248dba6fefe9844852c4 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Wed, 16 Apr 2014 14:43:51 -0700
Subject: [PATCH 3/4] style fixes

---
 .../spark/sql/catalyst/expressions/predicates.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index b526803174a69..59f2d99549700 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -38,9 +38,12 @@ trait Predicate extends Expression {
 }
 
 trait PredicateHelper {
-  protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
-    case And(cond1, cond2) => splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
-    case other => other :: Nil
+  protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
+    condition match {
+      case And(cond1, cond2) =>
+        splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
+      case other => other :: Nil
+    }
   }
 
   private def combineConjunctivePredicates(predicates: Seq[Expression]) =
From d5cc79b5b0d0abbb1638e3f0fa98ec60ccbbe404 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Thu, 24 Apr 2014 17:35:25 -0700
Subject: [PATCH 4/4] Address @rxin's comments.

---
 .../sql/catalyst/expressions/predicates.scala  | 16 +++++++++++-----
 .../spark/sql/catalyst/planning/patterns.scala |  6 +++---
 .../spark/sql/execution/SparkStrategies.scala  |  2 ++
 3 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 59f2d99549700..82c7af684459f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -46,12 +46,18 @@ trait PredicateHelper {
     }
   }
 
-  private def combineConjunctivePredicates(predicates: Seq[Expression]) =
-    predicates.reduceLeft(And)
-
-  /** Returns true if `expr` can be evaluated using only the output of `plan`. */
+  /**
+   * Returns true if `expr` can be evaluated using only the output of `plan`. This method
+   * can be used to determine when it is acceptable to move expression evaluation within a query
+   * plan.
+   *
+   * For example, consider a join between two relations R(a, b) and S(c, d).
+   *
+   * `canEvaluate(Equals(a,b), R)` returns `true`, whereas `canEvaluate(Equals(a,c), R)` returns
+   * `false`.
+   */
   protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
-    expr.references subsetOf plan.outputSet
+    expr.references.subsetOf(plan.outputSet)
 }
 
 abstract class BinaryPredicate extends BinaryExpression with Predicate {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index add4c20702c56..0e3a8a6bd30a8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -109,7 +109,7 @@ object PhysicalOperation extends PredicateHelper {
  * techniques. For inner joins, any filters on top of the join operator are also matched.
  */
 object HashFilteredJoin extends Logging with PredicateHelper {
-  /** (joinType, leftKeys, rightKeys, condition, left, right) */
+  /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
   type ReturnType =
     (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
 
@@ -136,8 +136,8 @@ object HashFilteredJoin extends Logging with PredicateHelper {
   }
 
     val joinKeys = joinPredicates.map {
-      case Equals(l,r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
-      case Equals(l,r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
+      case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
+      case Equals(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
     }
 
     // Do not consider this strategy if there are no join keys.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index bafd1708740b7..3779f211218d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -30,6 +30,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   object HashJoin extends Strategy with PredicateHelper {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      // Find inner joins where at least some predicates can be evaluated by matching hash keys
+      // using the HashFilteredJoin pattern.
       case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
         val hashJoin =
           execution.HashJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
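To make the new scaladoc example concrete, here is a short standalone sketch (toy types again, not Catalyst) that also covers the join case the comment alludes to: a predicate spanning both relations only becomes evaluable above the join:

```scala
// Minimal stand-ins for illustration; only attribute names are tracked.
case class Expr(references: Set[String])
case class Plan(outputSet: Set[String])

def canEvaluate(expr: Expr, plan: Plan): Boolean = expr.references.subsetOf(plan.outputSet)

val r = Plan(Set("a", "b"))                   // R(a, b)
val s = Plan(Set("c", "d"))                   // S(c, d)
val joined = Plan(r.outputSet ++ s.outputSet) // output of R JOIN S

assert(canEvaluate(Expr(Set("a", "b")), r))      // Equals(a, b) needs only R
assert(!canEvaluate(Expr(Set("a", "c")), r))     // Equals(a, c) also needs S
assert(canEvaluate(Expr(Set("a", "c")), joined)) // ...so it must sit above the join
```

This is the same test HashFilteredJoin applies to each side of an equality predicate when deciding whether it can serve as a hash join key.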