From 11de68dde46f68eb80e468347b0a4eb96eb7da9c Mon Sep 17 00:00:00 2001 From: vidmantas zemleris Date: Wed, 4 Nov 2015 00:38:08 +0200 Subject: [PATCH 1/5] Optimize Inner joins with skewed null values --- .../sql/catalyst/optimizer/Optimizer.scala | 47 +++++++++++++++++++ .../optimizer/FilterPushdownSuite.scala | 16 +++++-- 2 files changed, 58 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0b1c74293bb8..e95ac9730d56 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -40,6 +40,10 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { Batch("Aggregate", FixedPoint(100), ReplaceDistinctWithAggregate, RemoveLiteralFromGroupExpressions) :: + // run only once, and before other condition push-down optimizations + // FIXME: need a better way of identifying if rule was already applied. attribute.nullable do not help + Batch("Join Skew optimization", FixedPoint(1), + JoinSkewOptimizer) :: Batch("Operator Optimizations", FixedPoint(100), // Operator push down SetOperationPushDown, @@ -976,3 +980,46 @@ object RemoveLiteralFromGroupExpressions extends Rule[LogicalPlan] { a.copy(groupingExpressions = newGrouping) } } + +/** + * For an inner join - remove rows with null keys on both sides + */ +object JoinSkewOptimizer extends Rule[LogicalPlan] with PredicateHelper { + /** + * Adds a null filter on given columns, if any + */ + def addNullFilter(columns: AttributeSet, expr: LogicalPlan) = { + columns.map(IsNotNull(_)) + .reduceLeftOption(And) + .map(Filter(_, expr)) + .getOrElse(expr) + } + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case f@Join(left, right, joinType, joinCondition) => + // get "real" join conditions, which refer both left and right + val joinConditionsOnBothRelations = joinCondition + .map(splitConjunctivePredicates).getOrElse(Nil) + .filter(cond => !canEvaluate(cond, left) && !canEvaluate(cond, right)) + + def nullableJoinKeys(leftOrRight: LogicalPlan) = { + val joinKeys = leftOrRight.outputSet.intersect( + joinConditionsOnBothRelations.map(_.references).reduceLeftOption(_ ++ _).getOrElse(AttributeSet.empty) + ) + joinKeys.filter(_.nullable) + } + + def hasNullableKeys = Seq(left, right).exists(nullableJoinKeys(_).nonEmpty) + + joinType match { + case _ @ (Inner | LeftSemi) if hasNullableKeys => + // add a non-null keys filter for both sides sub queries + val newLeft = addNullFilter(nullableJoinKeys(left), left) + val newRight = addNullFilter(nullableJoinKeys(right), right) + + Join(newLeft, newRight, joinType, joinCondition) + + case _ => f + } + } +} \ No newline at end of file diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index fba4c5ca77d6..426437ea2ac9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -33,6 +33,8 @@ class FilterPushdownSuite extends PlanTest { val batches = Batch("Subqueries", Once, EliminateSubQueries) :: + Batch("Join Skew optimization", FixedPoint(1), + JoinSkewOptimizer) :: Batch("Filter Pushdown", Once, SamplePushDown, CombineFilters, @@ -279,7 +281,7 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - test("joins: push down left semi join") { + test("joins: push down left semi join, and add null filter") { val x = testRelation.subquery('x) val y = testRelation1.subquery('y) @@ -288,8 +290,8 @@ class FilterPushdownSuite extends PlanTest { } val optimized = Optimize.execute(originalQuery.analyze) - val left = testRelation.where('b >= 1) - val right = testRelation1.where('d >= 2) + val left = testRelation.where('a.isNotNull).where('b >= 1) + val right = testRelation1.where('d.isNotNull).where('d >= 2) val correctAnswer = left.join(right, LeftSemi, Option("a".attr === "d".attr)).analyze @@ -474,7 +476,7 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - test("joins: can't push down") { + test("joins: can't push down query filters, but inner join can be optimized for null skew") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) @@ -483,7 +485,11 @@ class FilterPushdownSuite extends PlanTest { } val optimized = Optimize.execute(originalQuery.analyze) - comparePlans(analysis.EliminateSubQueries(originalQuery.analyze), optimized) + val expectedQueryWithNullFilters = { + x.where('b.isNotNull) + .join(y.where('b.isNotNull), condition = Some("x.b".attr === "y.b".attr)) + } + comparePlans(analysis.EliminateSubQueries(expectedQueryWithNullFilters.analyze), optimized) } test("joins: conjunctive predicates") { From c31a290363fde7eb9d6f742c202f80e713d45256 Mon Sep 17 00:00:00 2001 From: vidmantas zemleris Date: Wed, 4 Nov 2015 01:26:10 +0200 Subject: [PATCH 2/5] Add null filter only for EqualTo join conditions i.e. should not rewrite <=> or comparison, where null semantics are more subtle --- .../sql/catalyst/optimizer/Optimizer.scala | 3 ++- .../optimizer/FilterPushdownSuite.scala | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e95ac9730d56..9cd40bae7693 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1000,6 +1000,7 @@ object JoinSkewOptimizer extends Rule[LogicalPlan] with PredicateHelper { // get "real" join conditions, which refer both left and right val joinConditionsOnBothRelations = joinCondition .map(splitConjunctivePredicates).getOrElse(Nil) + .filter(_.isInstanceOf[EqualTo]) .filter(cond => !canEvaluate(cond, left) && !canEvaluate(cond, right)) def nullableJoinKeys(leftOrRight: LogicalPlan) = { @@ -1022,4 +1023,4 @@ object JoinSkewOptimizer extends Rule[LogicalPlan] with PredicateHelper { case _ => f } } -} \ No newline at end of file +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 426437ea2ac9..5469a20b63e8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -281,6 +281,23 @@ class FilterPushdownSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("joins: push down left semi join, do NOT add null skew filter for <=>") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = { + x.join(y, LeftSemi, Option("x.a".attr <=> "y.d".attr && "x.b".attr >= 1 && "y.d".attr >= 2)) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('b >= 1) + val right = testRelation1.where('d >= 2) + val correctAnswer = + left.join(right, LeftSemi, Option("a".attr <=> "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + test("joins: push down left semi join, and add null filter") { val x = testRelation.subquery('x) val y = testRelation1.subquery('y) From 8f047f22dfdd3aab956f02f04ca24fb649d95364 Mon Sep 17 00:00:00 2001 From: vidmantas zemleris Date: Wed, 4 Nov 2015 10:59:58 +0200 Subject: [PATCH 3/5] Fix scalaStyle --- .../apache/spark/sql/catalyst/optimizer/Optimizer.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9cd40bae7693..0875e301f46f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -41,7 +41,6 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { ReplaceDistinctWithAggregate, RemoveLiteralFromGroupExpressions) :: // run only once, and before other condition push-down optimizations - // FIXME: need a better way of identifying if rule was already applied. attribute.nullable do not help Batch("Join Skew optimization", FixedPoint(1), JoinSkewOptimizer) :: Batch("Operator Optimizations", FixedPoint(100), @@ -988,7 +987,7 @@ object JoinSkewOptimizer extends Rule[LogicalPlan] with PredicateHelper { /** * Adds a null filter on given columns, if any */ - def addNullFilter(columns: AttributeSet, expr: LogicalPlan) = { + def addNullFilter(columns: AttributeSet, expr: LogicalPlan): LogicalPlan = { columns.map(IsNotNull(_)) .reduceLeftOption(And) .map(Filter(_, expr)) @@ -1005,7 +1004,9 @@ object JoinSkewOptimizer extends Rule[LogicalPlan] with PredicateHelper { def nullableJoinKeys(leftOrRight: LogicalPlan) = { val joinKeys = leftOrRight.outputSet.intersect( - joinConditionsOnBothRelations.map(_.references).reduceLeftOption(_ ++ _).getOrElse(AttributeSet.empty) + joinConditionsOnBothRelations + .map(_.references) + .reduceLeftOption(_ ++ _).getOrElse(AttributeSet.empty) ) joinKeys.filter(_.nullable) } From eaa12bc39c165d0160d204df2953fc819169a3e4 Mon Sep 17 00:00:00 2001 From: vidmantas zemleris Date: Sat, 7 Nov 2015 21:12:18 +0200 Subject: [PATCH 4/5] Refactor using ExtractEquiJoinKeys --- .../sql/catalyst/optimizer/Optimizer.scala | 47 +++++++------------ 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0875e301f46f..b9baf3f919be 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.optimizer +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys + import scala.collection.immutable.HashSet import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries} @@ -987,41 +989,28 @@ object JoinSkewOptimizer extends Rule[LogicalPlan] with PredicateHelper { /** * Adds a null filter on given columns, if any */ - def addNullFilter(columns: AttributeSet, expr: LogicalPlan): LogicalPlan = { - columns.map(IsNotNull(_)) + def addNullFilter(columns: Seq[Expression], expr: LogicalPlan): LogicalPlan = { + columns.map(IsNotNull) .reduceLeftOption(And) .map(Filter(_, expr)) .getOrElse(expr) } - def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case f@Join(left, right, joinType, joinCondition) => - // get "real" join conditions, which refer both left and right - val joinConditionsOnBothRelations = joinCondition - .map(splitConjunctivePredicates).getOrElse(Nil) - .filter(_.isInstanceOf[EqualTo]) - .filter(cond => !canEvaluate(cond, left) && !canEvaluate(cond, right)) - - def nullableJoinKeys(leftOrRight: LogicalPlan) = { - val joinKeys = leftOrRight.outputSet.intersect( - joinConditionsOnBothRelations - .map(_.references) - .reduceLeftOption(_ ++ _).getOrElse(AttributeSet.empty) - ) - joinKeys.filter(_.nullable) - } - - def hasNullableKeys = Seq(left, right).exists(nullableJoinKeys(_).nonEmpty) - - joinType match { - case _ @ (Inner | LeftSemi) if hasNullableKeys => - // add a non-null keys filter for both sides sub queries - val newLeft = addNullFilter(nullableJoinKeys(left), left) - val newRight = addNullFilter(nullableJoinKeys(right), right) - - Join(newLeft, newRight, joinType, joinCondition) + private def hasNullableKeys(leftKeys: Seq[Expression], rightKeys: Seq[Expression]) = { + leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable) + } - case _ => f + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case join @ Join(left, right, joinType, originalJoinCondition) => + join match { + case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _) + if hasNullableKeys(leftKeys, rightKeys) && Seq(Inner, LeftSemi).contains(joinType) => + // add a non-null join-key filter on both sides of join + val newLeft = addNullFilter(leftKeys.filter(_.nullable), left) + val newRight = addNullFilter(rightKeys.filter(_.nullable), right) + Join(newLeft, newRight, joinType, originalJoinCondition) + + case _ => join } } } From cd8ca343019d1e7a2a43128ea070f9cda828dc81 Mon Sep 17 00:00:00 2001 From: vidmantas zemleris Date: Sun, 8 Nov 2015 11:35:07 +0200 Subject: [PATCH 5/5] Refactor to add null filter to joinConditions it will be pushed down by other rules, such as PushPredicateThroughJoin --- .../sql/catalyst/optimizer/Optimizer.scala | 23 +++++++------------ .../optimizer/FilterPushdownSuite.scala | 4 ++-- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index b9baf3f919be..92da389e20e9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -986,16 +986,6 @@ object RemoveLiteralFromGroupExpressions extends Rule[LogicalPlan] { * For an inner join - remove rows with null keys on both sides */ object JoinSkewOptimizer extends Rule[LogicalPlan] with PredicateHelper { - /** - * Adds a null filter on given columns, if any - */ - def addNullFilter(columns: Seq[Expression], expr: LogicalPlan): LogicalPlan = { - columns.map(IsNotNull) - .reduceLeftOption(And) - .map(Filter(_, expr)) - .getOrElse(expr) - } - private def hasNullableKeys(leftKeys: Seq[Expression], rightKeys: Seq[Expression]) = { leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable) } @@ -1003,12 +993,15 @@ object JoinSkewOptimizer extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case join @ Join(left, right, joinType, originalJoinCondition) => join match { + // add a non-null join-key filter on both sides of Inner or LeftSemi join case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _) - if hasNullableKeys(leftKeys, rightKeys) && Seq(Inner, LeftSemi).contains(joinType) => - // add a non-null join-key filter on both sides of join - val newLeft = addNullFilter(leftKeys.filter(_.nullable), left) - val newRight = addNullFilter(rightKeys.filter(_.nullable), right) - Join(newLeft, newRight, joinType, originalJoinCondition) + if Seq(Inner, LeftSemi).contains(joinType) && hasNullableKeys(leftKeys, rightKeys) => + val nullFilters = (leftKeys ++ rightKeys) + .filter(_.nullable) + .map(IsNotNull) + val newJoinCondition = (originalJoinCondition ++ nullFilters).reduceLeftOption(And) + + Join(left, right, joinType, newJoinCondition) case _ => join } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 5469a20b63e8..b1e6851f17ee 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -307,8 +307,8 @@ class FilterPushdownSuite extends PlanTest { } val optimized = Optimize.execute(originalQuery.analyze) - val left = testRelation.where('a.isNotNull).where('b >= 1) - val right = testRelation1.where('d.isNotNull).where('d >= 2) + val left = testRelation.where('b >= 1 && 'a.isNotNull) + val right = testRelation1.where('d >= 2 && 'd.isNotNull) val correctAnswer = left.join(right, LeftSemi, Option("a".attr === "d".attr)).analyze