-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-34514][SQL] Push down limit for LEFT SEMI and LEFT ANTI join #31630
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
cc @wangyum , @maropu , @viirya , @HyukjinKwon and @cloud-fan for review if you have time, thanks. |
| left = maybePushLocalLimit(exp, left), | ||
| right = maybePushLocalLimit(exp, right)) | ||
| case LeftSemi | LeftAnti if conditionOpt.isEmpty => | ||
| join.copy(left = maybePushLocalLimit(exp, left)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm, in this case, we need the join itself?
scala> sql("select * from l1").show()
+----+
| id|
+----+
| 1|
| 2|
|null|
+----+
scala> sql("select * from r1").show()
+----+
| id|
+----+
| 2|
|null|
+----+
scala> sql("select * from l1 left semi join r1").show()
+----+
| id|
+----+
| 1|
| 2|
|null|
+----+
scala> sql("select * from l1 left anti join r1").show()
+---+
| id|
+---+
+---+
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we still need. Whether to output all rows or nothing, is depending on whether right side is empty, and this can only be known during runtime.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maropu - this actually reminds me whether we can further optimize during runtime, and I found I already did it for LEFT SEMI with AQE - #29484 . Similarly for LEFT ANTI join without condition, we can convert join logical plan node to an empty relation if right build side is not empty. Will submit a followup PR tomorrow.
In addition, after taking a deep look at BroadcastNestedLoopJoinExec (never looked closely to that because it's not popular in our environment), I found many places that we can optimize:
- populate
outputOrderingandoutputPartitioningwhen possible to avoid shuffle/sort in later stage. - shortcut for
LEFT SEMI/ANTIindefaultJoin()as we don't need to look through all rows when there's no join condition. - code-gen the operator.
I will file an umbrella JIRA with minor priority and do it gradually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly for LEFT ANTI join without condition, we can convert join logical plan node to an empty relation if right build side is not empty. Will submit a followup PR tomorrow.
Ah, I see. That sounds reasonable. Nice idea, @c21 .
| spark.range(5).toDF().repartition(1).write.saveAsTable("left_table") | ||
| spark.range(3).write.saveAsTable("nonempty_right_table") | ||
| spark.range(0).write.saveAsTable("empty_right_table") | ||
| Seq("LEFT SEMI").foreach { joinType => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems LEFT ANTI is missing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan - good catch, I was accidentally removing it during debugging, fixed.
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
Kubernetes integration test starting |
|
Kubernetes integration test status success |
|
thanks, merging to master! |
|
Thank you all for review! |
|
Test build #135409 has finished for PR 31630 at commit
|
### What changes were proposed in this pull request? I discovered from review discussion - #31630 (comment) , that we can eliminate LEFT ANTI join (with no join condition) to empty relation, if the right side is known to be non-empty. So with AQE, this is doable similar to #29484 . ### Why are the changes needed? This can help eliminate the join operator during logical plan optimization. Before this PR, [left side physical plan `execute()` will be called](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala#L192), so if left side is complicated (e.g. contain broadcast exchange operator), then some computation would happen. However after this PR, the join operator will be removed during logical plan, and nothing will be computed from left side. Potentially it can save resource for these kinds of query. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit tests for positive and negative queries in `AdaptiveQueryExecSuite.scala`. Closes #31641 from c21/left-anti-aqe. Authored-by: Cheng Su <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
I found out during code review of #31567 (comment), where we can push down limit to the left side of LEFT SEMI and LEFT ANTI join, if the join condition is empty.
Why it's safe to push down limit:
The semantics of LEFT SEMI join without condition:
(1). if right side is non-empty, output all rows from left side.
(2). if right side is empty, output nothing.
The semantics of LEFT ANTI join without condition:
(1). if right side is non-empty, output nothing.
(2). if right side is empty, output all rows from left side.
With the semantics of output all rows from left side or nothing (all or nothing), it's safe to push down limit to left side.
NOTE: LEFT SEMI / LEFT ANTI join with non-empty condition is not safe for limit push down, because output can be a portion of left side rows.
Reference: physical operator implementation for LEFT SEMI / LEFT ANTI join without condition - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala#L200-L204 .
Why are the changes needed?
Better performance. Save CPU and IO for these joins, as limit being pushed down before join.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added unit test in
LimitPushdownSuite.scalaandSQLQuerySuite.scala.