Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
01e4cdf
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 13, 2015
6835704
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
9180687
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
b38a21e
SPARK-11633
gatorsmile Nov 17, 2015
d2b84af
Merge remote-tracking branch 'upstream/master' into joinMakeCopy
gatorsmile Nov 17, 2015
fda8025
Merge remote-tracking branch 'upstream/master'
gatorspark Nov 17, 2015
ac0dccd
Merge branch 'master' of https://github.com/gatorsmile/spark
gatorspark Nov 17, 2015
6e0018b
Merge remote-tracking branch 'upstream/master'
Nov 20, 2015
0546772
converge
gatorsmile Nov 20, 2015
b37a64f
converge
gatorsmile Nov 20, 2015
bde74f8
Merge remote-tracking branch 'upstream/master' into OuterJoinEliminat…
gatorsmile Jan 3, 2016
e18ba75
Merge remote-tracking branch 'upstream/master' into OuterJoinEliminat…
gatorsmile Jan 4, 2016
d6a6e9c
outer join elimination by parent join.
gatorsmile Jan 4, 2016
c2a872c
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
ab6dbd7
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
4276356
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
2dab708
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 7, 2016
0458770
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 8, 2016
1debdfa
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 9, 2016
763706d
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 14, 2016
4de6ec1
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 18, 2016
9422a4f
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 19, 2016
52bdf48
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 20, 2016
1e95df3
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 23, 2016
fab24cf
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 1, 2016
8b2e33b
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 5, 2016
2ee1876
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 11, 2016
b9f0090
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 12, 2016
ade6f7e
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 15, 2016
9fd63d2
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 19, 2016
5199d49
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 22, 2016
404214c
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 23, 2016
c001dd9
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 25, 2016
4c657fd
integrate it into the existing outer-join elimination.
gatorsmile Feb 25, 2016
1a9ebdf
integrate it into the existing outer-join elimination.
gatorsmile Feb 25, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -945,16 +945,32 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
}

/**
* Elimination of outer joins, if the predicates can restrict the result sets so that
* all null-supplying rows are eliminated
* Elimination of Outer Joins
*
* Rule Set 1: checking the Filter Condition if the predicates can restrict the result sets
* so that all null-supplying rows are eliminated
*
* - full outer -> inner if both sides have such predicates
* - left outer -> inner if the right side has such predicates
* - right outer -> inner if the left side has such predicates
* - full outer -> left outer if only the left side has such predicates
* - full outer -> right outer if only the right side has such predicates
*
* This rule should be executed before pushing down the Filter
* This rule set should be executed before pushing down the Filter
*
* Rule Set 2: given an outer join is involved in another join (called parent join), when the join
* type of the parent join is inner, left-semi, left-outer and right-outer, checking if the join
* condition of the parent join satisfies the following two conditions:
* 1) there exist null filtering predicates against the columns in the null-supplying side of
* parent join.
* 2) these columns are from the child join.
*
* If having such join predicates, execute the elimination rules:
* - full outer -> inner if both sides of the child join have such predicates
* - left outer -> inner if the right side of the child join has such predicates
* - right outer -> inner if the left side of the child join has such predicates
* - full outer -> left outer if only the left side of the child join has such predicates
* - full outer -> right outer if only the right side of the child join has such predicates
*/
object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper {

Expand All @@ -969,18 +985,21 @@ object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper {
v == null || v == false
}

private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
val splitConjunctiveConditions: Seq[Expression] = splitConjunctivePredicates(filter.condition)
private def buildNewJoinType(
condition: Expression,
constraints: Set[Expression],
join: Join): JoinType = {
val splitConjunctiveConditions: Seq[Expression] = splitConjunctivePredicates(condition)
val leftConditions = splitConjunctiveConditions
.filter(_.references.subsetOf(join.left.outputSet))
val rightConditions = splitConjunctiveConditions
.filter(_.references.subsetOf(join.right.outputSet))

val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull) ||
filter.constraints.filter(_.isInstanceOf[IsNotNull])
constraints.filter(_.isInstanceOf[IsNotNull])
.exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty)
val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull) ||
filter.constraints.filter(_.isInstanceOf[IsNotNull])
constraints.filter(_.isInstanceOf[IsNotNull])
.exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty)

