-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-54136][SQL] Extract plan merging logic from MergeScalarSubqueries to PlanMerger
#52835
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-54136][SQL] Extract plan merging logic from MergeScalarSubqueries to PlanMerger
#52835
Conversation
…ries` to `PlanMerger`
| * - outputMap maps newPlan's attributes to mergedPlan's attributes | ||
| * Returns None if plans cannot be merged. | ||
| */ | ||
| private def tryMergePlans( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the first glance, this looks identical in terms of the code. Am I right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, tryMergePlans(), mapAttributes(), mergeNamedExpressions() and supportedAggregateMerge() are just copied without any changes.
| if (mergedPlan.merged) { | ||
| CTERelationDef( | ||
| createProject(header.plan.output, removeReferences(header.plan, cache)), | ||
| Project( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May I ask why this PR proposes to remove the existing createProject method and use Project(...) directly here? Is this better in a way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the current version of the rule createProject() is a small helper function and it is used only here, so I felt it doesn't need to be a separate function.
|
|
||
| final override val nodePatterns: Seq[TreePattern] = Seq(SCALAR_SUBQUERY_REFERENCE) | ||
|
|
||
| override def stringArgs: Iterator[Any] = Iterator(subqueryIndex, headerIndex, dataType, exprId.id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May I ask why we don't need to override stringArgs at all after this PR? Is this related to this PR's contribution or this was not used before this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ScalarSubqueryReference never shows up in any stringified plan. It's just a temporary object that is added and then removed when MergeScalarSubqueries runs so I think we don't need this.
|
|
||
| // Caching returns the index of the subquery in the cache and the index of scalar member in the | ||
| // "Header". | ||
| private def cacheSubquery(plan: LogicalPlan, cache: ArrayBuffer[Header]): (Int, Int) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this method removed simply?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Briefly, cacheSubquery() became PlanMerger.merge() with some modifications.
As PlanMerger.merge() is kind of an internal API now, which can be used by other rules. It returns MergeResult instead of the tuple after this refactor.
The Header objects, which contined the information about the merged plans became MergedPlans and can be accessed as PlanMerger.mergedPlans() by the rules.
The main difference between cacheSubquery() and the new PlanMerger.merge() is that I removed Header.references and the related safeguards like the if !references.contains(subqueryIndex) below.
Previously, we filled up Header.references and with nested subquery indices so as to avoid trying to merge a subquery with other subqueries that are nested into it.
E.g.
SELECT (
SELECT avg(a) FROM t WHERE c = ( -- subquery 1
SELECT sum(b) FROM t2) -- subquery 2 nested into subquery 1
)
)
Here it doesn't make sense to try merging the plan of subquery 1 and subquery 2 as the former contains the latter. The problem of trying to merge the 2 was not just it doesn't make sense, but in certain weird cases (see the example in SPARK-40618) the merge was successful and resulted in invalid plans.
But because PlanMerger should be a general plan merging tool and in some future rules it will be used to merge other than subqueries, tracking the nested subqueries in it is not the best way to deal with the problem.
This is why the refactored MergeScalarSubqueries uses a different approach. We have one PlanMerger object per each subquery level. E.g. subquery 2 is a level 0 (leaf) subquery, but subquery 1 is a level 1 subquery because it has an inner, level 0 subquery. So the 2 will be added to different PlanMergers. This way a level n subquery is tried to merge with other level n subqueries only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @cloud-fan and @dtenedor from the previous PR (mentioned in the PR description).
BTW, @peter-toth , given the characteristic of this PR, it seems that we need to target Apache Spark 4.2.0 only (master branch) instead of 4.1.0 (branch-4.1).
| val subquery5 = ScalarSubquery(testRelation.select((Symbol("a") + 2).as("a_plus2_2"))) | ||
| val subquery6 = ScalarSubquery(testRelation.select(Symbol("b").as("b_2"))) | ||
| val originalQuery = testRelation | ||
| .select( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This minor test chnage in the order of subqueries is needed because the modified MergeScalarSubqueries encounters them in different order. So we either change the expected query to expect a_plus1_2 and a_plus2_2 or reorder the subqueries in the query like here. I applied the latter.
|
cc @beliefer as well. The goal here is to extract the merging logic and reuse it in SPARK-44571 / #42223 and SPARK-43025 / #40661. |
|
If there is no objection I would like to merge this refactor by the end of this week. |
| * to reference the merged plan instead. | ||
| */ | ||
| case class MergeResult( | ||
| mergedPlan: LogicalPlan, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we use mergedPlan: MergedPlan to save the merged: Boolean parameter in this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea! Let me change this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, fixed in f6899a0.
| } | ||
|
|
||
| object PlanMerger { | ||
| def apply(): PlanMerger = new PlanMerger |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel it's better to explicitly write new PlanMerger() instead of PlanMerger()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in f6899a0.
|
Thank you for the review @cloud-fan and @dongjoon-hyun, really appreciated. Merged to |
What changes were proposed in this pull request?
This PR extracts the plan merging logic from
MergeScalarSubqueriestoPlanMergerso as to other rules can reuse it.While the plan merging logic is extracted without modification to
PlanMerger,MergeScalarSubqueriesrequired a significant adjustment. This is because SPARK-40618 / #38093 added subquery reference tracking so as to avoid trying to merge a subquery to any of its nested subqueries. This kind of reference trancking doesn't work well with a generalPlanMergerso this PR modifiesMergeScalarSubqueriesto use a separatePlanMergers by each subquery level.Why are the changes needed?
To be able to reuse plan merging logic.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing UTs.
Was this patch authored or co-authored using generative AI tooling?
Yes, Claude gave me suggestions to improve documentation.