[SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or Paging (Sort with group column) #37238
ping @huaxingao cc @cloud-fan
Per SortOrder(attr: AttributeReference..., it's always an AttributeReference. Should it also handle Alias?
Yes. If users specify an alias for the group column, it will be Alias(alias: Alias, _).
ping @cloud-fan
I think the key part here is how to get the original group by column name. If we simply allow top n pushdown, it actually works fine, but the column name is incorrect like
PR description updated.
private def findGroupColumn(alias: Alias): Option[AttributeReference] = alias match {
  case alias @ Alias(attr: AttributeReference, name) if attr.name.startsWith("group_col_") =>
    Some(AttributeReference(name, attr.dataType)(alias.exprId))
  case Alias(alias: Alias, _) => findGroupColumn(alias)
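The excerpt above shows only part of the helper. As a rough, self-contained illustration of the same idea (unwrap nested aliases, then confirm the result against the relation's output), here is a sketch that uses simplified stand-in types instead of Spark's Catalyst classes. The names `Expr`, `Attr`, `originalGroupColumn`, and the fallback `case _ => None` are assumptions made for this sketch; the real code compares expressions with `semanticEquals` and keeps the alias's `exprId`, whereas this sketch compares by name only.

```scala
object FindGroupColumnSketch {
  // Simplified stand-ins for Catalyst expressions; these are NOT Spark's real classes.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr

  // Unwrap nested aliases until one directly wraps a generated `group_col_*`
  // attribute, then return an attribute carrying that alias's name.
  def findGroupColumn(alias: Alias): Option[Attr] = alias match {
    case Alias(Attr(generated), name) if generated.startsWith("group_col_") =>
      Some(Attr(name))
    case Alias(inner: Alias, _) =>
      findGroupColumn(inner)
    case _ =>
      None // assumed fallback; the excerpt does not show this case
  }

  // Accept the candidate only if it matches a column in the relation's output.
  // Assumption: equality by name stands in for `semanticEquals`.
  def originalGroupColumn(alias: Alias, relationOutput: Seq[Attr]): Option[Attr] =
    findGroupColumn(alias).filter(col => relationOutput.contains(col))
}
```

With a user-specified alias on top of the generated one, for example `Alias(Alias(Attr("group_col_0"), "dept"), "d")`, the lookup walks through the outer alias and yields the inner name `dept`.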
I feel it's a bit hacky to assume the Alias contains the actual grouping columns. How about we generate the name mapping (grouping attribute to actual group column name) during agg pushdown, put the name mapping in ScanBuilderHolder, and use the mapping to rewrite order by expression during limit pushdown?
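To make the suggestion concrete, here is a minimal sketch of the name-mapping approach, assuming simplified stand-in types rather than Spark's real classes. `Holder`, `SortKey`, `recordMapping`, and `rewriteSort` are hypothetical names invented for this illustration, and the `group_col_<index>` naming scheme is an assumption based on the `group_col_` prefix seen in the diff; none of this is the code that was eventually merged.

```scala
object GroupNameMappingSketch {
  // Hypothetical stand-in for a sort order on a single column.
  final case class SortKey(column: String, ascending: Boolean)

  // Hypothetical stand-in for ScanBuilderHolder; only the mapping matters here.
  final case class Holder(groupColumnNames: Map[String, String])

  // During aggregate pushdown: record generated attribute name -> original column name.
  // Assumption: generated names look like group_col_0, group_col_1, ...
  def recordMapping(groupColumns: Seq[String]): Holder =
    Holder(groupColumns.zipWithIndex.map { case (col, i) => s"group_col_$i" -> col }.toMap)

  // During limit pushdown: rewrite each sort key to the original column name.
  // Returns None if any key is not a group column, so the sort is not pushed down.
  def rewriteSort(holder: Holder, keys: Seq[SortKey]): Option[Seq[SortKey]] = {
    val rewritten = keys.map { k =>
      holder.groupColumnNames.get(k.column).map(orig => k.copy(column = orig))
    }
    if (rewritten.forall(_.isDefined)) Some(rewritten.flatten) else None
  }
}
```

The point of the design is that the mapping is recorded where the generated names are created, so the later rewrite does not have to guess the original name from an `Alias`.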
OK
Because #37320 has been merged, I will close this PR.
What changes were proposed in this pull request?
Currently, DS V2 aggregate push-down cannot work together with DS V2 Top N push-down (order by ... limit ...) or DS V2 Paging push-down (order by ... limit ... offset ...).
Pushing down the aggregate together with Top N or Paging would give better performance.
This PR only allows the aggregate to be pushed down with Top N or Paging when every ORDER BY column is also a GROUP BY column.
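The restriction above can be illustrated with a small check. This is only a sketch on plain column names; `sortsOnlyOnGroupColumns` is a hypothetical helper written for this example, not a function in Spark.

```scala
object SortOnGroupColumnsSketch {
  // Hypothetical helper: true when there is at least one ORDER BY column
  // and every ORDER BY column is also a GROUP BY column.
  def sortsOnlyOnGroupColumns(groupBy: Seq[String], orderBy: Seq[String]): Boolean =
    orderBy.nonEmpty && orderBy.forall(col => groupBy.contains(col))
}
```

For instance, a query shaped like GROUP BY dept ORDER BY dept LIMIT 1 would pass this check, while one that orders by an aggregate result such as SUM(salary) would not (the column names here are made up for illustration).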
The ideas of this PR are:
When `pushedAggregate` is defined, the aggregate may be pushed down together with Top N or Paging.
The key part of this PR is how to get the original group by column name.
To build the `Scan` lazily, the code referenced below gives the expected output of `ScanBuilderHolder`.
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
Line 198 in 55c3347
Then the aggregate pushdown constructs an `Alias` for the group by columns, as referenced below.
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
Line 226 in 55c3347
or
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
Line 279 in 55c3347
So, to find the original group by column, three steps are needed.
First step: use `findGroupColumn` to find the `Alias` used for an attribute whose name starts with `group_col_`. The name of that `Alias` may be the original column name.
Second step: check whether the attribute found in the first step is the original column by `sHolder.relation.output.exists(out => out.semanticEquals(groupCol))`.
Third step: recreate the `SortOrder` with the original column.
Why are the changes needed?
Allowing DS V2 aggregate push-down to work with Top N or Paging (sort on group columns) gives users better performance.
Does this PR introduce any user-facing change?
No. This is a new feature.
How was this patch tested?
New test cases.