Skip to content

Conversation

@c21
Copy link
Contributor

@c21 c21 commented Aug 20, 2020

What changes were proposed in this pull request?

For broadcast hash join and shuffled hash join, whenever the build side hashed relation turns out to be empty. We don't need to execute stream side plan at all, and can return an empty iterator (for inner join and left semi join), because we know for sure that none of stream side rows can be outputted as there's no match.

Why are the changes needed?

A very minor optimization for rare use case, but in case build side turns out to be empty, we can leverage it to short-cut stream side to save CPU and IO.

Example broadcast hash join query similar to JoinBenchmark with empty hashed relation:

  def broadcastHashJoinLongKey(): Unit = {
    val N = 20 << 20
    val M = 1 << 16

    val dim = broadcast(spark.range(0).selectExpr("id as k", "cast(id as string) as v"))
    codegenBenchmark("Join w long", N) {
      val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
      assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
      df.noop()
    }
  }

Comparing wall clock time for enabling and disabling this PR (for non-codegen code path). Seeing like 8x improvement.

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
Join w long:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Join PR disabled                                    637            646          12         32.9          30.4       1.0X
Join PR enabled                                      77             78           2        271.8           3.7       8.3X

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added unit test in JoinSuite.

@c21
Copy link
Contributor Author

c21 commented Aug 20, 2020

cc @cloud-fan and @sameeragarwal if you have time to take a look, thanks.

@SparkQA
Copy link

SparkQA commented Aug 20, 2020

Test build #127676 has finished for PR 29484 at commit 1cd840d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

