[SPARK-46090][SQL] Support plan fragment level SQL configs in AQE #44013
Conversation
cc @cloud-fan @maryannxue @dongjoon-hyun if you have time to take a look at this idea, thank you.
dongjoon-hyun left a comment
Thank you for pinging me, @ulysses-you .
Although this PR is not big, could you split it into two?
- A PR for adding Adaptive Query Post Planner Strategy Rules
- A PR for fragment-level SQL configs in AQE

We can proceed with (1) first, independently.
…y rules in SparkSessionExtensions

### What changes were proposed in this pull request?
This pr adds a new extension entrance `queryPostPlannerStrategyRules` in `SparkSessionExtensions`. It will be applied between plannerStrategy and queryStagePrepRules in AQE, so it can get the whole plan before injecting exchanges.

### Why are the changes needed?
A part of #44013

### Does this PR introduce _any_ user-facing change?
no, only for developers

### How was this patch tested?
add test

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44074 from ulysses-you/post-planner.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: youxiduo <[email protected]>
b358f71 to 380bc21
dongjoon-hyun left a comment
Thank you for rebasing this PR, @ulysses-you .
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
Is this used somewhere else? Otherwise, let's not define a new one~
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
To @ulysses-you , could you address the second-round review comments, please?
thank you @dongjoon-hyun, will rebase and address comments after #44142
…per`
### What changes were proposed in this pull request?
This pr moves method `withSQLConf` from `SQLHelper` in catalyst test module to `SQLConfHelper` trait in catalyst module. To make it easy to use such case: `val x = withSQLConf {}`, this pr also changes its return type.
### Why are the changes needed?
A part of #44013
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Pass CI
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #44142 from ulysses-you/withSQLConf.
Authored-by: ulysses-you <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
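The reworked helper can be illustrated with a minimal, self-contained sketch (plain Scala; a mutable map stands in for `SQLConf`, and only the `withSQLConf` name and shape come from the PR — the rest is illustrative). The point of the change is that the block's result is now returned, so `val x = withSQLConf(...) { ... }` works:

```scala
import scala.collection.mutable

object ConfDemo {
  // Stand-in for SQLConf: a simple mutable key-value store.
  val conf: mutable.Map[String, String] = mutable.Map.empty

  // Set the given pairs, run the block, restore the previous values,
  // and return the block's result (the new return type T).
  def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
    val saved = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try f finally {
      saved.foreach {
        case (k, Some(old)) => conf(k) = old
        case (k, None)      => conf.remove(k)
      }
    }
  }
}

// Usage: the overridden value is visible inside the block and restored after.
val x = ConfDemo.withSQLConf("spark.sql.shuffle.partitions" -> "5") {
  ConfDemo.conf("spark.sql.shuffle.partitions").toInt
}
```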
I merged it. Could you rebase this PR, @ulysses-you ? 😄
380bc21 to 8a9c1dd
thank you @dongjoon-hyun, it's done
dongjoon-hyun left a comment
+1, LGTM from my side. Thank you, @ulysses-you .
cc @cloud-fan , @wangyum
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveRuleContext.scala
@ulysses-you Could you explain what scenario would require adjusting the SQL configs segment by segment? I'm just curious.
@beliefer for example: change the initial shuffle partition number per plan fragment to avoid partitions that are too big or too small, or change the advisory partition size according to the characteristics of the plan fragment (small for generate, big for filter).
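That heuristic can be sketched as a toy function (plain Scala; the fragment kinds and byte sizes are made up for illustration, not Spark's actual defaults or API):

```scala
// Pick an advisory partition size (in bytes) per plan fragment kind.
// Generate-heavy fragments expand their input, so smaller partitions help;
// filter-heavy fragments shrink it, so bigger partitions avoid tiny tasks.
def advisoryPartitionSize(fragmentKind: String): Long = fragmentKind match {
  case "generate" => 16L * 1024 * 1024   // 16 MiB: output expands downstream
  case "filter"   => 256L * 1024 * 1024  // 256 MiB: output shrinks downstream
  case _          => 64L * 1024 * 1024   // 64 MiB: neutral default
}
```

A plan-fragment-level config mechanism lets a rule apply this kind of per-fragment choice instead of one session-wide value.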
Thank you for the explanation.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
@cloud-fan any concerns about landing this in 4.0?
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveRuleContext.scala
def get(): Option[AdaptiveRuleContext] = Option(ruleContextThreadLocal.get())

private[sql] def withRuleContext[T](ruleContext: AdaptiveRuleContext)(block: => T): T = {
  assert(ruleContext != null)
make it null-tolerant?
it's used to make sure the rule context is valid; null is meaningless even if we tolerate it..
LGTM
…e/AdaptiveRuleContext.scala
thank you all, merged to master (4.0.0)
private def withRuleContext[T](f: => T): T =
  AdaptiveRuleContext.withRuleContext(ruleContext) { f }

private def applyPhysicalRulesWithRuleContext(
Shall we update applyPhysicalRules directly? Do we expect people to call applyPhysicalRules without rule context?
applyPhysicalRules is a kind of private method; I'm not sure how people could use it.
I mean, people who develop Spark. When should applyPhysicalRules be called instead of applyPhysicalRulesWithRuleContext? If never, why do we still keep it?
applyPhysicalRules is used by InsertAdaptiveSparkPlan for Spark internal rules, and at that point we do not have an AdaptiveSparkPlanExec. It does not affect user-specified rules, so I left it.
ah I see
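The wrapper pattern under discussion can be sketched in isolation (plain Scala; `RuleContext` here is a stand-in for `AdaptiveRuleContext`, and the install/try/finally shape mirrors the quoted `withRuleContext`, not Spark's exact code):

```scala
final case class RuleContext(isSubquery: Boolean, isFinalStage: Boolean)

object RuleContext {
  private val tl = new ThreadLocal[RuleContext]

  // Rules read the current context through the thread local, if one is set.
  def get(): Option[RuleContext] = Option(tl.get())

  // Install the context for the duration of `block`, then always restore,
  // so a rule running later on the same thread never sees a stale context.
  def withRuleContext[T](ctx: RuleContext)(block: => T): T = {
    assert(ctx != null) // a null context is meaningless, so fail fast
    val prev = tl.get()
    tl.set(ctx)
    try block finally {
      if (prev == null) tl.remove() else tl.set(prev)
    }
  }
}
```

Keeping the assertion (rather than tolerating null) matches the reasoning above: the wrapper exists precisely to guarantee a valid context inside the block.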
Hi @ulysses-you , we tried to use this

I think a better design is to put the context (plan fragment level confs or something more general) in the query stage itself. Then all the rules can either update or consume the query stage context. The only problem is that we don't have a query stage node for the final result stage, but we should add one (@liuzqt is working on it).
I've reverted it since #49715 is out and it should be the right design for this.
### What changes were proposed in this pull request?
Added ResultQueryStageExec for AQE
How the query plan looks in the explain string:
```
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
ResultQueryStage 2 ------> newly added
+- *(5) Project [id#26L]
+- *(5) SortMergeJoin [id#26L], [id#27L], Inner
:- *(3) Sort [id#26L ASC NULLS FIRST], false, 0
: +- AQEShuffleRead coalesced
: +- ShuffleQueryStage 0
: +- Exchange hashpartitioning(id#26L, 200), ENSURE_REQUIREMENTS, [plan_id=247]
: +- *(1) Range (0, 25600, step=1, splits=10)
+- *(4) Sort [id#27L ASC NULLS FIRST], false, 0
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 1
+- Exchange hashpartitioning(id#27L, 200), ENSURE_REQUIREMENTS, [plan_id=257]
+- *(2) Ran...
```
How the query plan looks in the Spark UI:
<img width="680" alt="Screenshot 2025-02-03 at 4 11 43 PM" src="https://github.com/user-attachments/assets/86946e19-ffdd-42dd-974a-62a8300ddac8" />
### Why are the changes needed?
Currently the AQE framework is not fully self-contained, since not all plan segments can be put into a query stage: the final "stage" is basically executed as a non-AQE plan. This PR adds a result query stage for AQE to unify the framework. With this change, we can build more query stage level features; one use case is #44013 (comment)
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
New unit tests.
Also, existing tests which are impacted by this change are updated to keep their original test semantics.
### Was this patch authored or co-authored using generative AI tooling?
NO
Closes #49715 from liuzqt/SPARK-51008.
Lead-authored-by: liuzqt <[email protected]>
Co-authored-by: Ziqi Liu <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?
This pr introduces `case class AdaptiveRuleContext(isSubquery: Boolean, isFinalStage: Boolean)`, which can be used inside adaptive sql extension rules through a thread local, so that developers can modify the next plan fragment configs using `AdaptiveRuleContext.get()`.

The plan fragment configs can be propagated through multiple phases, e.g., if you set a config in `queryPostPlannerStrategyRules`, then the config can be read in `queryStagePrepRules`, `queryStageOptimizerRules` and `columnarRules`. The configs will be cleaned up before execution, so in the next round the configs will be empty.

### Why are the changes needed?
To support modifying plan fragment level SQL configs through AQE rules.

### Does this PR introduce any user-facing change?
no, only affects developers

### How was this patch tested?
add new tests

### Was this patch authored or co-authored using generative AI tooling?
no
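The set-in-one-phase, read-in-later-phases, clear-before-execute lifecycle described above can be modeled in a few lines (plain Scala; `setConfig`, `configs`, and `clear` are illustrative names, not the PR's actual API):

```scala
import scala.collection.mutable

// Minimal model of a per-fragment config carrier shared across AQE phases.
class FragmentContext {
  private val overrides = mutable.Map.empty[String, String]

  def setConfig(key: String, value: String): Unit = overrides(key) = value
  def configs: Map[String, String] = overrides.toMap

  // Called before execution starts, so the next round begins empty.
  def clear(): Unit = overrides.clear()
}

val ctx = new FragmentContext
// An early phase (e.g. queryPostPlannerStrategyRules) sets a config...
ctx.setConfig("spark.sql.shuffle.partitions", "32")
// ...and later phases (queryStagePrepRules, columnarRules, ...) can read it.
val seen = ctx.configs.get("spark.sql.shuffle.partitions")
// Before execution the configs are cleared for the next round.
ctx.clear()
```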