diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0b1c74293bb8..8d579a051ab6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -22,7 +22,7 @@ import scala.collection.immutable.HashSet
 import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractFiltersAndInnerJoins}
 import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, LeftSemi, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -44,7 +44,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       // Operator push down
       SetOperationPushDown,
       SamplePushDown,
-      ReorderJoin,
+      ReorderInnerJoin,
+      ReorderOuterInnerJoin,
       PushPredicateThroughJoin,
       PushPredicateThroughProject,
       PushPredicateThroughGenerate,
@@ -727,7 +728,7 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHel
  *
  * The order of joins will not be changed if all of them already have at least one condition.
  */
-object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
+object ReorderInnerJoin extends Rule[LogicalPlan] with PredicateHelper {
 
   /**
    * Join a list of plans together and push down the conditions into them.
@@ -768,6 +769,56 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 }
+
+/**
+ * Reorders adjacent outer and inner joins by pushing an inner join through a left/right
+ * outer join when the inner join only references the row-preserving side of the outer join.
+ *
+ * The rewrites rely on the associativity of outer and inner joins:
+ * 1. (R1 left R2 on p12) inner R3 on p13 = (R1 inner R3 on p13) left R2 on p12
+ * 2. (R1 right R2 on p12) inner R3 on p23 = R1 right (R2 inner R3 on p23) on p12
+ *    = (R2 inner R3 on p23) left R1 on p12 (<-- left deep tree is preferred)
+ * 3. R1 inner (R2 left R3 on p23) on p12 = (R1 inner R2 on p12) left R3 on p23
+ * 4. R1 inner (R2 right R3 on p23) on p13 = R2 right (R1 inner R3 on p13) on p23
+ *    = (R1 inner R3 on p13) left R2 on p23 (<-- left deep tree is preferred)
+ */
+object ReorderOuterInnerJoin extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+
+    // Rules 1 and 2: the outer join is the left child of the inner join.
+    case j @ Join(left @ Join(ll, lr, LeftOuter | RightOuter, lCond), right, Inner, condition) =>
+      // Only equi-joins are reordered; a non-equi inner join falls through unchanged.
+      val leftJoinKeys: Seq[Expression] = j match {
+        case ExtractEquiJoinKeys(_, leftKeys, _, _, _, _) => leftKeys
+        case _ => Nil
+      }
+
+      left.joinType match {
+        // The join keys must all come from the row-preserving side of the outer join.
+        case LeftOuter if leftJoinKeys.nonEmpty && leftJoinKeys.forall(canEvaluate(_, ll)) =>
+          Join(Join(ll, right, Inner, condition), lr, LeftOuter, lCond)
+        case RightOuter if leftJoinKeys.nonEmpty && leftJoinKeys.forall(canEvaluate(_, lr)) =>
+          Join(Join(lr, right, Inner, condition), ll, LeftOuter, lCond)
+        case _ => j
+      }
+
+    // Rules 3 and 4: the outer join is the right child of the inner join.
+    case j @ Join(left, right @ Join(rl, rr, LeftOuter | RightOuter, rCond), Inner, condition) =>
+      val rightJoinKeys: Seq[Expression] = j match {
+        case ExtractEquiJoinKeys(_, _, rightKeys, _, _, _) => rightKeys
+        case _ => Nil
+      }
+
+      right.joinType match {
+        case LeftOuter if rightJoinKeys.nonEmpty && rightJoinKeys.forall(canEvaluate(_, rl)) =>
+          Join(Join(rl, left, Inner, condition), rr, LeftOuter, rCond)
+        case RightOuter if rightJoinKeys.nonEmpty && rightJoinKeys.forall(canEvaluate(_, rr)) =>
+          Join(Join(left, rr, Inner, condition), rl, LeftOuter, rCond)
+        case _ => j
+      }
+  }
+}
+
 /**
  * Pushes down [[Filter]] operators where the `condition` can be
  * evaluated using only the attributes of the left or right side of a join. Other
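The associativity claims in the scaladoc above are easy to sanity-check outside Catalyst. Below is a self-contained sketch of rule 1 in plain Scala; it is not Spark code, and the row encoding, relation contents, and predicates are invented for illustration. The point is that an inner join whose predicate only reads the row-preserving side of a left outer join can be evaluated below that outer join without changing the result.

// Standalone sketch of rule 1 (not part of this patch); rows are modeled as Maps.
object OuterInnerReorderSketch {
  type Row = Map[String, Any]

  // Naive nested-loop inner join.
  def innerJoin(l: Seq[Row], r: Seq[Row], p: (Row, Row) => Boolean): Seq[Row] =
    for (x <- l; y <- r if p(x, y)) yield x ++ y

  // Naive left outer join: unmatched left rows are padded with a null row.
  def leftOuterJoin(l: Seq[Row], r: Seq[Row], p: (Row, Row) => Boolean, rNulls: Row): Seq[Row] =
    l.flatMap { x =>
      val matches = r.filter(y => p(x, y))
      if (matches.nonEmpty) matches.map(x ++ _) else Seq(x ++ rNulls)
    }

  def main(args: Array[String]): Unit = {
    val r1: Seq[Row] = Seq(Map("a" -> 1), Map("a" -> 2))
    val r2: Seq[Row] = Seq(Map("b" -> 1))
    val r3: Seq[Row] = Seq(Map("c" -> 1))
    val p12 = (x: Row, y: Row) => x("a") == y("b") // only reads R1 and R2 columns
    val p13 = (x: Row, y: Row) => x("a") == y("c") // only reads R1 and R3 columns

    // Rule 1: (R1 left R2 on p12) inner R3 on p13 = (R1 inner R3 on p13) left R2 on p12,
    // because p13 only touches R1, the row-preserving side of the outer join.
    val before = innerJoin(leftOuterJoin(r1, r2, p12, Map("b" -> null)), r3, p13)
    val after = leftOuterJoin(innerJoin(r1, r3, p13), r2, p12, Map("b" -> null))
    assert(before.toSet == after.toSet) // both yield Map(a -> 1, b -> 1, c -> 1)
  }
}

The same style of argument covers rules 2 through 4; a right outer join is first flipped into a left outer join, which is why the rule above always emits LeftOuter when building the preferred left-deep tree.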
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
index 9b1e16c72764..d04cec37e8c1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
-import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, PlanTest, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
@@ -38,13 +38,13 @@ class JoinOrderSuite extends PlanTest {
         CombineFilters,
         PushPredicateThroughProject,
         BooleanSimplification,
-        ReorderJoin,
+        ReorderInnerJoin,
+        ReorderOuterInnerJoin,
         PushPredicateThroughJoin,
         PushPredicateThroughGenerate,
         PushPredicateThroughAggregate,
         ColumnPruning,
         ProjectCollapsing) :: Nil
-
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -92,4 +92,79 @@ class JoinOrderSuite extends PlanTest {
 
     comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
   }
+
+  test("reorder left and inner joins # 1") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val z = testRelation.subquery('z)
+
+    val originalQuery = {
+      x.join(y, LeftOuter, Some("x.b".attr === "y.b".attr && "x.a".attr === "y.a".attr))
+        .join(z, Inner, Some("x.b".attr === "z.b".attr && "x.a".attr === "z.a".attr))
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      x.join(z, Inner, Some("x.b".attr === "z.b".attr && "x.a".attr === "z.a".attr))
+        .join(y, LeftOuter, Some("x.b".attr === "y.b".attr && "x.a".attr === "y.a".attr))
+        .analyze
+
+    comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
+  }
+
+  test("reorder left and inner joins # 2") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val z = testRelation.subquery('z)
+
+    val originalQuery = {
+      val right = x.join(y, LeftOuter,
+        Some("x.c".attr === "y.c".attr && "x.b".attr === "y.b".attr))
+      z.join(right, Inner, Some("z.b".attr === "x.b".attr && "z.a".attr === "x.a".attr))
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      x.join(z, Inner, Some("z.b".attr === "x.b".attr && "z.a".attr === "x.a".attr))
+        .join(y, LeftOuter, Some("x.c".attr === "y.c".attr && "x.b".attr === "y.b".attr)).analyze
+
+    comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
+  }
+
+  test("reorder right and inner joins # 1") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val z = testRelation.subquery('z)
+
+    val originalQuery = {
+      x.join(y, RightOuter, Some("x.a".attr === "y.a".attr))
+        .join(z, Inner, Some("z.c".attr === "y.c".attr))
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      y.join(z, Inner, Some("z.c".attr === "y.c".attr))
+        .join(x, LeftOuter, Some("x.a".attr === "y.a".attr)).analyze
+
+    comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
+  }
+
+  test("reorder right and inner joins # 2") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val z = testRelation.subquery('z)
+
+    val originalQuery = {
+      val right = x.join(y, RightOuter, Some("x.b".attr === "y.b".attr))
+      z.join(right, Inner, Some("z.c".attr === "y.c".attr))
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      z.join(y, Inner, Some("z.c".attr === "y.c".attr))
+        .join(x, LeftOuter, Some("x.b".attr === "y.b".attr)).analyze
+
+    comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
+  }
+
 }
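The four tests above all exercise the positive case, where the guard passes and the rule fires. For contrast, here is a sketch of the complementary negative test; it is not part of this patch, but follows the suite's conventions and assumes the same Optimize rule batch. When the inner-join keys reference the null-supplying side of the outer join, reordering would change which rows get null-padded, so ReorderOuterInnerJoin must return the plan unchanged.

// Hypothetical companion test (not in the patch): the inner join key y.b
// references y, the null-supplying side of the left outer join, so the guard
// leftJoinKeys.forall(canEvaluate(_, ll)) fails and the rule falls through.
test("do not reorder when inner join keys touch the null-supplying side") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)
  val z = testRelation.subquery('z)

  val originalQuery = {
    x.join(y, LeftOuter, Some("x.a".attr === "y.a".attr))
      .join(z, Inner, Some("y.b".attr === "z.b".attr))
  }

  val optimized = Optimize.execute(originalQuery.analyze)

  // The plan should come out structurally unchanged.
  comparePlans(optimized, analysis.EliminateSubQueries(originalQuery.analyze))
}

Here canEvaluate(y.b, x) is false in the LeftOuter guard, so the rule hits its case _ => j fallback instead of producing an invalid pushed-down inner join.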
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 39a65413bd59..c287e3792dc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.execution.joins.BroadcastHashJoin
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
@@ -118,6 +120,104 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       Row(1, 1, 1, 1) :: Row(2, 1, 2, 2) :: Nil)
   }
 
+  test("join - left outer + inner reordering # 1") {
+    val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
+    val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
+    val df3 = Seq((1, 3, "1"), (3, 6, "5")).toDF("int", "int2", "str").as("c")
+
+    // Left Then Inner -> Inner Then Left
+    val leftInnerJoin = df.join(df2, $"a.int" === $"b.int", "left")
+      .join(df3, $"c.int" === $"a.int", "inner").select($"a.*", $"b.*", $"c.*")
+
+    // The order before reordering: Left Then Inner
+    assert(leftInnerJoin.queryExecution.analyzed.collect {
+      case j @ Join(Join(_, _, LeftOuter, _), _, Inner, _) => j
+    }.size === 1)
+
+    // The order after reordering: Inner Then Left
+    assert(leftInnerJoin.queryExecution.optimizedPlan.collect {
+      case j @ Join(Join(_, _, Inner, _), _, LeftOuter, _) => j
+    }.size === 1)
+
+    checkAnswer(
+      leftInnerJoin,
+      Row(1, 2, "1", 1, 3, "1", 1, 3, "1") ::
+        Row(3, 4, "3", null, null, null, 3, 6, "5") :: Nil)
+  }
+
+  test("join - left outer + inner reordering # 2") {
+    val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
+    val df2 = Seq((1, 2, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
+    val df3 = Seq((1, 3, "1"), (3, 6, "5")).toDF("int", "int2", "str").as("c")
+
+    // Left Then Inner -> Inner Then Left
+    val right = df.join(df2, $"a.int" === $"b.int" && $"a.int2" === $"b.int2", "left")
+    val leftInnerJoin = df3.join(right, $"c.int" === $"a.int", "inner")
+      .select($"a.*", $"b.*", $"c.*")
+
+    // The order before reordering: Left Then Inner
+    assert(leftInnerJoin.queryExecution.analyzed.collect {
+      case j @ Join(_, Join(_, _, LeftOuter, _), Inner, _) => j
+    }.size === 1)
+
+    // The order after reordering: Inner Then Left
+    assert(leftInnerJoin.queryExecution.optimizedPlan.collect {
+      case j @ Join(Join(_, _, Inner, _), _, LeftOuter, _) => j
+    }.size === 1)
+
+    checkAnswer(
+      leftInnerJoin,
+      Row(1, 2, "1", 1, 2, "1", 1, 3, "1") ::
+        Row(3, 4, "3", null, null, null, 3, 6, "5") :: Nil)
+  }
+
+  test("join - right outer + inner reordering # 1") {
+    val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
+    val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
+    val df3 = Seq((1, 9, "8"), (5, 0, "4")).toDF("int", "int2", "str").as("c")
+
+    // Right Then Inner -> Inner Then Left
+    val rightInnerJoin = df.join(df2, $"a.int" === $"b.int", "right")
+      .join(df3, $"c.int" === $"b.int", "inner").select($"a.*", $"b.*", $"c.*")
+
+    // The order before reordering: Right Then Inner
+    assert(rightInnerJoin.queryExecution.analyzed.collect {
+      case j @ Join(Join(_, _, RightOuter, _), _, Inner, _) => j }.size === 1)
+
+    // The order after reordering: Inner Then Left
+    assert(rightInnerJoin.queryExecution.optimizedPlan.collect {
+      case j @ Join(Join(_, _, Inner, _), _, LeftOuter, _) => j }.size === 1)
+
+    checkAnswer(
+      rightInnerJoin,
+      Row(1, 2, "1", 1, 3, "1", 1, 9, "8") ::
+        Row(null, null, null, 5, 6, "5", 5, 0, "4") :: Nil)
+  }
+
+  test("join - right outer + inner reordering # 2") {
+    val df = Seq((0, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
+    val df2 = Seq((1, 2, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
+    val df3 = Seq((1, 9, "8"), (5, 0, "4")).toDF("int", "int2", "str").as("c")
+
+    // Right Then Inner -> Inner Then Left
+    val right = df.join(df2, $"a.int2" === $"b.int2", "right")
+    val rightInnerJoin = df3.join(right, $"c.int" === $"b.int", "inner")
+      .select($"a.*", $"b.*", $"c.*")
+
+    // The order before reordering: Right Then Inner
+    assert(rightInnerJoin.queryExecution.analyzed.collect {
+      case j @ Join(_, Join(_, _, RightOuter, _), Inner, _) => j }.size === 1)
+
+    // The order after reordering: Inner Then Left
+    assert(rightInnerJoin.queryExecution.optimizedPlan.collect {
+      case j @ Join(Join(_, _, Inner, _), _, LeftOuter, _) => j }.size === 1)
+
+    checkAnswer(
+      rightInnerJoin,
+      Row(0, 2, "1", 1, 2, "1", 1, 9, "8") ::
+        Row(null, null, null, 5, 6, "5", 5, 0, "4") :: Nil)
+  }
+
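The end-to-end effect is also easy to eyeball from a shell. The fragment below is illustrative only: it assumes the 1.6-era spark-shell with its pre-built sqlContext and implicits, and the column names are invented for the example.

// Illustrative spark-shell fragment, not part of the patch (use :paste for
// the multi-line expressions).
import sqlContext.implicits._

val a = sqlContext.range(0, 10).select($"id".as("a_id"))
val b = sqlContext.range(0, 5).select($"id".as("b_id"))
val c = sqlContext.range(0, 3).select($"id".as("c_id"))

// The inner join key a_id lives on the preserved side of the left outer join,
// so ReorderOuterInnerJoin should rewrite this to (a inner c) left outer b.
val q = a.join(b, $"a_id" === $"b_id", "left")
  .join(c, $"a_id" === $"c_id", "inner")
q.explain(true)

With the rule in place, explain(true) should show the Inner join nested below the LeftOuter join in the optimized plan, mirroring the optimizedPlan assertions in the tests above.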
   test("broadcast join hint") {
     val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
     val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value")