if (keyIsUnique) {
if (isEmptyHashedRelation) {
s"""
|// If HashedRelation is empty, hash inner join simply returns nothing.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't a short-cut and we still need to consume the entire stream side. We need something similar to #29389

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - yes sorry about it. After checking codegen code for example query I can confirm this. For non-codegen (iterator mode) it works, but for codegen it does not work because we are processing in doConsume() here so we are still executing the stream side.

So I think

  • for non-codegen code path: will keep the same change as in this PR now.
  • for codegen code path: do not make change here in HashJoin, but adding an adaptive execution logical plan rule e.g. called EliminateEmptyBroadcastHashJoin.scala which checks stage: BroadcastQueryStageExec to be empty or not (stage.broadcast.relationFuture.get().value == EmptyHashedRelation), if it is, then changing the logical plan from Join to LocalRelation(data = Seq.empty, ...).

Does it sound good as plan? Thanks.

Copy link
Contributor

@cloud-fan cloud-fan Aug 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can keep this change, as it at least avoids looking up an empty hash relation and is better than before. It's still useful with AQE is off.

We just need to add a new rule, or extend rule EliminateNullAwareAntiJoin and rename it to OptimizeJoinToEmptyRelation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - sure, updated.

* 1. `Join` is single column NULL-aware anti join (NAAJ), and broadcasted `HashedRelation`
* is `EmptyHashedRelationWithAllNullKeys`.
*
* 2. `Join` is inner or left semi join, and broadcasted `HashedRelation` is `EmptyHashedRelation`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about other join types?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like left outer?

Copy link
Contributor Author

@c21 c21 Aug 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - for left outer join, we need to output the stream side (left side) right? For all other join types (left/right/full outer, anti, existence), stream side needs to be outputted with empty build side. Not sure how AQE helps here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left outer with left empty should return nothing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - with current SHJ/BHJ implementation, left outer join can only build right side, but not left side. I want to target this PR to only handle empty hashed relation (empty build side). But I agree, we can have a short-cut to check stream side is empty or not, before building hashed relation, to save the cost of building hashed relation. But I think AQE cannot handle it, and we can tackle it separately if it's worthy. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right, let's leave it for later.

.groupBy('a).count()
checkAnswer(testDf, Seq())
val plan = testDf.queryExecution.executedPlan
assert(find(plan)(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan - just fyi. the change in this unit test is needed as assert(find(plan)(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) no long true, because this is an inner join and the build side is empty. So with the change in this PR, the join operator is optimized into an empty relation operator (failure stack trace of unit test without change is here).

Changed from inner join to left outer join, to help unit test pass. And I don't think changing from inner join to left outer join here can comprise any functionality of original unit test. Let me know if it's not the case. thanks.

@SparkQA
Copy link

SparkQA commented Aug 21, 2020

Test build #127721 has finished for PR 29484 at commit f77e684.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 21, 2020

Test build #127728 has finished for PR 29484 at commit 0e16591.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21
Copy link
Contributor Author

c21 commented Aug 21, 2020

retest this please

@cloud-fan
Copy link
Contributor

also cc @viirya @maropu

@SparkQA
Copy link

SparkQA commented Aug 21, 2020

Test build #127730 has finished for PR 29484 at commit 0e16591.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*
* 2. `Join` is inner or left semi join, and broadcasted `HashedRelation` is `EmptyHashedRelation`.
*/
object OptimizeJoinToEmptyRelation extends Rule[LogicalPlan] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the prefix Optimize is a general term, so how about keeping Eliminate in the prefix? e.g., EliminateJoinHavingEmptyRelation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - I agree with your sentiment here. However the rule also handles the case to convert a NULL-aware anti join into empty relation, if the build side has any NULL key (EmptyHashedRelationWithAllNullKeys). So we are not eliminating join having empty relation (EmptyHashedRelationWithAllNullKeys is not an empty relation, but a relation with NULL key). So I am changing the optimization rule to EliminateJoinToEmptyRelation here indicate we are eliminating a join to an empty local relation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, I see. The new name looks okay to me, too.


private def canEliminate(
plan: LogicalPlan,
expectedRelation: HashedRelation): Boolean = plan match {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about reformatting it like this?

  private def canEliminate(plan: LogicalPlan, emptyRelation: HashedRelation): Boolean = plan match {
    case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
      && stage.broadcast.relationFuture.get().value == emptyRelation => true
    case _ => false
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - sure, updated.

/**
* This optimization rule detects and converts a `Join` to an empty `LocalRelation`:
* 1. `Join` is single column NULL-aware anti join (NAAJ), and broadcasted `HashedRelation`
* is `EmptyHashedRelationWithAllNullKeys`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: [[EmptyHashedRelationWithAllNullKeys]]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - updated.


/**
* This optimization rule detects and converts a `Join` to an empty `LocalRelation`:
* 1. `Join` is single column NULL-aware anti join (NAAJ), and broadcasted `HashedRelation`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: [[HashedRelation]]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - updated.

import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, EmptyHashedRelationWithAllNullKeys, HashedRelation}

/**
* This optimization rule detects and converts a `Join` to an empty `LocalRelation`:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: [[LocalRelation]] for shortcuts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - sure. updated.


if (keyIsUnique) {
if (isEmptyHashedRelation) {
s"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove s

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - updated.


if (keyIsUnique) {
if (isEmptyHashedRelation) {
s"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: remove s.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - updated.

Comment on lines 1186 to 1220
// Test inner and left semi join
Seq(
// inner join (small table at right side)
"SELECT * FROM testData t1 join testData3 t2 ON t1.key = t2.a WHERE t2.b = 1",
// inner join (small table at left side)
"SELECT * FROM testData3 t1 join testData t2 ON t1.a = t2.key WHERE t1.b = 1",
// left semi join
"SELECT * FROM testData t1 left semi join testData3 t2 ON t1.key = t2.a AND t2.b = 1"
).foreach(query => {
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query)
val smj = findTopLevelSortMergeJoin(plan)
assert(smj.size == 1)
val join = findTopLevelBaseJoin(adaptivePlan)
assert(join.isEmpty)
checkNumLocalShuffleReaders(adaptivePlan)
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you separate these new tests from this test unit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - sure, updated.

@maropu
Copy link
Member

maropu commented Aug 21, 2020

Looks okay except for the minor comments I left.

* is [[EmptyHashedRelationWithAllNullKeys]].
*
* 2. Join is inner or left semi join, and broadcasted [[HashedRelation]]
is [[EmptyHashedRelation]].
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: you forgot to add *.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maropu - sorry, updated.

Copy link
Member

@maropu maropu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this improvement, @c21 ! I'll leave it to @cloud-fan @viirya

Comment on lines 37 to 42
case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
&& stage.broadcast.relationFuture.get().value == emptyRelation => true
case _ => false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For shuffled hash join, doesn't it need to be broadcasted too?

Copy link
Contributor Author

@c21 c21 Aug 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya - when AQE is enabled, for SMJ and SHJ, both will be turned into a BHJ because build side is empty (which is small enough to be broadcasted). here the BHJ will be turned into an empty LocalRelation. So it covers SHJ (basically SHJ has two hops - first turned into BHJ, then turned into empty LocalRelation).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Can you add into the comment explaining it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya - sure, added.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya - thanks for suggestion.

"SELECT * FROM testData t1 left semi join testData3 t2 ON t1.key = t2.a AND t2.b = 1"
).foreach(query => {
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query)
val smj = findTopLevelSortMergeJoin(plan)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SortMergeJoin? I think this targets BHJ and SHJ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viirya - similar to other test cases in this file - the input data stats is super large, and by default it uses SMJ. Per my comment above, SMJ/BHJ/SHJ will all turn into empty LocalRelation (where SMJ/SHJ first turned into BHJ).

@SparkQA
Copy link

SparkQA commented Aug 21, 2020

Test build #127756 has finished for PR 29484 at commit 53b8e27.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21
Copy link
Contributor Author

c21 commented Aug 22, 2020

cc @cloud-fan the PR is ready to merge if no more new comments, thanks.

@SparkQA
Copy link

SparkQA commented Aug 22, 2020

Test build #127766 has finished for PR 29484 at commit fecadfd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 22, 2020

Test build #127767 has finished for PR 29484 at commit ff52751.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21
Copy link
Contributor Author

c21 commented Aug 23, 2020

Already rebased to latest master to resolve conflict with #29503.
The PR is ready to merge if no more comments, thanks. cc @cloud-fan and @viirya .

@SparkQA
Copy link

SparkQA commented Aug 23, 2020

Test build #127798 has finished for PR 29484 at commit a7cdf6e.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 23, 2020

Test build #127799 has finished for PR 29484 at commit 496eda7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21
Copy link
Contributor Author

c21 commented Aug 23, 2020

retest this please

@SparkQA
Copy link

SparkQA commented Aug 23, 2020

Test build #127804 has finished for PR 29484 at commit 496eda7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 08b951b Aug 24, 2020
@c21
Copy link
Contributor Author

c21 commented Aug 24, 2020

Thank you @cloud-fan , @maropu and @viirya for review!

@c21 c21 deleted the empty-relation branch August 24, 2020 17:58
cloud-fan pushed a commit that referenced this pull request Feb 26, 2021
### What changes were proposed in this pull request?

I discovered from review discussion - #31630 (comment) , that we can eliminate LEFT ANTI join (with no join condition) to empty relation, if the right side is known to be non-empty. So with AQE, this is doable similar to #29484 .

### Why are the changes needed?

This can help eliminate the join operator during logical plan optimization.
Before this PR, [left side physical plan `execute()` will be called](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala#L192), so if left side is complicated (e.g. contain broadcast exchange operator), then some computation would happen. However after this PR, the join operator will be removed during logical plan, and nothing will be computed from left side. Potentially it can save resource for these kinds of query.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests for positive and negative queries in `AdaptiveQueryExecSuite.scala`.

Closes #31641 from c21/left-anti-aqe.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants