sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -22,7 +22,7 @@ import scala.collection.immutable.HashSet
import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractFiltersAndInnerJoins}
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -44,7 +44,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
// Operator push down
SetOperationPushDown,
SamplePushDown,
ReorderJoin,
ReorderInnerJoin,
ReorderOuterInnerJoin,
PushPredicateThroughJoin,
PushPredicateThroughProject,
PushPredicateThroughGenerate,
@@ -727,7 +728,7 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHelper {
*
* The order of joins will not be changed if all of them already have at least one condition.
*/
object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
object ReorderInnerJoin extends Rule[LogicalPlan] with PredicateHelper {

/**
* Join a list of plans together and push down the conditions into them.
@@ -768,6 +769,49 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
}
}
/**
 * Reorders adjacent outer and inner joins by pushing an inner join through a left/right
 * outer join. The rewrite fires only when the inner join's equi-join keys can all be
 * evaluated against the preserved (non-null-supplying) side of the outer join.
 *
 * Basic rules are based on the associativity of outer and inner joins:
 * 1. (R1 left R2 on p12) inner R3 on p13 = (R1 inner R3 on p13) left R2 on p12
 * 2. (R1 right R2 on p12) inner R3 on p23 = R1 right (R2 inner R3 on p23) on p12
 *                                         = (R2 inner R3 on p23) left R1 on p12 (<-- left-deep tree is preferred)
 * 3. R1 inner (R2 left R3 on p23) on p12 = (R1 inner R2 on p12) left R3 on p23
 * 4. R1 inner (R2 right R3 on p23) on p13 = R2 right (R1 inner R3 on p13) on p23
 *                                         = (R1 inner R3 on p13) left R2 on p23 (<-- left-deep tree is preferred)
 */
object ReorderOuterInnerJoin extends Rule[LogicalPlan] with PredicateHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {

  case j @ Join(left @ Join(ll, lr, LeftOuter | RightOuter, lCond), right, Inner, condition) =>
    j match {
      // Only equi-joins expose join keys we can safely test against one side.
      case ExtractEquiJoinKeys(_, leftKeys, _, _, _, _) =>
        left.joinType match {
          // Rule 1: the inner join only touches the preserved (left) side.
          case LeftOuter if leftKeys.forall(canEvaluate(_, ll)) =>
            Join(Join(ll, right, Inner, condition), lr, LeftOuter, lCond)
          // Rule 2: the inner join only touches the preserved (right) side;
          // build a left-deep tree by flipping the outer join to LeftOuter.
          case RightOuter if leftKeys.forall(canEvaluate(_, lr)) =>
            Join(Join(lr, right, Inner, condition), ll, LeftOuter, lCond)
          case _ => j
        }
      // Not an equi-join: leave the plan unchanged instead of failing the match.
      case _ => j
    }

  case j @ Join(left, right @ Join(rl, rr, LeftOuter | RightOuter, rCond), Inner, condition) =>
    j match {
      case ExtractEquiJoinKeys(_, _, rightKeys, _, _, _) =>
        right.joinType match {
          // Rule 3: the inner join only touches the preserved (left) side.
          case LeftOuter if rightKeys.forall(canEvaluate(_, rl)) =>
            Join(Join(rl, left, Inner, condition), rr, LeftOuter, rCond)
          // Rule 4: the inner join only touches the preserved (right) side;
          // build a left-deep tree by flipping the outer join to LeftOuter.
          case RightOuter if rightKeys.forall(canEvaluate(_, rr)) =>
            Join(Join(left, rr, Inner, condition), rl, LeftOuter, rCond)
          case _ => j
        }
      // Not an equi-join: leave the plan unchanged.
      case _ => j
    }
}
}

/**
* Pushes down [[Filter]] operators where the `condition` can be
* evaluated using only the attributes of the left or right side of a join. Other
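To make rule 1 concrete, here is a minimal sketch using the same Catalyst test DSL as the suites below (`LocalRelation`, `subquery`, and the `.attr` conversions from `org.apache.spark.sql.catalyst.dsl`); the relations and conditions are illustrative only, and, as in the tests, both plans would be run through `analyze` before being compared:

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val x = LocalRelation('a.int, 'b.int, 'c.int).subquery('x)
val y = LocalRelation('a.int, 'b.int, 'c.int).subquery('y)
val z = LocalRelation('a.int, 'b.int, 'c.int).subquery('z)

// (x left y on p_xy) inner z on p_xz: the inner join key x.a references only
// x, the preserved side of the outer join, so rule 1 applies.
val before = x.join(y, LeftOuter, Some("x.b".attr === "y.b".attr))
  .join(z, Inner, Some("x.a".attr === "z.a".attr))

// Equivalent plan after ReorderOuterInnerJoin: the row-reducing inner join
// runs first and the outer join is applied on top.
val after = x.join(z, Inner, Some("x.a".attr === "z.a".attr))
  .join(y, LeftOuter, Some("x.b".attr === "y.b".attr))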
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, PlanTest, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor

@@ -38,13 +38,13 @@ class JoinOrderSuite extends PlanTest {
CombineFilters,
PushPredicateThroughProject,
BooleanSimplification,
ReorderJoin,
ReorderInnerJoin,
ReorderOuterInnerJoin,
PushPredicateThroughJoin,
PushPredicateThroughGenerate,
PushPredicateThroughAggregate,
ColumnPruning,
ProjectCollapsing) :: Nil

}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -92,4 +92,79 @@ class JoinOrderSuite extends PlanTest {

comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
}

test("reorder left and inner joins # 1") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val z = testRelation.subquery('z)

val originalQuery = {
x.join(y, LeftOuter, Some("x.b".attr === "y.b".attr && "x.a".attr === "y.a".attr))
.join(z, Inner, Some("x.b".attr === "z.b".attr && "x.a".attr === "z.a".attr))
}

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
x.join(z, Inner, Some("x.b".attr === "z.b".attr && "x.a".attr === "z.a".attr))
.join(y, LeftOuter, Some("x.b".attr === "y.b".attr && "x.a".attr === "y.a".attr))
.analyze

comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
}

test("reorder left and inner joins # 2") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val z = testRelation.subquery('z)

val originalQuery = {
val right = x.join(y, LeftOuter,
Some("x.c".attr === "y.c".attr && "x.b".attr === "y.b".attr))
z.join(right, Inner, Some("z.b".attr === "x.b".attr && "z.a".attr === "x.a".attr))
}

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
  x.join(z, Inner, Some("z.b".attr === "x.b".attr && "z.a".attr === "x.a".attr))
    .join(y, LeftOuter, Some("x.c".attr === "y.c".attr && "x.b".attr === "y.b".attr)).analyze

comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
}

test("reorder right and inner joins # 1") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val z = testRelation.subquery('z)

val originalQuery = {
x.join(y, RightOuter, Some("x.a".attr === "y.a".attr))
.join(z, Inner, Some("z.c".attr === "y.c".attr))
}

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
  y.join(z, Inner, Some("z.c".attr === "y.c".attr))
    .join(x, LeftOuter, Some("x.a".attr === "y.a".attr)).analyze

comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
}

test("reorder right and inner joins # 2") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val z = testRelation.subquery('z)

val originalQuery = {
val right = x.join(y, RightOuter, Some("x.b".attr === "y.b".attr))
z.join(right, Inner, Some("z.c".attr === "y.c".attr))
}

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
z.join(y, Inner, Some("z.c".attr === "y.c".attr))
.join(x, LeftOuter, Some("x.b".attr === "y.b".attr)).analyze

comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer))
}

}
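One detail worth making explicit: the `canEvaluate` guards above are what keep the rewrite sound. Here is a minimal sketch in the same DSL (hypothetical, not one of the PR's tests) of a plan the rule must leave alone:

val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val z = testRelation.subquery('z)

// The inner join key y.a comes from y, the null-supplying side of the left
// outer join, so canEvaluate(y.a, x) is false: pushing the inner join down
// to (x, z) would strand the y.a reference outside its subtree, and
// ReorderOuterInnerJoin therefore returns the plan unchanged.
val unchanged = x.join(y, LeftOuter, Some("x.b".attr === "y.b".attr))
  .join(z, Inner, Some("y.a".attr === "z.a".attr))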
100 changes: 100 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.execution.joins.BroadcastHashJoin
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
@@ -118,6 +120,104 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
Row(1, 1, 1, 1) :: Row(2, 1, 2, 2) :: Nil)
}

test("join - left outer + inner reordering # 1") {
val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
val df3 = Seq((1, 3, "1"), (3, 6, "5")).toDF("int", "int2", "str").as("c")

// Left Then Inner -> Inner Then Left
val leftInnerJoin = df.join(df2, $"a.int" === $"b.int", "left")
.join(df3, $"c.int" === $"a.int", "inner").select($"a.*", $"b.*", $"c.*")

// The order before reordering: Left Then Inner
assert(leftInnerJoin.queryExecution.analyzed.collect {
case j @ Join(Join(_, _, LeftOuter, _), _, Inner, _) => j
}.size === 1)

// The order after reordering: Inner Then Left
assert(leftInnerJoin.queryExecution.optimizedPlan.collect {
case j @ Join(Join(_, _, Inner, _), _, LeftOuter, _) => j
}.size === 1)

checkAnswer(
leftInnerJoin,
Row(1, 2, "1", 1, 3, "1", 1, 3, "1") ::
Row(3, 4, "3", null, null, null, 3, 6, "5") :: Nil)
}

test("join - left outer + inner reordering # 2") {
val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
val df2 = Seq((1, 2, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
val df3 = Seq((1, 3, "1"), (3, 6, "5")).toDF("int", "int2", "str").as("c")

// Left Then Inner -> Inner Then Left
val right = df.join(df2, $"a.int" === $"b.int" && $"a.int2" === $"b.int2", "left")
val leftInnerJoin = df3.join(right, $"c.int" === $"a.int", "inner")
.select($"a.*", $"b.*", $"c.*")

// The order before reordering: Left Then Inner
assert(leftInnerJoin.queryExecution.analyzed.collect {
case j @ Join(_, Join(_, _, LeftOuter, _), Inner, _) => j
}.size === 1)

// The order after reordering: Inner Then Left
assert(leftInnerJoin.queryExecution.optimizedPlan.collect {
case j @ Join(Join(_, _, Inner, _), _, LeftOuter, _) => j
}.size === 1)

checkAnswer(
leftInnerJoin,
Row(1, 2, "1", 1, 2, "1", 1, 3, "1") ::
Row(3, 4, "3", null, null, null, 3, 6, "5") :: Nil)
}

test("join - right outer + inner reordering # 1") {
val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
val df3 = Seq((1, 9, "8"), (5, 0, "4")).toDF("int", "int2", "str").as("c")

// Right Then Inner -> Inner Then Right
val rightInnerJoin = df.join(df2, $"a.int" === $"b.int", "right")
.join(df3, $"c.int" === $"b.int", "inner").select($"a.*", $"b.*", $"c.*")

// The order before reordering: Right Then Inner
assert(rightInnerJoin.queryExecution.analyzed.collect {
case j @ Join(Join(_, _, RightOuter, _), _, Inner, _) => j }.size === 1)

// The order after reordering: Inner Then Left
assert(rightInnerJoin.queryExecution.optimizedPlan.collect {
case j @ Join(Join(_, _, Inner, _), _, LeftOuter, _) => j }.size === 1)

checkAnswer(
rightInnerJoin,
Row(1, 2, "1", 1, 3, "1", 1, 9, "8") ::
Row(null, null, null, 5, 6, "5", 5, 0, "4") :: Nil)
}

test("join - right outer + inner reordering #2") {
val df = Seq((0, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
val df2 = Seq((1, 2, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
val df3 = Seq((1, 9, "8"), (5, 0, "4")).toDF("int", "int2", "str").as("c")

// Right Then Inner -> Inner Then Right
val right = df.join(df2, $"a.int2" === $"b.int2", "right")
val rightInnerJoin = df3.join(right, $"c.int" === $"b.int", "inner")
.select($"a.*", $"b.*", $"c.*")

// The order before reordering: Right Then Inner
assert(rightInnerJoin.queryExecution.analyzed.collect {
case j @ Join(_, Join(_, _, RightOuter, _), Inner, _) => j }.size === 1)

// The order after reordering: Inner Then Left
assert(rightInnerJoin.queryExecution.optimizedPlan.collect {
case j @ Join(Join(_, _, Inner, _), _, LeftOuter, _) => j }.size === 1)

checkAnswer(
rightInnerJoin,
Row(0, 2, "1", 1, 2, "1", 1, 9, "8") ::
Row(null, null, null, 5, 6, "5", 5, 0, "4") :: Nil)
}

test("broadcast join hint") {
val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value")
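As a usage note, the rewrite is also easy to observe interactively. A hedged sketch (hypothetical REPL usage against a build with this patch, reusing the `df`/`df2`/`df3` frames from the first test above): `explain(true)` prints both the analyzed and the optimized plans, so the flip in join order can be read off directly.

// Hypothetical, for illustration: in the analyzed plan the LeftOuter join is
// the child of the Inner join; in the optimized plan the Inner join is the
// child of the LeftOuter join.
df.join(df2, $"a.int" === $"b.int", "left")
  .join(df3, $"c.int" === $"a.int", "inner")
  .explain(true)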