Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
01e4cdf
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 13, 2015
6835704
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
9180687
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
b38a21e
SPARK-11633
gatorsmile Nov 17, 2015
d2b84af
Merge remote-tracking branch 'upstream/master' into joinMakeCopy
gatorsmile Nov 17, 2015
fda8025
Merge remote-tracking branch 'upstream/master'
gatorspark Nov 17, 2015
ac0dccd
Merge branch 'master' of https://github.com/gatorsmile/spark
gatorspark Nov 17, 2015
6e0018b
Merge remote-tracking branch 'upstream/master'
Nov 20, 2015
0546772
converge
gatorsmile Nov 20, 2015
b37a64f
converge
gatorsmile Nov 20, 2015
c2a872c
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
ab6dbd7
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
4276356
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
2dab708
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 7, 2016
0458770
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 8, 2016
1debdfa
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 9, 2016
763706d
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 14, 2016
4de6ec1
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 18, 2016
9422a4f
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 19, 2016
52bdf48
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 20, 2016
1e95df3
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 23, 2016
fab24cf
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 1, 2016
8b2e33b
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 5, 2016
2ee1876
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 11, 2016
b9f0090
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 12, 2016
ade6f7e
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 15, 2016
9fd63d2
Merge remote-tracking branch 'upstream/master'
gatorsmile Feb 19, 2016
a5f1c49
outer join elimination by Filter condition
gatorsmile Jan 4, 2016
a61272a
style fix.
gatorsmile Jan 4, 2016
aadb8e3
use the latest null propagation.
gatorsmile Feb 11, 2016
75eface
style fix.
gatorsmile Feb 11, 2016
618f128
added canFilterOutNull to the rule
gatorsmile Feb 18, 2016
6977fdf
style fix.
gatorsmile Feb 18, 2016
cc0262c
address comments.
gatorsmile Feb 19, 2016
82357e0
address comments.
gatorsmile Feb 20, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import scala.collection.immutable.HashSet

import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, Unions}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -62,6 +62,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
SetOperationPushDown,
SamplePushDown,
ReorderJoin,
OuterJoinElimination,
PushPredicateThroughJoin,
PushPredicateThroughProject,
PushPredicateThroughGenerate,
Expand Down Expand Up @@ -931,6 +932,62 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
}
}

/**
* Elimination of outer joins, if the predicates can restrict the result sets so that
* all null-supplying rows are eliminated
*
* - full outer -> inner if both sides have such predicates
* - left outer -> inner if the right side has such predicates
* - right outer -> inner if the left side has such predicates
* - full outer -> left outer if only the left side has such predicates
* - full outer -> right outer if only the right side has such predicates
*
* This rule should be executed before pushing down the Filter
*/
object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper {

/**
* Returns whether the expression returns null or false when all inputs are nulls.
*/
private def canFilterOutNull(e: Expression): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also check the expression is deterministic or not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. Let me add it. Thanks!

if (!e.deterministic) return false
val attributes = e.references.toSeq
val emptyRow = new GenericInternalRow(attributes.length)
val v = BindReferences.bindReference(e, attributes).eval(emptyRow)
v == null || v == false
}

private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
val splitConjunctiveConditions: Seq[Expression] = splitConjunctivePredicates(filter.condition)
val leftConditions = splitConjunctiveConditions
.filter(_.references.subsetOf(join.left.outputSet))
val rightConditions = splitConjunctiveConditions
.filter(_.references.subsetOf(join.right.outputSet))

val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull) ||
filter.constraints.filter(_.isInstanceOf[IsNotNull])
.exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not correct, IsNotNull(Add(A, B)) does not mean both A and B should be not null.

I think we don't need these constraints.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davies Based on my understanding, IsNotNull(Add(A, B)) is put into constraints only when Add(A,B) is the left/right side of the following operators: EqualTo, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, and EqualNullSafe.

For example, a + b = c, this condition always returns false when either a, b or c is NULL. Thus, we can use it as not null filters. The benefit here is to handle the conditions that contains both left and right at the same time.

Please correct me if my understanding is not right. Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For EqualNullSafe, the result is true if both a and b are null, so it can not filter out null.

All others make sense to me, we can have that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, true. Let me correct it. Thanks!

val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull) ||
filter.constraints.filter(_.isInstanceOf[IsNotNull])
.exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty)

join.joinType match {
case RightOuter if leftHasNonNullPredicate => Inner
case LeftOuter if rightHasNonNullPredicate => Inner
case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
case FullOuter if leftHasNonNullPredicate => LeftOuter
case FullOuter if rightHasNonNullPredicate => RightOuter
case o => o
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
val newJoinType = buildNewJoinType(f, j)
if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
}
}

