Skip to content

Conversation

@ulysses-you
Copy link
Contributor

What changes were proposed in this pull request?

This pr adds a new extension entrance queryPostPlannerStrategyRules in SparkSessionExtensions. It will be applied between plannerStrategy and queryStagePrepRules in AQE, so it can get the whole plan before injecting exchanges.

Why are the changes needed?

a part of #44013

Does this PR introduce any user-facing change?

no, only for develop

How was this patch tested?

add test

Was this patch authored or co-authored using generative AI tooling?

no

@github-actions github-actions bot added the SQL label Nov 29, 2023
@ulysses-you
Copy link
Contributor Author

cc @dongjoon-hyun @cloud-fan this pr is only for adding Adaptive Query Post Planner Strategy Rules part.

@ulysses-you
Copy link
Contributor Author

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for making a PR, @ulysses-you .

type ColumnarRuleBuilder = SparkSession => ColumnarRule
type QueryStagePrepRuleBuilder = SparkSession => Rule[SparkPlan]
type QueryStageOptimizerRuleBuilder = SparkSession => Rule[SparkPlan]
type QueryPostPlannerStrategyBuilder = SparkSession => Rule[SparkPlan]
Copy link
Member

@dongjoon-hyun dongjoon-hyun Nov 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we put this before type QueryStagePrepRuleBuilder because this rule is supposed to be used before queryStagePreparationRules?

private[this] val queryStageOptimizerRuleBuilders =
mutable.Buffer.empty[QueryStageOptimizerRuleBuilder]
private[this] val queryPostPlannerStrategyRuleBuilders =
mutable.Buffer.empty[QueryPostPlannerStrategyBuilder]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto. Shall we put this before private[this] val queryStagePrepRuleBuilders?

session: SparkSession): Seq[Rule[SparkPlan]] = {
queryPostPlannerStrategyRuleBuilders.map(_.apply(session)).toSeq
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto. Shall we put this before private[sql] def buildQueryStagePrepRules?

def injectQueryPostPlannerStrategyRule(builder: QueryPostPlannerStrategyBuilder): Unit = {
queryPostPlannerStrategyRuleBuilders += builder
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto. Shall we put this before def injectQueryStagePrepRule?

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, it looks reasonable to me. I left a few minor comments. Thank you, @ulysses-you .

Also, cc @sunchao , too

withSession(extensions) { session =>
assert(session.sessionState.adaptiveRulesHolder.queryPostPlannerStrategyRules
.contains(MyQueryPostPlannerStrategyRule))
import session.sqlContext.implicits._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about just use session.implicits._?

import session.sqlContext.implicits._
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "3",
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false") {
val input = Seq((10), (20), (10)).toDF("c1")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it necessary to add parentheses to each element? for readability?

SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false") {
val input = Seq((10), (20), (10)).toDF("c1")
val df = input.groupBy("c1").count()
df.collect()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried it, and even if df.collect() is not executed, this test case can still pass. So, is it necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is used to make sure we are checking the final plan

df.collect()
assert(df.rdd.partitions.length == 1)
assert(find(df.queryExecution.executedPlan) {
case s: ShuffleExchangeExec if s.outputPartitioning == SinglePartition => true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about case s: ShuffleExchangeExec => s.outputPartitioning == SinglePartition

Additionally, a personal opinion unrelated to this pr: If there is an exists function in AdaptiveSparkPlanHelper, would this assertion be simpler to write?

case s: ShuffleExchangeExec if s.outputPartitioning == SinglePartition => true
case _ => false
}.isDefined)
assert(find(df.queryExecution.executedPlan) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems using collectFirst could eliminate the case _ => false branch?

@ulysses-you
Copy link
Contributor Author

thank you @dongjoon-hyun @LuciferYang , addressed comments

@ulysses-you
Copy link
Contributor Author

thanks for review, merged to master.

@ulysses-you ulysses-you deleted the post-planner branch November 30, 2023 09:15
ulysses-you added a commit to ulysses-you/spark that referenced this pull request Feb 6, 2024
…y rules in SparkSessionExtensions

### What changes were proposed in this pull request?

This pr adds a new extension entrance `queryPostPlannerStrategyRules` in `SparkSessionExtensions`. It will be applied between plannerStrategy and queryStagePrepRules in AQE, so it can get the whole plan before injecting exchanges.

### Why are the changes needed?

a part of apache#44013

### Does this PR introduce _any_ user-facing change?

no, only for develop

### How was this patch tested?

add test

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#44074 from ulysses-you/post-planner.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: youxiduo <[email protected]>
yaooqinn pushed a commit that referenced this pull request Feb 6, 2024
…rategy rules in SparkSessionExtensions

This pr is backport #44074 for branch-3.5 since 3.5 is a lts version

### What changes were proposed in this pull request?

This pr adds a new extension entrance `queryPostPlannerStrategyRules` in `SparkSessionExtensions`. It will be applied between plannerStrategy and queryStagePrepRules in AQE, so it can get the whole plan before injecting exchanges.

### Why are the changes needed?

3.5 is a lts version

### Does this PR introduce _any_ user-facing change?

no, only for develop

### How was this patch tested?

add test

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #44074 from ulysses-you/post-planner.

Authored-by: ulysses-you <ulyssesyou18gmail.com>

Closes #45037 from ulysses-you/SPARK-46170.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
…rategy rules in SparkSessionExtensions (apache#366)

This pr is backport apache#44074 for branch-3.5 since 3.5 is a lts version

### What changes were proposed in this pull request?

This pr adds a new extension entrance `queryPostPlannerStrategyRules` in `SparkSessionExtensions`. It will be applied between plannerStrategy and queryStagePrepRules in AQE, so it can get the whole plan before injecting exchanges.

### Why are the changes needed?

3.5 is a lts version

### Does this PR introduce _any_ user-facing change?

no, only for develop

### How was this patch tested?

add test

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#44074 from ulysses-you/post-planner.

Authored-by: ulysses-you <ulyssesyou18gmail.com>

Closes apache#45037 from ulysses-you/SPARK-46170.

Authored-by: ulysses-you <[email protected]>

Signed-off-by: Kent Yao <[email protected]>
Co-authored-by: ulysses-you <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants