[SPARK-45036][FOLLOWUP][SQL] SPJ: Make sure result partitions are sorted according to partition values #42839
.../src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
dongjoon-hyun
left a comment
Could you re-trigger the CI again, @sunchao ?
    val sortedKeyToPartitions = results.sorted(partitionOrdering)
    val groupedPartitions = sortedKeyToPartitions
    val rowOrdering = RowOrdering.createNaturalAscendingOrdering(partitionDataTypes)
    val sortedKeyToPartitions = results.sorted(rowOrdering.on(_._1))
Suggested change:
    - val sortedKeyToPartitions = results.sorted(rowOrdering.on(_._1))
    + val sortedKeyToPartitions = results.sorted(rowOrdering.on((t: (InternalRow, _)) => t._1))
To fix the Scala 2.13 build:
[error] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala:147:67: missing parameter type for expanded function ((<x$7: error>) => x$7._1)
[error] val sortedKeyToPartitions = results.sorted(rowOrdering.on(_._1))
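For readers hitting the same error, here is a minimal sketch of the fix pattern in plain Scala, with no Spark dependency. `Ordering.on` is a standard-library method; the `Int` key and `String` payload are hypothetical stand-ins for the `InternalRow` key and the partition values in the PR. Annotating the lambda parameter gives the compiler the input type it reports as missing in the log above.

```scala
object ExplicitOrderingParam {
  // Hypothetical stand-in for the row ordering built from partition data types.
  val keyOrdering: Ordering[Int] = Ordering.Int

  // Sort (key, payload) pairs by key. The lambda parameter is annotated
  // explicitly, mirroring the suggested change in the review, so the type
  // argument of `on` does not have to be inferred from the expected type.
  def sortByKey(results: Seq[(Int, String)]): Seq[(Int, String)] =
    results.sorted(keyOrdering.on((t: (Int, String)) => t._1))
}
```

Calling `ExplicitOrderingParam.sortByKey(Seq((3, "c"), (1, "a"), (2, "b")))` yields the pairs in ascending key order.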
oops didn't realize it doesn't compile this way.
    .groupBy(_._1)
    .toSeq
    .map { case (key, s) => KeyGroupedPartition(key.row, s.map(_._2)) }
    .sorted(rowOrdering.on(_.value))
Suggested change:
    - .sorted(rowOrdering.on(_.value))
    + .sorted(rowOrdering.on((k: KeyGroupedPartition) => k.value))
ditto
thanks!
LuciferYang
left a comment
LGTM
Merged into master. Thanks @sunchao @viirya @dongjoon-hyun @Hisoka-X

Thank you, all!
…ted according to partition values

### What changes were proposed in this pull request?
This PR makes sure the result grouped partitions from `DataSourceV2ScanExec#groupPartitions` are sorted according to the partition values. Previously in the apache#42757 we were assuming Scala would preserve the input ordering but apparently that's not the case.

### Why are the changes needed?
See apache#42757 (comment) for diagnosis. The partition ordering is a fundamental property for SPJ and thus must be guaranteed.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
We have tests in `KeyGroupedPartitioningSuite` to cover this.

### Was this patch authored or co-authored using generative AI tooling?

Closes apache#42839 from sunchao/SPARK-45036-followup.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
What changes were proposed in this pull request?
This PR makes sure the grouped partitions returned by `DataSourceV2ScanExec#groupPartitions` are sorted according to the partition values. Previously, in #42757, we assumed Scala would preserve the input ordering, but apparently that's not the case.
Why are the changes needed?
See #42757 (comment) for diagnosis. The partition ordering is a fundamental property for SPJ and thus must be guaranteed.
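The ordering issue can be illustrated with a small sketch in plain Scala. It is not the Spark code: `Group` is a hypothetical stand-in for `KeyGroupedPartition`, and `Int` keys stand in for partition-value rows. The standard-library `groupBy` returns a `Map`, whose iteration order is not guaranteed to follow the input order, so the grouped result is sorted explicitly afterwards, which is the shape of the change in this PR.

```scala
object SortedGroups {
  // Hypothetical stand-in for Spark's KeyGroupedPartition.
  final case class Group(key: Int, parts: Seq[String])

  def groupAndSort(results: Seq[(Int, String)]): Seq[Group] =
    results
      .groupBy(_._1) // yields a Map; do not rely on its iteration order
      .toSeq
      .map { case (key, s) => Group(key, s.map(_._2)) }
      // Sort explicitly so the output order never depends on Map internals.
      .sorted(Ordering.Int.on((g: Group) => g.key))
}
```

With the final `sorted` call in place, the groups come back in ascending key order regardless of how the intermediate `Map` happens to iterate.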
Does this PR introduce any user-facing change?
No
How was this patch tested?
We have tests in `KeyGroupedPartitioningSuite` to cover this.
Was this patch authored or co-authored using generative AI tooling?