From 7237f618b8f0728a1f1585e1f4db155e6d38519b Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 1 Jan 2016 20:04:38 -0800 Subject: [PATCH 1/5] reorder outer and inner joins --- .../sql/catalyst/optimizer/Optimizer.scala | 52 ++++++++- .../catalyst/optimizer/JoinOrderSuite.scala | 3 +- .../apache/spark/sql/DataFrameJoinSuite.scala | 100 ++++++++++++++++++ 3 files changed, 150 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0b1c74293bb8..d31a53a7ac35 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -22,7 +22,7 @@ import scala.collection.immutable.HashSet import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins +import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractFiltersAndInnerJoins} import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -44,7 +44,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { // Operator push down SetOperationPushDown, SamplePushDown, - ReorderJoin, + ReorderInnerJoin, + ReorderOuterInner, PushPredicateThroughJoin, PushPredicateThroughProject, PushPredicateThroughGenerate, @@ -727,7 +728,7 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHel * * The order of joins will not be changed if all of them already have at least one condition. 
*/ -object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { +object ReorderInnerJoin extends Rule[LogicalPlan] with PredicateHelper { /** * Join a list of plans together and push down the conditions into them. @@ -768,6 +769,51 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } + +/** + * Reorder the adjacent outer and inner joins and push inner join below left/right outer join. + * + * TODO: improve the checking conditions to cover out-of-order cases. + */ +object ReorderOuterInner extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + + case j @ Join(left @ Join(ll, lr, joinType, lCondition), right, Inner, condition) => + val leftJoinKey = j match { + case ExtractEquiJoinKeys(_, leftKeys, _, _, _, _) => leftKeys + } + val (leftLeftJoinKey, leftRightJoinKey) = left match { + case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _) => + (leftKeys, rightKeys) + } + + joinType match { + case LeftOuter if leftJoinKey == leftLeftJoinKey => + Join(Join(ll, right, Inner, condition), lr, LeftOuter, lCondition) + case RightOuter if leftJoinKey == leftRightJoinKey => + Join(ll, Join(lr, right, Inner, condition), RightOuter, lCondition) + case _ => j + } + + case j @ Join(left, right @ Join(rl, rr, joinType, rCondition), Inner, condition) => + val rightJoinKey = j match { + case ExtractEquiJoinKeys(_, _, rightKey, _, _, _) => rightKey + } + val (rightLeftJoinKey, rightRightJoinKey) = right match { + case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _) => + (leftKeys, rightKeys) + } + + joinType match { + case LeftOuter if rightJoinKey == rightLeftJoinKey => + Join(Join(rl, left, Inner, condition), rr, LeftOuter, rCondition) + case RightOuter if rightJoinKey == rightRightJoinKey => + Join(rl, Join(left, rr, Inner, condition), RightOuter, rCondition) + case _ => j + } + } +} + /** * Pushes down [[Filter]] operators where the `condition` can be * evaluated using only the attributes of the left or 
right side of a join. Other diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala index 9b1e16c72764..1de1598fe0cd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala @@ -38,13 +38,12 @@ class JoinOrderSuite extends PlanTest { CombineFilters, PushPredicateThroughProject, BooleanSimplification, - ReorderJoin, + ReorderInnerJoin, PushPredicateThroughJoin, PushPredicateThroughGenerate, PushPredicateThroughAggregate, ColumnPruning, ProjectCollapsing) :: Nil - } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 39a65413bd59..077d8852e5d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.plans.{Inner, RightOuter, LeftOuter} +import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.execution.joins.BroadcastHashJoin import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext @@ -118,6 +120,104 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { Row(1, 1, 1, 1) :: Row(2, 1, 2, 2) :: Nil) } + test("join - left outer + inner reordering # 1") { + val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a") + val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b") + val df3 = Seq((1, 3, "1"), (3, 6, "5")).toDF("int", "int2", "str").as("c") + + // Left Then Inner -> Inner Then Left + val leftInnerJoin = df.join(df2, $"a.int" === $"b.int", 
"left") + .join(df3, $"c.int" === $"a.int", "inner").select($"a.*", $"b.*", $"c.*") + + // The order before reordering: Left Then Inner + assert(leftInnerJoin.queryExecution.analyzed.collect { + case j@Join(Join(_, _, LeftOuter, _), _, Inner, _) => j + }.size === 1) + + // The order after reordering: Inner Then Left + assert(leftInnerJoin.queryExecution.optimizedPlan.collect { + case j@Join(Join(_, _, Inner, _), _, LeftOuter, _) => j + }.size === 1) + + checkAnswer( + leftInnerJoin, + Row(1, 2, "1", 1, 3, "1", 1, 3, "1") :: + Row(3, 4, "3", null, null, null, 3, 6, "5") :: Nil) + } + + test("join - left outer + inner reordering # 2") { + val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a") + val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b") + val df3 = Seq((1, 3, "1"), (3, 6, "5")).toDF("int", "int2", "str").as("c") + + // Left Then Inner -> Inner Then Left + val right = df.join(df2, $"a.int" === $"b.int", "left") + val leftInnerJoin = df3.join(right, $"c.int" === $"a.int", "inner") + .select($"a.*", $"b.*", $"c.*") + + // The order before reordering: Left Then Inner + assert(leftInnerJoin.queryExecution.analyzed.collect { + case j@Join(_, Join(_, _, LeftOuter, _), Inner, _) => j + }.size === 1) + + // The order after reordering: Inner Then Left + assert(leftInnerJoin.queryExecution.optimizedPlan.collect { + case j@Join(Join(_, _, Inner, _), _, LeftOuter, _) => j + }.size === 1) + + checkAnswer( + leftInnerJoin, + Row(1, 2, "1", 1, 3, "1", 1, 3, "1") :: + Row(3, 4, "3", null, null, null, 3, 6, "5") :: Nil) + } + + test("join - right outer + inner reordering # 1") { + val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a") + val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b") + val df3 = Seq((1, 9, "8"), (5, 0, "4")).toDF("int", "int2", "str").as("c") + + // Right Then Inner -> Inner Then Right + val rightInnerJoin = df.join(df2, $"a.int" === $"b.int", "right") + .join(df3, 
$"c.int" === $"b.int", "inner").select($"a.*", $"b.*", $"c.*") + + // The order before reordering: Right Then Inner + assert(rightInnerJoin.queryExecution.analyzed.collect { + case j @ Join(Join(_, _, RightOuter, _), _, Inner, _) => j }.size === 1) + + // The order after reordering: Inner Then Right + assert(rightInnerJoin.queryExecution.optimizedPlan.collect { + case j @ Join(_, Join(_, _, Inner, _), RightOuter, _) => j }.size === 1) + + checkAnswer( + rightInnerJoin, + Row(1, 2, "1", 1, 3, "1", 1, 9, "8") :: + Row(null, null, null, 5, 6, "5", 5, 0, "4") :: Nil) + } + + test("join - right outer + inner reordering #2") { + val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a") + val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b") + val df3 = Seq((1, 9, "8"), (5, 0, "4")).toDF("int", "int2", "str").as("c") + + // Right Then Inner -> Inner Then Right + val right = df.join(df2, $"a.int" === $"b.int", "right") + val rightInnerJoin = df3.join(right, $"c.int" === $"b.int", "inner") + .select($"a.*", $"b.*", $"c.*") + + // The order before reordering: Right Then Inner + assert(rightInnerJoin.queryExecution.analyzed.collect { + case j @ Join(_, Join(_, _, RightOuter, _), Inner, _) => j }.size === 1) + + // The order after reordering: Inner Then Right + assert(rightInnerJoin.queryExecution.optimizedPlan.collect { + case j @ Join(_, Join(_, _, Inner, _), RightOuter, _) => j }.size === 1) + + checkAnswer( + rightInnerJoin, + Row(1, 2, "1", 1, 3, "1", 1, 9, "8") :: + Row(null, null, null, 5, 6, "5", 5, 0, "4") :: Nil) + } + test("broadcast join hint") { val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value") From d9b54115f6dbd3e815963cec702a4a8aae28c154 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 1 Jan 2016 20:18:32 -0800 Subject: [PATCH 2/5] renaming. 
--- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d31a53a7ac35..8e62c2d4ac66 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -45,7 +45,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { SetOperationPushDown, SamplePushDown, ReorderInnerJoin, - ReorderOuterInner, + ReorderOuterInnerJoins, PushPredicateThroughJoin, PushPredicateThroughProject, PushPredicateThroughGenerate, @@ -771,11 +771,11 @@ object ReorderInnerJoin extends Rule[LogicalPlan] with PredicateHelper { /** - * Reorder the adjacent outer and inner joins and push inner join below left/right outer join. + * Reorder the adjacent outer and inner joins and push inner join through left/right outer join. * * TODO: improve the checking conditions to cover out-of-order cases. 
*/ -object ReorderOuterInner extends Rule[LogicalPlan] { +object ReorderOuterInnerJoins extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case j @ Join(left @ Join(ll, lr, joinType, lCondition), right, Inner, condition) => From 5f67a7476365828a5eb920b3af3ddad312799509 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 1 Jan 2016 22:31:46 -0800 Subject: [PATCH 3/5] restrict the types --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 8e62c2d4ac66..0a90f5fde408 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -778,7 +778,7 @@ object ReorderInnerJoin extends Rule[LogicalPlan] with PredicateHelper { object ReorderOuterInnerJoins extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case j @ Join(left @ Join(ll, lr, joinType, lCondition), right, Inner, condition) => + case j @ Join(left @ Join(ll, lr, LeftOuter|RightOuter, lCond), right, Inner, condition) => val leftJoinKey = j match { case ExtractEquiJoinKeys(_, leftKeys, _, _, _, _) => leftKeys } @@ -787,15 +787,15 @@ object ReorderOuterInnerJoins extends Rule[LogicalPlan] { (leftKeys, rightKeys) } - joinType match { + left.joinType match { case LeftOuter if leftJoinKey == leftLeftJoinKey => - Join(Join(ll, right, Inner, condition), lr, LeftOuter, lCondition) + Join(Join(ll, right, Inner, condition), lr, LeftOuter, lCond) case RightOuter if leftJoinKey == leftRightJoinKey => - Join(ll, Join(lr, right, Inner, condition), RightOuter, lCondition) + Join(ll, Join(lr, right, Inner, condition), RightOuter, lCond) case _ => j } - case j @ Join(left, 
right @ Join(rl, rr, joinType, rCondition), Inner, condition) => + case j @ Join(left, right @ Join(rl, rr, LeftOuter|RightOuter, rCond), Inner, condition) => val rightJoinKey = j match { case ExtractEquiJoinKeys(_, _, rightKey, _, _, _) => rightKey } @@ -804,11 +804,11 @@ object ReorderOuterInnerJoins extends Rule[LogicalPlan] { (leftKeys, rightKeys) } - joinType match { + right.joinType match { case LeftOuter if rightJoinKey == rightLeftJoinKey => - Join(Join(rl, left, Inner, condition), rr, LeftOuter, rCondition) + Join(Join(rl, left, Inner, condition), rr, LeftOuter, rCond) case RightOuter if rightJoinKey == rightRightJoinKey => - Join(rl, Join(left, rr, Inner, condition), RightOuter, rCondition) + Join(rl, Join(left, rr, Inner, condition), RightOuter, rCond) case _ => j } } From 70336a4fe841c1e6581b0c9cbefa6296bb9818cf Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 2 Jan 2016 12:56:19 -0800 Subject: [PATCH 4/5] support mixed order and unmatched join key --- .../sql/catalyst/optimizer/Optimizer.scala | 26 ++----- .../catalyst/optimizer/JoinOrderSuite.scala | 78 ++++++++++++++++++- .../apache/spark/sql/DataFrameJoinSuite.scala | 14 ++-- 3 files changed, 92 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0a90f5fde408..d9a1866b2f59 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -45,7 +45,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { SetOperationPushDown, SamplePushDown, ReorderInnerJoin, - ReorderOuterInnerJoins, + ReorderOuterInnerJoin, PushPredicateThroughJoin, PushPredicateThroughProject, PushPredicateThroughGenerate, @@ -772,42 +772,32 @@ object ReorderInnerJoin extends Rule[LogicalPlan] with PredicateHelper { /** * Reorder 
the adjacent outer and inner joins and push inner join through left/right outer join. - * - * TODO: improve the checking conditions to cover out-of-order cases. */ -object ReorderOuterInnerJoins extends Rule[LogicalPlan] { +object ReorderOuterInnerJoin extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case j @ Join(left @ Join(ll, lr, LeftOuter|RightOuter, lCond), right, Inner, condition) => - val leftJoinKey = j match { + val leftJoinKey: Seq[Expression] = j match { case ExtractEquiJoinKeys(_, leftKeys, _, _, _, _) => leftKeys } - val (leftLeftJoinKey, leftRightJoinKey) = left match { - case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _) => - (leftKeys, rightKeys) - } left.joinType match { - case LeftOuter if leftJoinKey == leftLeftJoinKey => + case LeftOuter if leftJoinKey.forall(canEvaluate(_, ll)) => Join(Join(ll, right, Inner, condition), lr, LeftOuter, lCond) - case RightOuter if leftJoinKey == leftRightJoinKey => + case RightOuter if leftJoinKey.forall(canEvaluate(_, lr)) => Join(ll, Join(lr, right, Inner, condition), RightOuter, lCond) case _ => j } case j @ Join(left, right @ Join(rl, rr, LeftOuter|RightOuter, rCond), Inner, condition) => - val rightJoinKey = j match { + val rightJoinKey: Seq[Expression] = j match { case ExtractEquiJoinKeys(_, _, rightKey, _, _, _) => rightKey } - val (rightLeftJoinKey, rightRightJoinKey) = right match { - case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _) => - (leftKeys, rightKeys) - } right.joinType match { - case LeftOuter if rightJoinKey == rightLeftJoinKey => + case LeftOuter if rightJoinKey.forall(canEvaluate(_, rl)) => Join(Join(rl, left, Inner, condition), rr, LeftOuter, rCond) - case RightOuter if rightJoinKey == rightRightJoinKey => + case RightOuter if rightJoinKey.forall(canEvaluate(_, rr)) => Join(rl, Join(left, rr, Inner, condition), RightOuter, rCond) case _ => j } diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala index 1de1598fe0cd..9ac1618797d6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins -import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, PlanTest, RightOuter} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor @@ -39,6 +39,7 @@ class JoinOrderSuite extends PlanTest { PushPredicateThroughProject, BooleanSimplification, ReorderInnerJoin, + ReorderOuterInnerJoin, PushPredicateThroughJoin, PushPredicateThroughGenerate, PushPredicateThroughAggregate, @@ -91,4 +92,79 @@ class JoinOrderSuite extends PlanTest { comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer)) } + + test("reorder left and inner joins # 1") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val z = testRelation.subquery('z) + + val originalQuery = { + x.join(y, LeftOuter, Some("x.b".attr === "y.b".attr && "x.a".attr === "y.a".attr)) + .join(z, Inner, Some("x.b".attr === "z.b".attr && "x.a".attr === "z.a".attr)) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = + x.join(z, Inner, Some("x.b".attr === "z.b".attr && "x.a".attr === "z.a".attr)) + .join(y, LeftOuter, Some("x.b".attr === "y.b".attr && "x.a".attr === "y.a".attr)) + .analyze + + comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer)) + } + 
+ test("reorder left and inner joins # 2") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val z = testRelation.subquery('z) + + val originalQuery = { + val right = x.join(y, LeftOuter, + Some("x.c".attr === "y.c".attr && "x.b".attr === "y.b".attr)) + z.join(right, Inner, Some("z.b".attr === "x.b".attr && "z.a".attr === "x.a".attr)) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = + x.join(y, Inner, Some("x.b".attr === "y.b".attr && "x.a".attr === "y.a".attr)) + .join(z, LeftOuter, Some("z.c".attr === "x.c".attr && "z.b".attr === "x.b".attr)).analyze + + comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer)) + } + + test("reorder right and inner joins # 1") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val z = testRelation.subquery('z) + + val originalQuery = { + x.join(y, RightOuter, Some("x.a".attr === "y.a".attr)) + .join(z, Inner, Some("z.c".attr === "y.c".attr)) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = + y.join(x.join(z, Inner, Some("z.c".attr === "x.c".attr)), + RightOuter, Some("x.a".attr === "y.a".attr)).analyze + + comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer)) + } + + test("reorder right and inner joins # 2") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val z = testRelation.subquery('z) + + val originalQuery = { + val right = x.join(y, RightOuter, Some("x.b".attr === "y.b".attr)) + z.join(right, Inner, Some("z.c".attr === "y.c".attr)) + } + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = + x.join(z.join(y, Inner, Some("z.c".attr === "y.c".attr)), + RightOuter, Some("x.b".attr === "y.b".attr)).analyze + + comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer)) + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 077d8852e5d1..47e1de6370c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -147,11 +147,11 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { test("join - left outer + inner reordering # 2") { val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a") - val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b") + val df2 = Seq((1, 2, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b") val df3 = Seq((1, 3, "1"), (3, 6, "5")).toDF("int", "int2", "str").as("c") // Left Then Inner -> Inner Then Left - val right = df.join(df2, $"a.int" === $"b.int", "left") + val right = df.join(df2, $"a.int" === $"b.int" && $"a.int2" === $"b.int2", "left") val leftInnerJoin = df3.join(right, $"c.int" === $"a.int", "inner") .select($"a.*", $"b.*", $"c.*") @@ -167,7 +167,7 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { checkAnswer( leftInnerJoin, - Row(1, 2, "1", 1, 3, "1", 1, 3, "1") :: + Row(1, 2, "1", 1, 2, "1", 1, 3, "1") :: Row(3, 4, "3", null, null, null, 3, 6, "5") :: Nil) } @@ -195,12 +195,12 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { } test("join - right outer + inner reordering #2") { - val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a") - val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b") + val df = Seq((0, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a") + val df2 = Seq((1, 2, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b") val df3 = Seq((1, 9, "8"), (5, 0, "4")).toDF("int", "int2", "str").as("c") // Right Then Inner -> Inner Then Right - val right = df.join(df2, $"a.int" === $"b.int", "right") + val right = df.join(df2, $"a.int2" === $"b.int2", "right") val rightInnerJoin = df3.join(right, $"c.int" === $"b.int", "inner") 
.select($"a.*", $"b.*", $"c.*") @@ -214,7 +214,7 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { checkAnswer( rightInnerJoin, - Row(1, 2, "1", 1, 3, "1", 1, 9, "8") :: + Row(0, 2, "1", 1, 2, "1", 1, 9, "8") :: Row(null, null, null, 5, 6, "5", 5, 0, "4") :: Nil) } From 7909d4ca68dda7bfa428ef2d4e7d02b2641be8c5 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 2 Jan 2016 14:34:58 -0800 Subject: [PATCH 5/5] convert right outer join to left outer join for generating left deep tree. --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 12 ++++++++++-- .../sql/catalyst/optimizer/JoinOrderSuite.scala | 8 ++++---- .../org/apache/spark/sql/DataFrameJoinSuite.scala | 8 ++++---- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d9a1866b2f59..8d579a051ab6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -772,6 +772,14 @@ object ReorderInnerJoin extends Rule[LogicalPlan] with PredicateHelper { /** * Reorder the adjacent outer and inner joins and push inner join through left/right outer join. + * + * Basic rules are based on associativity of outer and inner joins: + * 1. (R1 left R2 on p12) inner R3 on p13 = (R1 inner R3 on p13) left R2 on p12 + * 2. (R1 right R2 on p12) inner R3 on p23 = R1 right (R2 inner R3 on p23) on p12 + * = (R2 inner R3 on p23) left R1 on p12 (<-- left deep tree is preferred) + * 3. R1 inner (R2 left R3 on p23) on p12 = (R1 inner R2 on p12) left R3 on p23 + * 4. 
R1 inner (R2 right R3 on p23) on p13 = R2 right (R1 inner R3 on p13) on p23 + * = (R1 inner R3 on p13) left R2 on p23 (<-- left deep tree is preferred) */ object ReorderOuterInnerJoin extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { @@ -785,7 +793,7 @@ object ReorderOuterInnerJoin extends Rule[LogicalPlan] with PredicateHelper { case LeftOuter if leftJoinKey.forall(canEvaluate(_, ll)) => Join(Join(ll, right, Inner, condition), lr, LeftOuter, lCond) case RightOuter if leftJoinKey.forall(canEvaluate(_, lr)) => - Join(ll, Join(lr, right, Inner, condition), RightOuter, lCond) + Join(Join(lr, right, Inner, condition), ll, LeftOuter, lCond) case _ => j } @@ -798,7 +806,7 @@ object ReorderOuterInnerJoin extends Rule[LogicalPlan] with PredicateHelper { case LeftOuter if rightJoinKey.forall(canEvaluate(_, rl)) => Join(Join(rl, left, Inner, condition), rr, LeftOuter, rCond) case RightOuter if rightJoinKey.forall(canEvaluate(_, rr)) => - Join(rl, Join(left, rr, Inner, condition), RightOuter, rCond) + Join(Join(left, rr, Inner, condition), rl, LeftOuter, rCond) case _ => j } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala index 9ac1618797d6..d04cec37e8c1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala @@ -143,8 +143,8 @@ class JoinOrderSuite extends PlanTest { val optimized = Optimize.execute(originalQuery.analyze) val correctAnswer = - y.join(x.join(z, Inner, Some("z.c".attr === "x.c".attr)), - RightOuter, Some("x.a".attr === "y.a".attr)).analyze + x.join(z, Inner, Some("z.c".attr === "x.c".attr)) + .join(y, LeftOuter, Some("x.a".attr === "y.a".attr)).analyze comparePlans(optimized, 
analysis.EliminateSubQueries(correctAnswer)) } @@ -161,8 +161,8 @@ class JoinOrderSuite extends PlanTest { val optimized = Optimize.execute(originalQuery.analyze) val correctAnswer = - x.join(z.join(y, Inner, Some("z.c".attr === "y.c".attr)), - RightOuter, Some("x.b".attr === "y.b".attr)).analyze + z.join(y, Inner, Some("z.c".attr === "y.c".attr)) + .join(x, LeftOuter, Some("x.b".attr === "y.b".attr)).analyze comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 47e1de6370c0..c287e3792dc6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -184,9 +184,9 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { assert(rightInnerJoin.queryExecution.analyzed.collect { case j @ Join(Join(_, _, RightOuter, _), _, Inner, _) => j }.size === 1) - // The order after reordering: Inner Then Right + // The order after reordering: Inner Then Left assert(rightInnerJoin.queryExecution.optimizedPlan.collect { - case j @ Join(_, Join(_, _, Inner, _), RightOuter, _) => j }.size === 1) + case j @ Join(Join(_, _, Inner, _), _, LeftOuter, _) => j }.size === 1) checkAnswer( rightInnerJoin, @@ -208,9 +208,9 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { assert(rightInnerJoin.queryExecution.analyzed.collect { case j @ Join(_, Join(_, _, RightOuter, _), Inner, _) => j }.size === 1) - // The order after reordering: Inner Then Right + // The order after reordering: Inner Then Left assert(rightInnerJoin.queryExecution.optimizedPlan.collect { - case j @ Join(_, Join(_, _, Inner, _), RightOuter, _) => j }.size === 1) + case j @ Join(Join(_, _, Inner, _), _, LeftOuter, _) => j }.size === 1) checkAnswer( rightInnerJoin,