-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-42040][SQL] SPJ: Introduce a new API for V2 input partition to report partition statistics #45314
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
cc @sunchao @szehon-ho I try to help this task, Could you take a look? Thanks! |
sunchao
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @aokolnychyi @RussellSpitzer @rdblue do you think this could be useful for Iceberg to pass partition stats to Spark? SPJ could leverage this to make better decisions on how to combine partitions (like which side to choose during partially clustered distribution), but I'm not sure whether there are more use cases.
| * @see org.apache.spark.sql.connector.read.SupportsReportPartitioning | ||
| * @since 4.0.0 | ||
| */ | ||
| public interface HasPartitionSize extends InputPartition { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can make this more general and support partition level stats as well, like number of rows.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @sunchao for review, and this is a good suggestion, i check the Iceberg code, it includes the sizeBytes and estimatedRowsCount, filesCount. Let me address this!
@Override
default long sizeBytes() {
return tasks().stream().mapToLong(ScanTask::sizeBytes).sum();
}
@Override
default long estimatedRowsCount() {
return tasks().stream().mapToLong(ScanTask::estimatedRowsCount).sum();
}
@Override
default int filesCount() {
return tasks().stream().mapToInt(ScanTask::filesCount).sum();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in latest PR.
sunchao
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with one nit, cc @cloud-fan too
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/HasPartitionStatistics.java
Outdated
Show resolved
Hide resolved
| * @see org.apache.spark.sql.connector.read.SupportsReportPartitioning | ||
| * @since 4.0.0 | ||
| */ | ||
| public interface HasPartitionStatistics extends InputPartition { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about ReportStatisticsPartition ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @beliefer for review , because we use HasPartitionKey for the partition key, so i keep the name for HasPartitionStatistics, it is consistent for SPJ feature.
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/HasPartitionStatistics.java
Outdated
Show resolved
Hide resolved
| * Returns the value of the partition statistics associated to this partition. | ||
| */ | ||
| OptionalLong sizeInBytes(); | ||
| OptionalLong numRows(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhuqi-lucas could we add some comments for numRows and fileCount too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @sunchao for this suggestion, addressed latest PR.
| */ | ||
| public interface HasPartitionStatistics extends InputPartition { | ||
| /** | ||
| * Returns the value of the partition statistics associated to this partition. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I think this comment is not correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in latest PR.
@sunchao Aside from picking the side of partially clustered distribution, would we also be able to use it to group smaller partitions? Example a table is partition by date, and older days have not much data (on both sides), group many of the older days into the same task. Similar to AQE coalesce partitions, but it looks like that applies only after shuffle, so looks like it doesnt apply for SPJ? |
| * Returns the size in bytes of the partition statistics associated to this partition. | ||
| */ | ||
| OptionalLong sizeInBytes(); | ||
| /** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: can we add some newline between method and next line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @szehon-ho for review, addressed in latest PR.
Yea I I think that would be an interesting use case. If we know the partitions from both sides of the join AND the size for each partition, we can probably make some better decisions.
Right, this doesn't to SPJ. |
|
Thanks, merged to master! |
… report partition statistics ### What changes were proposed in this pull request? This PR introduces a new V2 min-in HasPartitionStatistics which can be used to return the partition statistics of a InputPartition. ### Why are the changes needed? As part of the Storage Partitioned Join work ([SPIP](https://issues.apache.org/jira/browse/SPARK-37166)), we'll need to introduce a way for a V2 InputPartition to return its partition size, It's useful for a InputPartition to also report its size (in bytes), so that Spark can use the info to decide whether partition grouping should be applied or not. This will be used later in follow-up PRs. ### Does this PR introduce _any_ user-facing change? Yes, a new V2 mix-in HasPartitionStatistics will be introduced. ### How was this patch tested? Extended InMemoryTable to support this new interface, and added a new unit test to verify the API, and the mock the partition statistics to test. ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#45314 from zhuqi-lucas/SPARK-42040. Authored-by: qzhu <[email protected]> Signed-off-by: Your Name <[email protected]>
What changes were proposed in this pull request?
This PR introduces a new V2 min-in HasPartitionStatistics which can be used to return the partition statistics of a InputPartition.
Why are the changes needed?
As part of the Storage Partitioned Join work (SPIP), we'll need to introduce a way for a V2 InputPartition to return its partition size, It's useful for a InputPartition to also report its size (in bytes), so that Spark can use the info to decide whether partition grouping should be applied or not.
This will be used later in follow-up PRs.
Does this PR introduce any user-facing change?
Yes, a new V2 mix-in HasPartitionStatistics will be introduced.
How was this patch tested?
Extended InMemoryTable to support this new interface, and added a new unit test to verify the API, and the mock the partition statistics to test.
Was this patch authored or co-authored using generative AI tooling?
no