/**
* Pushes down [[Filter]] operators where the `condition` can be
* evaluated using only the attributes of the left or right side of a join. Other
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._

class OuterJoinEliminationSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
EliminateSubQueries) ::
Batch("Outer Join Elimination", Once,
OuterJoinElimination,
PushPredicateThroughJoin) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
val testRelation1 = LocalRelation('d.int, 'e.int, 'f.int)

test("joins: full outer to inner") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery =
x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
.where("x.b".attr >= 1 && "y.d".attr >= 2)

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.where('b >= 1)
val right = testRelation1.where('d >= 2)
val correctAnswer =
left.join(right, Inner, Option("a".attr === "d".attr)).analyze

comparePlans(optimized, correctAnswer)
}

test("joins: full outer to right") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery =
x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)).where("y.d".attr > 2)

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation
val right = testRelation1.where('d > 2)
val correctAnswer =
left.join(right, RightOuter, Option("a".attr === "d".attr)).analyze

comparePlans(optimized, correctAnswer)
}

test("joins: full outer to left") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery =
x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)).where("x.a".attr <=> 2)

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.where('a <=> 2)
val right = testRelation1
val correctAnswer =
left.join(right, LeftOuter, Option("a".attr === "d".attr)).analyze

comparePlans(optimized, correctAnswer)
}

test("joins: right to inner") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery =
x.join(y, RightOuter, Option("x.a".attr === "y.d".attr)).where("x.b".attr > 2)

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation.where('b > 2)
val right = testRelation1
val correctAnswer =
left.join(right, Inner, Option("a".attr === "d".attr)).analyze

comparePlans(optimized, correctAnswer)
}

test("joins: left to inner") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery =
x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
.where("y.e".attr.isNotNull)

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation
val right = testRelation1.where('e.isNotNull)
val correctAnswer =
left.join(right, Inner, Option("a".attr === "d".attr)).analyze

comparePlans(optimized, correctAnswer)
}

// evaluating if mixed OR and NOT expressions can eliminate all null-supplying rows
test("joins: left to inner with complicated filter predicates #1") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery =
x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
.where(!'e.isNull || ('d.isNotNull && 'f.isNull))

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation
val right = testRelation1.where(!'e.isNull || ('d.isNotNull && 'f.isNull))
val correctAnswer =
left.join(right, Inner, Option("a".attr === "d".attr)).analyze

comparePlans(optimized, correctAnswer)
}

// eval(emptyRow) of 'e.in(1, 2) will return null instead of false
test("joins: left to inner with complicated filter predicates #2") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery =
x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
.where('e.in(1, 2))

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation
val right = testRelation1.where('e.in(1, 2))
val correctAnswer =
left.join(right, Inner, Option("a".attr === "d".attr)).analyze

comparePlans(optimized, correctAnswer)
}

// evaluating if mixed OR and AND expressions can eliminate all null-supplying rows
test("joins: left to inner with complicated filter predicates #3") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery =
x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
.where((!'e.isNull || ('d.isNotNull && 'f.isNull)) && 'e.isNull)

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation
val right = testRelation1.where((!'e.isNull || ('d.isNotNull && 'f.isNull)) && 'e.isNull)
val correctAnswer =
left.join(right, Inner, Option("a".attr === "d".attr)).analyze

comparePlans(optimized, correctAnswer)
}

// evaluating if the expressions that have both left and right attributes
// can eliminate all null-supplying rows
// FULL OUTER => INNER
test("joins: left to inner with complicated filter predicates #4") {
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)

val originalQuery =
x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
.where("x.b".attr + 3 === "y.e".attr)

val optimized = Optimize.execute(originalQuery.analyze)
val left = testRelation
val right = testRelation1
val correctAnswer =
left.join(right, Inner, Option("b".attr + 3 === "e".attr && "a".attr === "d".attr)).analyze

comparePlans(optimized, correctAnswer)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.execution.joins.BroadcastHashJoin
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
Expand Down Expand Up @@ -156,4 +158,50 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
assert(df1.join(broadcast(pf1)).count() === 4)
}
}

test("join - outer join conversion") {
val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")

// outer -> left
val outerJoin2Left = df.join(df2, $"a.int" === $"b.int", "outer").where($"a.int" === 3)
assert(outerJoin2Left.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, LeftOuter, _) => j }.size === 1)
checkAnswer(
outerJoin2Left,
Row(3, 4, "3", null, null, null) :: Nil)

// outer -> right
val outerJoin2Right = df.join(df2, $"a.int" === $"b.int", "outer").where($"b.int" === 5)
assert(outerJoin2Right.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, RightOuter, _) => j }.size === 1)
checkAnswer(
outerJoin2Right,
Row(null, null, null, 5, 6, "5") :: Nil)

// outer -> inner
val outerJoin2Inner = df.join(df2, $"a.int" === $"b.int", "outer").
where($"a.int" === 1 && $"b.int2" === 3)
assert(outerJoin2Inner.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, Inner, _) => j }.size === 1)
checkAnswer(
outerJoin2Inner,
Row(1, 2, "1", 1, 3, "1") :: Nil)

// right -> inner
val rightJoin2Inner = df.join(df2, $"a.int" === $"b.int", "right").where($"a.int" === 1)
assert(rightJoin2Inner.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, Inner, _) => j }.size === 1)
checkAnswer(
rightJoin2Inner,
Row(1, 2, "1", 1, 3, "1") :: Nil)

// left -> inner
val leftJoin2Inner = df.join(df2, $"a.int" === $"b.int", "left").where($"b.int2" === 3)
assert(leftJoin2Inner.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, Inner, _) => j }.size === 1)
checkAnswer(
leftJoin2Inner,
Row(1, 2, "1", 1, 3, "1") :: Nil)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin]),
("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[SortMergeOuterJoin]),
("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
classOf[SortMergeOuterJoin]),
classOf[SortMergeJoin]), // converted from Right Outer to Inner
("SELECT * FROM testData right join testData2 ON key = a and key = 2",
classOf[SortMergeOuterJoin]),
("SELECT * FROM testData full outer join testData2 ON key = a",
Expand Down