From d09d905180c0eb9d8b1ed9aaf5d207e2fac6547c Mon Sep 17 00:00:00 2001 From: Min Qiu Date: Tue, 1 Dec 2015 18:01:02 -0800 Subject: [PATCH 1/3] Extract the common equality conditions that can be used as a join condition --- .../sql/catalyst/optimizer/Optimizer.scala | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index f4dba67f13b54..f390751f8e8c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -40,6 +40,8 @@ object DefaultOptimizer extends Optimizer { Batch("Aggregate", FixedPoint(100), ReplaceDistinctWithAggregate, RemoveLiteralFromGroupExpressions) :: + Batch("CNF factorization", FixedPoint(100), + ExtractEqualJoinCondition) :: Batch("Operator Optimizations", FixedPoint(100), // Operator push down SetOperationPushDown, @@ -911,3 +913,51 @@ object RemoveLiteralFromGroupExpressions extends Rule[LogicalPlan] { a.copy(groupingExpressions = newGrouping) } } + +/** + * Extracts the equal-join condition if any, so that query planner avoids generating cartsian + * product which cause out of memory exception, and performance issues + */ +object ExtractEqualJoinCondition extends Rule[LogicalPlan] with PredicateHelper{ + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case f @ Join(left, right, joinType, joinCondition) => + joinCondition match { + case Some(e) if isDNF(e) => { + val disjConditions = splitDisjunctivePredicates(e) + val exprMatrix = disjConditions.map(splitConjunctivePredicates) + if(exprMatrix.length <= 1) f + else { + val pattern = exprMatrix(0) + val comExprs: Seq[Expression] = pattern.filter(p => isCommonExpr(p, exprMatrix, 1)) + val newExprMatrix = exprMatrix.map(_.diff(comExprs)) + val newJoinCond = (comExprs 
:+ newExprMatrix.map(_.reduceLeft(And)).reduceLeft(Or)) + .reduceLeftOption(And) + Join(left, right, joinType, newJoinCond) + } + } + case _ => f + } + } + + def isCommonExpr(pattern: Expression, matrix: Seq[Seq[Expression]], startIndex: Int) : Boolean = { + val duplicatedCount = matrix.drop(startIndex).count(arr => arr.contains(pattern)) + return duplicatedCount == matrix.length - startIndex + } + + def isDNF(condition: Expression) : Boolean = { + condition match { + case Or(left, right) => isDNF(left) && isDNF(right) + case And(left, right) => isCNF(left) && isCNF(right) + case _ => true + } + } + + def isCNF(condition: Expression): Boolean = { + condition match { + case And(left, right) => isCNF(left) && isCNF(right) + case Or(left, right) => false + case _ => true + } + } +} + From 08a76aefcc6036d600fa92aee037515cce22ae09 Mon Sep 17 00:00:00 2001 From: Min Qiu Date: Wed, 2 Dec 2015 17:01:41 -0800 Subject: [PATCH 2/3] bug fix for ColumnPruning rule to deal with Project <- Filter <- Join case --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index f390751f8e8c2..406e8426374b0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -199,6 +199,7 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper { * - Aggregate * - Generate * - Project <- Join + * - Project <- Filter <- Join * - LeftSemiJoin */ object ColumnPruning extends Rule[LogicalPlan] { @@ -248,6 +249,16 @@ object ColumnPruning extends Rule[LogicalPlan] { Project(projectList, Join(pruneJoinChild(left), pruneJoinChild(right), joinType, condition)) + // Eliminate unneeded attributes from either side of a Join that sits beneath a Filter. 
+ case Project(projectList, Filter(predicates, Join(left, right, joinType, condition))) => + val allReferences: AttributeSet = + AttributeSet( + projectList.flatMap(_.references.iterator)) ++ + predicates.references ++ + condition.map(_.references).getOrElse(AttributeSet(Seq.empty)) + Project(projectList, Filter(predicates, Join( + prunedChild(left, allReferences), prunedChild(right, allReferences), joinType, condition))) + // Eliminate unneeded attributes from right side of a LeftSemiJoin. case Join(left, right, LeftSemi, condition) => // Collect the list of all references required to evaluate the condition. From b002b393124568f6d171e5619d40ff749b9b639d Mon Sep 17 00:00:00 2001 From: Min Qiu Date: Wed, 2 Dec 2015 18:04:54 -0800 Subject: [PATCH 3/3] Remove the ExtractEqualJoinCondition rule and its "CNF factorization" optimizer batch --- .../sql/catalyst/optimizer/Optimizer.scala | 50 ------------------- 1 file changed, 50 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 406e8426374b0..098a5a8ee7154 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -40,8 +40,6 @@ object DefaultOptimizer extends Optimizer { Batch("Aggregate", FixedPoint(100), ReplaceDistinctWithAggregate, RemoveLiteralFromGroupExpressions) :: - Batch("CNF factorization", FixedPoint(100), - ExtractEqualJoinCondition) :: Batch("Operator Optimizations", FixedPoint(100), // Operator push down SetOperationPushDown, @@ -924,51 +922,3 @@ object RemoveLiteralFromGroupExpressions extends Rule[LogicalPlan] { a.copy(groupingExpressions = newGrouping) } } - -/** - * Extracts the equal-join condition if any, so that query planner avoids generating cartsian - * product which cause out of memory exception, and performance issues - */ 
-object ExtractEqualJoinCondition extends Rule[LogicalPlan] with PredicateHelper{ - def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case f @ Join(left, right, joinType, joinCondition) => - joinCondition match { - case Some(e) if isDNF(e) => { - val disjConditions = splitDisjunctivePredicates(e) - val exprMatrix = disjConditions.map(splitConjunctivePredicates) - if(exprMatrix.length <= 1) f - else { - val pattern = exprMatrix(0) - val comExprs: Seq[Expression] = pattern.filter(p => isCommonExpr(p, exprMatrix, 1)) - val newExprMatrix = exprMatrix.map(_.diff(comExprs)) - val newJoinCond = (comExprs :+ newExprMatrix.map(_.reduceLeft(And)).reduceLeft(Or)) - .reduceLeftOption(And) - Join(left, right, joinType, newJoinCond) - } - } - case _ => f - } - } - - def isCommonExpr(pattern: Expression, matrix: Seq[Seq[Expression]], startIndex: Int) : Boolean = { - val duplicatedCount = matrix.drop(startIndex).count(arr => arr.contains(pattern)) - return duplicatedCount == matrix.length - startIndex - } - - def isDNF(condition: Expression) : Boolean = { - condition match { - case Or(left, right) => isDNF(left) && isDNF(right) - case And(left, right) => isCNF(left) && isCNF(right) - case _ => true - } - } - - def isCNF(condition: Expression): Boolean = { - condition match { - case And(left, right) => isCNF(left) && isCNF(right) - case Or(left, right) => false - case _ => true - } - } -} -