join.joinType match {
Expand All @@ -994,9 +1013,37 @@ object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper {
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Rule Set 1: elimination using Filter conditions/constraints
case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
val newJoinType = buildNewJoinType(f, j)
val newJoinType = buildNewJoinType(f.condition, f.constraints, j)
if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))

// Rule Set 2: elimination using Parent Join conditions/constraints
// Case 1: when parent join is Inner|LeftSemi|LeftOuter and the child join is on the right side
case pj @ Join(
_,
j @ Join(left, right, RightOuter|LeftOuter|FullOuter, condition),
Inner | LeftSemi | LeftOuter,
Some(pJoinCond)) =>
val newJoinType = buildNewJoinType(pJoinCond, pj.constraints, j)
if (j.joinType == newJoinType) {
pj
} else {
Join(pj.left, j.copy(joinType = newJoinType), pj.joinType, pj.condition)
}

// Case 2: when parent join is Inner|LeftSemi|RightOuter and the child join is on the left side
case pj @ Join(
j @ Join(left, right, RightOuter|LeftOuter|FullOuter, condition),
_,
Inner | LeftSemi | RightOuter,
Some(pJoinCond)) =>
val newJoinType = buildNewJoinType(pJoinCond, pj.constraints, j)
if (j.joinType == newJoinType) {
pj
} else {
Join(j.copy(joinType = newJoinType), pj.right, pj.joinType, pj.condition)
}
}
}

Expand Down
157 changes: 155 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Join, Project}
import org.apache.spark.sql.execution.joins.BroadcastHashJoin
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
Expand Down Expand Up @@ -204,4 +204,157 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
leftJoin2Inner,
Row(1, 2, "1", 1, 3, "1") :: Nil)
}

test("join - left outer to inner by the parent join's join condition") {
val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
val df3 = Seq((1, 3, "1"), (3, 6, "5")).toDF("int", "int2", "str").as("c")

// Left -> Inner
val right = df.join(df2, $"a.int" === $"b.int", "left")
val left2Inner =
df3.join(right, $"c.int" === $"b.int", "inner").select($"a.*", $"b.*", $"c.*")

left2Inner.explain(true)

// The order before conversion: Left Then Inner
assert(left2Inner.queryExecution.analyzed.collect {
case j @ Join(_, Join(_, _, LeftOuter, _), Inner, _) => j
}.size === 1)

// The order after conversion: Inner Then Inner
assert(left2Inner.queryExecution.optimizedPlan.collect {
case j @ Join(_, Join(_, _, Inner, _), Inner, _) => j
}.size === 1)

checkAnswer(
left2Inner,
Row(1, 2, "1", 1, 3, "1", 1, 3, "1") :: Nil)
}

test("join - right outer to inner by the parent join's join condition") {
val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
val df3 = Seq((1, 9, "8"), (5, 0, "4")).toDF("int", "int2", "str").as("c")

// Right Then Inner -> Inner Then Right
val right2Inner = df.join(df2, $"a.int" === $"b.int", "right")
.join(df3, $"a.int" === $"b.int", "inner").select($"a.*", $"b.*", $"c.*")

// The order before conversion: Left Then Inner
assert(right2Inner.queryExecution.analyzed.collect {
case j @ Join(Join(_, _, RightOuter, _), _, Inner, _) => j
}.size === 1)

// The order after conversion: Inner Then Inner
assert(right2Inner.queryExecution.optimizedPlan.collect {
case j @ Join(Join(_, _, Inner, _), _, Inner, _) => j
}.size === 1)

checkAnswer(
right2Inner,
Row(1, 2, "1", 1, 3, "1", 1, 9, "8") ::
Row(1, 2, "1", 1, 3, "1", 5, 0, "4") :: Nil)
}

