@gatorsmile
Member

This PR converts outer joins to cheaper join types when the predicates in the filter conditions restrict the result set so that all null-supplying rows are eliminated:

  • full outer -> inner if both sides have such predicates
  • left outer -> inner if the right side has such predicates
  • right outer -> inner if the left side has such predicates
  • full outer -> left outer if only the left side has such predicates
  • full outer -> right outer if only the right side has such predicates

Where applicable, this can greatly improve performance: an outer join is much slower than an inner join, and a full outer join is much slower than a left or right outer join.
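
As a rough illustration, the conversion table above can be written as a single pattern match. This is a toy model, not Spark's code: the `JoinConversionSketch` object, the `convert` function, and the two boolean flags are hypothetical names introduced here, and the flags simply assume that something else has already decided whether the filter eliminates rows that are null on that side.

```scala
object JoinConversionSketch {
  // Stand-ins for the join types named in the description (not Catalyst's classes).
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  // leftRejectsNulls / rightRejectsNulls: assumed to be true when the filter
  // has a predicate that eliminates rows whose columns from that side are null.
  def convert(
      joinType: JoinType,
      leftRejectsNulls: Boolean,
      rightRejectsNulls: Boolean): JoinType = joinType match {
    // In a right outer join the left side is null-supplying, and vice versa.
    case RightOuter if leftRejectsNulls => Inner
    case LeftOuter if rightRejectsNulls => Inner
    case FullOuter if leftRejectsNulls && rightRejectsNulls => Inner
    case FullOuter if leftRejectsNulls => LeftOuter
    case FullOuter if rightRejectsNulls => RightOuter
    // No applicable predicate: keep the join type as it is.
    case other => other
  }
}
```

For example, `convert(FullOuter, leftRejectsNulls = true, rightRejectsNulls = false)` yields `LeftOuter`, while an inner join is always returned unchanged.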

The original PR is #10542

SparkQA commented Jan 4, 2016

Test build #48633 has finished for PR 10567 at commit d516ed4.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 4, 2016

Test build #48634 has finished for PR 10567 at commit 3c9e965.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

Can you document the arguments here?

@sameeragarwal it would be great if we could replace this function with your more general null-propagation information.

/cc @yhuai

Member

@gatorsmile now that we propagate IsNotNull constraints in the logical plan, you should be able to eliminate outer joins by simply looking at the constraints of the parent filter operator. I believe something along the lines of this should work:

object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper {

  private def buildNewJoin(filter: Filter, join: Join): Join = {
    // IsNotNull constraints that have been propagated to the parent filter.
    val notNullConstraints = filter.constraints.filter(_.isInstanceOf[IsNotNull])

    // A side has a non-null predicate if some IsNotNull constraint references its output.
    val leftHasNonNullPredicate = notNullConstraints
      .exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty)
    val rightHasNonNullPredicate = notNullConstraints
      .exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty)

    join.joinType match {
      case RightOuter if leftHasNonNullPredicate =>
        Join(join.left, join.right, Inner, join.condition)
      case LeftOuter if rightHasNonNullPredicate =>
        Join(join.left, join.right, Inner, join.condition)
      case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate =>
        Join(join.left, join.right, Inner, join.condition)
      case FullOuter if leftHasNonNullPredicate =>
        Join(join.left, join.right, LeftOuter, join.condition)
      case FullOuter if rightHasNonNullPredicate =>
        Join(join.left, join.right, RightOuter, join.condition)
      case _ =>
        join
    }
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
      Filter(condition, buildNewJoin(f, j))
  }
}

Member Author

Thank you very much! Will do the changes.

SparkQA commented Feb 11, 2016

Test build #51088 has finished for PR 10567 at commit 560f2d0.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 11, 2016

Test build #51098 has started for PR 10567 at commit e7fa63f.

@shaneknapp
Contributor

jenkins, test this please

SparkQA commented Feb 11, 2016

Test build #51115 has finished for PR 10567 at commit e7fa63f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

I think we can support every expression that returns null or false when its inputs are null.
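
One way to read this suggestion: evaluate the predicate against a row in which every attribute of one side is null, using SQL three-valued logic, and treat the predicate as null-rejecting if the result is null or false. The sketch below is a self-contained toy model of that idea. The expression classes, `eval`, and `rejectsNulls` are hypothetical names for illustration, not Catalyst's API, and it assumes the predicate only references attributes from the side being tested.

```scala
object NullRejectionSketch {
  // Toy value expressions over integer columns.
  sealed trait Value
  case class Attr(name: String) extends Value
  case class Lit(value: Int) extends Value

  // Toy predicates.
  sealed trait Pred
  case class GreaterThan(left: Value, right: Value) extends Pred
  case class IsNull(child: Value) extends Pred
  case class IsNotNull(child: Value) extends Pred
  case class And(left: Pred, right: Pred) extends Pred
  case class Or(left: Pred, right: Pred) extends Pred

  // A row maps column names to values; None stands for SQL NULL.
  type Row = Map[String, Option[Int]]

  def evalValue(v: Value, row: Row): Option[Int] = v match {
    case Attr(name) => row.getOrElse(name, None)
    case Lit(x)     => Some(x)
  }

  // Three-valued evaluation: Some(true), Some(false), or None (NULL).
  def eval(p: Pred, row: Row): Option[Boolean] = p match {
    case GreaterThan(l, r) =>
      for (a <- evalValue(l, row); b <- evalValue(r, row)) yield a > b
    case IsNull(v)    => Some(evalValue(v, row).isEmpty)
    case IsNotNull(v) => Some(evalValue(v, row).nonEmpty)
    case And(l, r) =>
      (eval(l, row), eval(r, row)) match {
        case (Some(false), _) | (_, Some(false)) => Some(false)
        case (Some(true), Some(true))            => Some(true)
        case _                                   => None
      }
    case Or(l, r) =>
      (eval(l, row), eval(r, row)) match {
        case (Some(true), _) | (_, Some(true)) => Some(true)
        case (Some(false), Some(false))        => Some(false)
        case _                                 => None
      }
  }

  // The predicate filters out null-supplied rows if it does not evaluate
  // to true when every attribute in sideAttrs is null.
  def rejectsNulls(p: Pred, sideAttrs: Set[String]): Boolean = {
    val allNull: Row = sideAttrs.map(name => name -> (None: Option[Int])).toMap
    !eval(p, allNull).contains(true)
  }
}
```

Under this model `b.x > 3` rejects nulls because it evaluates to null on an all-null row, whereas `b.x IS NULL` does not, so a filter on the latter must leave the outer join alone.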

Member Author

With your changes, we can support all the expressions whose attributes are all from left or right, no matter how complicated they are!!!

Member Author

With @sameeragarwal's changes, we can support expressions containing both left and right attributes, but the types are limited to EqualTo, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, and EqualNullSafe.

We need both!

SparkQA commented Feb 18, 2016

Test build #51499 has finished for PR 10567 at commit 11b3214.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile gatorsmile force-pushed the outerJoinEliminationByFilterCond branch from 11b3214 to 6977fdf Compare February 19, 2016 04:15

SparkQA commented Feb 19, 2016

Test build #51520 has finished for PR 10567 at commit 6977fdf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
Contributor

return the original f if it's not changed

Member Author

Thanks! will do.

SparkQA commented Feb 19, 2016

Test build #51570 has finished for PR 10567 at commit cc0262c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
Contributor

It's better to return JoinType from buildNewJoin
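
A minimal sketch of how the two review suggestions on this hunk could fit together: the helper returns only a join type, and the rewrite hands back the original filter node when that type is unchanged. The plan classes below are simplified stand-ins rather than Catalyst's, and `rewrite` and its `newJoinType` parameter are hypothetical names used only for this illustration.

```scala
object RuleShapeSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  // Simplified logical plan nodes (stand-ins, not Catalyst's classes).
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Join(left: Plan, right: Plan, joinType: JoinType) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan

  // newJoinType plays the role of a helper that returns a JoinType
  // instead of a whole Join node.
  def rewrite(f: Filter)(newJoinType: Join => JoinType): Filter = f.child match {
    case j: Join =>
      val chosen = newJoinType(j)
      // Return the very same node when nothing changed, so callers can
      // detect "no change" by reference equality.
      if (chosen == j.joinType) f
      else f.copy(child = j.copy(joinType = chosen))
    case _ => f
  }
}
```

With this shape, `rewrite(f)(_.joinType)` returns `f` itself, and only an actual change of join type allocates new `Filter` and `Join` nodes.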

Member Author

Sure, will do.

@davies
Contributor

davies commented Feb 20, 2016

LGTM, pending tests.

SparkQA commented Feb 20, 2016

Test build #51587 has finished for PR 10567 at commit 82357e0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Feb 20, 2016

Merging this into master, thanks!

@asfgit asfgit closed this in ec7a1d6 Feb 20, 2016
@gatorsmile gatorsmile deleted the outerJoinEliminationByFilterCond branch February 20, 2016 06:35