[SPARK-35779][SQL] Dynamic filtering for Data Source V2 #32921
Conversation
Thank you for making a PR, @aokolnychyi!
cc @wangyum - this may be related to the runtime filtering that you guys are working on.
The design doc has alternative ways to represent dynamic filters. It would be great to get feedback on this.
The DS v2 API prefers to use Spark's native data classes, e.g. InternalRow, UTF8String, etc. However, we keep using the v1 Filter API, which uses external data classes. Shall we consider adding a v2 Filter API that uses the v2 Expression API and native data classes?
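For context, a minimal illustration of the difference (an editorial sketch, not code from this PR): v1 filters carry external JVM values, while Spark's internal representation uses native classes.

```scala
import org.apache.spark.sql.sources.EqualTo
import org.apache.spark.unsafe.types.UTF8String

// v1 Filter: external data classes (java.lang.String, java.sql.Date, ...)
val v1Filter = EqualTo("name", "Alice")

// Spark's native representation of the same value, as used by InternalRow
val nativeValue = UTF8String.fromString("Alice")
```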
I think that would be best, @cloud-fan. Has there been any discussion on what the new API should look like? Since the old API has already been exposed in SupportsPushDownFilters, what is the plan for introducing the new API? Will we introduce a new method with a default implementation that translates to the old API?
Is it alright with everybody to consider v1 filters in the scope of this PR? I'll take over @dbtsai's PR later.
If we target this PR for 3.3, then I'm fine to use v1 Filter here and replace it with v2 Filter later, as there is plenty of time. Otherwise, I'd like to have v2 Filter first, to avoid releasing this API with v1 Filter and breaking it later.
I think that this should use v1 filters. The new filters don't exist yet, and the DSv2 API uses v1 in other places. There is no need to block this on adding v2 filters. If this ends up in the same release as v2 filters, we can consider removing the v1 support then, but at this point I think we should not assume that will happen.
Since dynamic filtering can provide substantial performance improvements for v2 tables, I'd love to get this feature into 3.2. As we already use v1 filters in other Data Source V2 interfaces, I feel it should be alright to use them here too.
As I noted above, we don't really have to break this API once we have v2 filters. We can follow whatever we decide to do with SupportsPushDownFilters: introduce a separate interface or just add a method to the existing interface with a default implementation that would convert v2 filters into v1 filters.
Does this seem reasonable, @cloud-fan?
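To make the compatibility idea above concrete, here is a rough, hypothetical sketch of the "add a method with a default implementation" option. None of these names are real Spark APIs: `V2Filter`, `toV1`, and `SupportsPushDownFiltersExample` are placeholders for whatever the v2 Filter API ends up looking like.

```scala
import org.apache.spark.sql.sources.Filter

// Hypothetical v2-style filter with a conversion to the existing v1 Filter.
trait V2Filter {
  def toV1: Filter
}

// Hypothetical interface: a new v2-style entry point with a default implementation
// that falls back to the existing v1 path, so old implementations keep working.
trait SupportsPushDownFiltersExample {
  // existing v1-style entry point
  def pushFilters(filters: Array[Filter]): Array[Filter]

  // new v2-style entry point, translated to v1 by default
  def pushV2Filters(filters: Array[V2Filter]): Array[V2Filter] = {
    val remaining = pushFilters(filters.map(_.toV1)).toSet
    filters.filter(f => remaining.contains(f.toV1))
  }
}
```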
If we target this PR for 3.3, then I'm fine to use v1 Filter here and replace it with v2 Filter later, as there is plenty of time. Otherwise, I'd like to have v2 Filter first, to avoid releasing this API with v1 Filter and breaking it later.
Do we plan to remove v1 Filter soon? Otherwise, we can still keep v1 Filter support in this API even if we decide to add v2 Filter later, so we wouldn't actually break it quickly (at least not in the next release).
So basically, it seems we don't need to block this on that?
I had to implement stats as tests rely on them.
cc @huaxingao @dongjoon-hyun @sunchao @cloud-fan @maryannxue @viirya @rdblue @HyukjinKwon It would be great to hear your feedback on this WIP PR. Some tests are expected to fail as we don't support ANALYZE statements for v2 tables yet.
Thanks @aokolnychyi for the PR!
Do we need to mention the related configs for this to kick in? E.g., spark.sql.optimizer.dynamicPartitionPruning.enabled and spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly? In general, I think we should link this to DPP since it can be used for that purpose in the future.
Does the v2 MERGE INTO use case allow this to be optional and controlled via the existing DPP flags?
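For reference, the existing DPP flags mentioned above can be set as follows (an illustrative sketch assuming a SparkSession named `spark`; whether these flags should also gate the v2 runtime-filtering path is exactly the open question here):

```scala
// Existing dynamic-partition-pruning configs referenced in this thread.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
// By default, DPP only reuses broadcast exchange results rather than running a separate subquery.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", "true")
```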
I think we should just say that Spark will push runtime filters if they are beneficial. No need to mention too many details; e.g., in the future Spark may have a more advanced cost model to decide whether to push down runtime filters or not.
Agreed. Will add the details.
Yeah, I agree with Wenchen. It is better to just state that Spark may use this to further refine the filters.
What if some attributes in scan.filterAttributes cannot be resolved? Should we skip and continue?
Also, you may want to use DataSourceV2ScanRelation.relation since the output could be pruned by column pruning. It seems we currently run PartitionPruning after V2ScanRelationPushDown.
+1 on skipping rather than failing if we cannot resolve the filter attrs.
W.r.t. which columns to use, I used the scan output for resolution on purpose, as we cannot derive a filter on an attribute that hasn't been projected.
@sunchao, it looks like we always fail if partition columns cannot be resolved in the branch above this one. While it does seem safer not to fail the query if a filter attribute cannot be resolved, shall we be consistent with v1 tables? What do you think?
Yeah, SGTM.
nit: we might need to update the method name and doc because they are no longer accurate.
This means the data source will need to plan input partitions twice - not sure if that could be expensive. Another idea is for Spark to provide the input partitions and ask the data source to filter on top of them, like:
InputPartition[] filter(Filter[] filters, InputPartition[] partitions);
but I guess this may be too restrictive, so feel free to ignore.
Also, it may be worth checking whether the original input partitions are used for making decisions before they are updated to filteredPartitions (and whether that would change the original decision). I can see that supportsColumnar uses them, but I'm not sure if there are other places.
Hmm, I guess if a scan implementation supports this filtering, it can cache the original input partitions so that the second planning is cheaper?
@viirya is correct. Usually, there is no second planning. Instead, existing input partitions that have already been planned are filtered using the dynamic filters.
I see. In that case maybe it's useful to make this a bit more clear for data source implementors (I'm not sure if there's enough signal for them that planInputPartitions will be called twice).
Added a note on caching state to Scan.
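To illustrate the caching idea being discussed, here is a rough connector sketch. Only Scan, Batch, SupportsRuntimeFiltering, filterAttributes() and filter() are the real DSv2 API; MyScan, the "part" column, planSplits and splitMatches are hypothetical stand-ins for connector-specific logic.

```scala
import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, SupportsRuntimeFiltering}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Hypothetical connector scan: plans splits once, then prunes the cached splits
// in filter() so the second planInputPartitions() call is cheap.
class MyScan(schema: StructType, readerFactory: PartitionReaderFactory)
    extends Scan with Batch with SupportsRuntimeFiltering {

  private var cachedSplits: Array[InputPartition] = _

  override def readSchema(): StructType = schema
  override def toBatch: Batch = this
  override def createReaderFactory(): PartitionReaderFactory = readerFactory

  override def planInputPartitions(): Array[InputPartition] = {
    if (cachedSplits == null) {
      cachedSplits = planSplits() // expensive metadata listing, done once
    }
    cachedSplits
  }

  // the source can be filtered on its (hypothetical) "part" column at runtime
  override def filterAttributes(): Array[NamedReference] =
    Array(Expressions.column("part"))

  // called at execution time; prune already-planned splits instead of re-planning
  override def filter(filters: Array[Filter]): Unit = {
    cachedSplits = planInputPartitions().filter(split => splitMatches(split, filters))
  }

  // hypothetical helpers standing in for connector-specific logic
  private def planSplits(): Array[InputPartition] = Array.empty
  private def splitMatches(split: InputPartition, filters: Array[Filter]): Boolean = true
}
```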
For the TODO items, could you file a JIRA issue and make an ID'ed TODO like TODO(SPARK-XXX)? Otherwise, it's difficult for other contributors to pick them up.
Will do. I am not sure about this point so we will need to discuss it a little bit.
I removed the TODO for now. It is not a blocker, we can reconsider that separately.
Created SPARK-35900 to think about in the future.
Do you think we can spin off the test-case-expanding changes as a separate contribution? It looks like we can merge that independently first.
nit: I think runtime filter is a better name, as the filter is generated after the query compilation phase, at query runtime.
+1 for the naming.
I used dynamicXXX as it is common throughout the code, but SupportsRuntimeFiltering does sound more accurate. I'll update.
Fixed.
I think the proposal is not limited to only partition pruning?
Well, we do filter input partitions, but the filtering can be done using a metadata column (e.g. file name).
@viirya, InputPartition is referring to Spark's partition, not a storage partition. These are actually tasks.
This is another reason why Wenchen's suggestion is a good one. No need to mention what gets filtered or imply that you should produce InputPartition instances and then filter those. This only needs to state that additional filters may be added through this interface.
I recall we cannot change partitions arbitrarily in the streaming case due to stateful tasks, so I'm wondering whether dynamic filtering applies to streaming scans?
I don't think it works for streaming plans right now. Shall I just refer to toBatch in the doc?
I think we should only document the supported cases. Otherwise, it might mislead developers.
Agreed. I'll update.
Fixed.
nit: @param filters the data source filter expressions used to dynamically filter the scan
Fixed.
Thanks for the initial review, @sunchao @dongjoon-hyun @cloud-fan @viirya! There are a couple of points like here and here that I'd like to discuss before updating the PR.
Right now, we don't have a dedicated phase for executing DPP subqueries. They are treated like normal subqueries and are executed right before we execute the main query.

Let's think about non-AQE first. We need to run EnsureRequirements after DPP in case the output partitioning changes, and we need to execute the DPP subqueries first. Before that, we need to optimize the main query and apply exchange/subquery reuse. That said, I think we should execute DPP subqueries after the query plan is fully optimized and ready to execute. For safety, I think we should run the rule that triggers DPP subquery execution and applies DS v2 pushdown after all the existing physical rules have run. AQE would be more complicated, as the fully optimized query plan is only available at the query stage optimization phase, where it's not allowed to change stage boundaries anymore.

I agree that it's better to allow the v2 source to change its output partitioning after runtime filter pushdown, but I'm not quite sure we should allow it if it introduces extra shuffles, since the cost of extra shuffles can be large. I think we can simplify the design if we don't allow runtime filter pushdown to introduce extra shuffles. Spark can give the v2 source both the runtime filter and the required distribution, so that the source can handle it properly and change its output partitioning as long as it still satisfies the required distribution.
```scala
override lazy val inputRDD: RDD[InternalRow] = {
  new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, customMetrics)
  if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
```
Is this possible, given we already check that the number of partitions in originalPartitioning must match the new partition number?
We check that the number of partitions before and after filtering matches only if the source reported a specific partitioning through SupportsReportPartitioning; only in that case do we have DataSourcePartitioning. This situation, on the other hand, can happen if we inferred SinglePartition but the source did not report anything.
viirya left a comment:
Looks okay. Is there any concern or blocking comment for this?
This is not a WIP PR anymore. @aokolnychyi, could you update the description? Thanks.
@viirya, I missed updating the PR description when I updated the title. Done.
Thanks @aokolnychyi! I am not sure if we can still merge this in after the branch cut? If not, maybe we can get this in first, if there are no major comments/concerns, and continue to address minor comments later before the release? Any ideas? @dongjoon-hyun @sunchao @cloud-fan @dbtsai @holdenk @rdblue?
also cc @gengliangwang
This PR looks good to me now. I'm also curious whether this can be merged after the branch cut. It'd also be great if @cloud-fan could take one more look.
+1 to merge it as it is now if there is no major issue, and we can work on follow-ups later to reduce the scope.
```scala
protected[sql] def translateRuntimeFilter(expr: Expression): Option[Filter] = expr match {
  case in @ InSubqueryExec(e @ PushableColumnAndNestedColumn(name), _, _, _) =>
    val values = in.values().getOrElse {
      throw new AnalysisException(s"Can't translate $in to source filter, no subquery result")
```
Shall we throw IllegalStateException? It seems only a bug can lead to this branch.
We only translate the runtime filter when executing the physical plan, and at that time all subqueries must have been evaluated.
Makes sense, I'll switch.
Fixed.
cloud-fan left a comment:
LGTM except for two small comments. The new solution is much simpler!
Thank you, @cloud-fan!
In GA, all tests passed. Only "Hadoop 2 build with SBT" failed, which seems unrelated: @aokolnychyi and I verified by running "Hadoop 2 build with SBT" locally and it worked, so it seems to be a flaky issue only on GA. I don't want to block the branch cut for too long, so I'm going to merge this now. If we see any errors in Jenkins later, we can address them quickly.
Thanks @aokolnychyi for this work and all for the review! Merging to master!
Thanks for reviewing, @viirya @cloud-fan @sunchao @rdblue @dongjoon-hyun @holdenk!
Nice feature for users! But I have a little doubt about it.
### What changes were proposed in this pull request?
Use V2 Filter in runtime filtering for V2 Table.

### Why are the changes needed?
We should use V2 Filter in DS V2. #32921 (comment)

### Does this PR introduce _any_ user-facing change?
Yes, new interface `SupportsRuntimeV2Filtering`.

### How was this patch tested?
New test suite.

Closes #36918 from huaxingao/v2filtering.
Authored-by: huaxingao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Hey @aokolnychyi, we are trying to use Spark Data Source V2 and noticed that the Spark v2 built-in data sources (e.g. the Parquet one, looking at ParquetScan) don't support this (neither SupportsRuntimeFiltering nor SupportsRuntimeV2Filtering) by default, creating a large performance difference between using the v1 and v2 data sources out of the box. Is there a plan to have them support this? It would be really beneficial for the file scans to be able to do this, and given they already benefit from some pushdowns, we were wondering why the runtime filtering is not implemented. Or maybe I am missing something? In that case it would be great to understand how to have Spark file sources take advantage of DPP. Thanks!
Hi, @LorenzoMartini! I am not sure how much
Hey @aokolnychyi, thank you for the answer. I see that those sources have special optimizations. However, we do have instances of data transformations being incredibly slower using Spark's v2 data sources, and the only difference in the query plans compared to the same transformations run using v1 data sources is the absence of the dynamic pruning expressions. Do you have any suggestions on how to improve those use cases, if not trying to implement
What changes were proposed in this pull request?
This PR implements the proposal per the design doc for SPARK-35779.
Why are the changes needed?
Spark supports dynamic partition filtering that enables reusing parts of the query to skip unnecessary partitions in the larger table during joins. This optimization has proven to be beneficial for star-schema queries which are common in the industry. Unfortunately, dynamic pruning is currently limited to partition pruning during joins and is only supported for built-in v1 sources. As more and more Spark users migrate to Data Source V2, it is important to generalize dynamic filtering and expose it to all v2 connectors.
Please see the design doc for more information on this effort.
Does this PR introduce any user-facing change?
Yes, this PR adds a new optional mix-in interface for Scan in Data Source V2.
How was this patch tested?
This PR comes with tests.
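For reference, the shape of the added mix-in, rendered here as a Scala-style sketch for illustration (the actual definition is a Java interface, org.apache.spark.sql.connector.read.SupportsRuntimeFiltering):

```scala
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.sources.Filter

// Illustrative Scala rendering of the new mix-in; the real interface lives in Java
// under org.apache.spark.sql.connector.read.
trait SupportsRuntimeFilteringSketch extends Scan {
  // attributes (e.g. partition or metadata columns) the source can filter on at runtime
  def filterAttributes(): Array[NamedReference]

  // invoked at execution time with v1 Filter expressions derived from completed subqueries;
  // the source is expected to prune its already-planned input partitions accordingly
  def filter(filters: Array[Filter]): Unit
}
```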