test("join - full outer to inner by the parent join's join condition") {
val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
val df2 = Seq((1, 2, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
val df3 = Seq((1, 3, "1"), (3, 6, "5")).toDF("int", "int2", "str").as("c")

// Full -> Inner
val right = df.join(df2, $"a.int" === $"b.int", "full")
val full2Inner = df3.join(right, $"c.int" === $"a.int" && $"b.int" === 1, "inner")
.select($"a.*", $"b.*", $"c.*")

// The order before conversion: Left Then Inner
assert(full2Inner.queryExecution.analyzed.collect {
case j @ Join(_, Join(_, _, FullOuter, _), Inner, _) => j
}.size === 1)

// The order after conversion: Inner Then Inner
assert(full2Inner.queryExecution.optimizedPlan.collect {
case j @ Join(_, Join(_, _, Inner, _), Inner, _) => j
}.size === 1)

checkAnswer(
full2Inner,
Row(1, 2, "1", 1, 2, "1", 1, 3, "1") :: Nil)
}

test("join - full outer to right by the parent join's join condition") {
val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
val df2 = Seq((1, 2, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
val df3 = Seq((1, 3, "1"), (3, 6, "5")).toDF("int", "int2", "str").as("c")

// Full -> Right
val right = df.join(df2, $"a.int" === $"b.int", "full")
val full2Right = df3.join(right, $"b.int" === 1, "leftsemi")

// The order before conversion: Left Then Inner
assert(full2Right.queryExecution.analyzed.collect {
case j @ Join(_, Join(_, _, FullOuter, _), LeftSemi, _) => j
}.size === 1)

// The order after conversion: Inner Then Inner
assert(full2Right.queryExecution.optimizedPlan.collect {
case j @ Join(_, Project(_, Join(_, _, RightOuter, _)), LeftSemi, _) => j
}.size === 1)

checkAnswer(
full2Right,
Row(1, 3, "1") :: Row(3, 6, "5") :: Nil)
}


test("join - full outer to left by the parent join's join condition #1") {
val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
val df2 = Seq((1, 2, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
val df3 = Seq((1, 3, "1"), (4, 6, "5")).toDF("int", "int2", "str").as("c")

// Full -> Left
val right = df.join(df2, $"a.int" === $"b.int", "full")
val full2Left = df3.join(right, lit(3) === $"a.int", "left")
.select($"a.*", $"b.*", $"c.*")

// The order before conversion: Full Then Left
assert(full2Left.queryExecution.analyzed.collect {
case j @ Join(_, Join(_, _, FullOuter, _), LeftOuter, _) => j
}.size === 1)

// The order after conversion: Left Then Left
assert(full2Left.queryExecution.optimizedPlan.collect {
case j @ Join(_, Join(_, _, LeftOuter, _), LeftOuter, _) => j
}.size === 1)

checkAnswer(
full2Left,
Row(3, 4, "3", null, null, null, 1, 3, "1") ::
Row(3, 4, "3", null, null, null, 4, 6, "5") :: Nil)
}

test("join - full outer to left by the parent join's join condition #2") {
val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
val df2 = Seq((1, 2, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
val df3 = Seq((1, 3, "1"), (4, 6, "5")).toDF("int", "int2", "str").as("c")

// Full -> Left
val full2Left = df.join(df2, $"a.int" === $"b.int", "full")
.join(df3, lit(3) === $"a.int", "right").select($"a.*", $"b.*", $"c.*")

// The order before conversion: Full Then Right
assert(full2Left.queryExecution.analyzed.collect {
case j @ Join(Join(_, _, FullOuter, _), _, RightOuter, _) => j
}.size === 1)

// The order after conversion: Left Then Right
assert(full2Left.queryExecution.optimizedPlan.collect {
case j @ Join(Join(_, _, LeftOuter, _), _, RightOuter, _) => j
}.size === 1)

checkAnswer(
full2Left,
Row(3, 4, "3", null, null, null, 1, 3, "1") ::
Row(3, 4, "3", null, null, null, 4, 6, "5") :: Nil)
}
}