[SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands #36304
Conversation
@cloud-fan, here is one way to achieve proper group filtering in row-level operations. While I don't target 3.3 with this functionality, I believe it is still important to finish this before 3.3 is officially out. That way, we still have a chance to change the row-level API if needed before it gets released. Currently, it is fully backward compatible.

cc @rdblue @huaxingao @sunchao @viirya @dongjoon-hyun

Some tests are expected to fail due to SPARK-38977. I have another PR #36303 to fix this.
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RowLevelOperation.java (outdated review thread, resolved)
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
I want to resume working on this PR but I need feedback on one point.

In the original implementation, @cloud-fan and I discussed supporting a separate scan builder for runtime group filtering in row-level operations. That way, we can prune columns and push down filters while looking for groups that have matches. We can't do that in the main row-level scan for group-based data sources, as non-matching records in matching groups have to be copied over. See PR #35395 for context.

The current idea is to have an optimizer rule that would check if the main scan implements SupportsRuntimeV2Filtering and, if so, inject a runtime group filter. The only challenge is ensuring the same version of the table is scanned in the main row-level scan and in the scan that searches for matching groups to rewrite. There are multiple solutions to consider.

Option 1

The first option is shown in this PR. We can add a new method to RowLevelOperation. Under this implementation, it is up to data sources to ensure the same version is scanned in both scans. It is a fairly simple approach, but it complicates the row-level API. On top of that, the new method is useless for data sources that can handle a delta of rows.

Option 2

The main row-level scan could report which table version it reads, so that Spark itself tracks it and uses the same version when searching for matching groups.

Option 3

The rule that assigns a runtime group filter has access to the original table instance that was loaded for this row-level operation. If we can somehow benefit from reusing that table instance, no new APIs are needed.

Any ideas how to make Option 3 work?

cc @cloud-fan @rdblue @huaxingao @dongjoon-hyun @sunchao @viirya
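To make Option 1 concrete, here is a rough sketch of the kind of API addition it implies. This is an illustration only: the trait, method name, and signature below are assumptions, not the interface that was ultimately merged.

```scala
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical mix-in for RowLevelOperation implementations (Option 1 sketch):
// a dedicated scan builder used only to find groups that contain matching rows,
// so columns can be pruned and filters pushed down during that lookup.
trait SupportsGroupFilterScan {
  def newGroupFilterScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
}
```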
I talked with @aokolnychyi about this and I think this is a data source problem, not something Spark should track right now. The main problem is that some table sources have different versions, and that's not something we're used to handling. Data sources that don't have different versions are not affected, so option 1 is not great because it forces everyone to deal with a problem only a few sources have. Spark could use option 2 and track this itself, but that complicates the API as well and we don't know that we need it yet. If we do add version/history to Spark, then we'd probably want to add it at that point.

We've also found a reliable way for option 3 to work. The underlying table instance is the same, so the filter method just needs to check that the table instance has not been refreshed or modified when the runtime filter is applied to it. I think that option 3 is the simplest approach in terms of new Spark APIs (none!) and is the right way forward until Spark decides to model tables with multiple versions.
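A minimal sketch of that check on the connector side, assuming a source that remembers the snapshot it was loaded with. All names here are illustrative and not Spark APIs.

```scala
// Illustrative connector-side guard for Option 3: the same table instance serves both
// the main row-level scan and the group filter scan, so it only needs to detect that
// it was not refreshed or modified in between.
class VersionedTableState(loadedSnapshotId: Long) {
  def validateNotRefreshed(currentSnapshotId: Long): Unit = {
    require(currentSnapshotId == loadedSnapshotId,
      s"Table was refreshed: loaded snapshot $loadedSnapshotId, current $currentSnapshotId")
  }
}
```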
I agree, @rdblue. Given that versioned data sources can handle it internally (as the same table instance is used in both scans), I'll go ahead with option 3.
Force-pushed …row-level commands from 4f77298 to 6708f2a.
    .createWithDefault(67108864L)

  val RUNTIME_ROW_LEVEL_OPERATION_GROUP_FILTER_ENABLED =
    buildConf("spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled")
I went back and forth on the name. On one hand, we have dynamic partition pruning. On the other hand, we call it runtime filtering in DS V2. Ideas are welcome.
I also used the spark.sql.optimizer.runtime prefix, like for runtime Bloom filter joins. There are other runtime-related configs that don't use this prefix, so let me know the correct config namespace.
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
  // apply special dynamic filtering only for group-based row-level operations
  case GroupBasedRowLevelOperation(replaceData, cond,
      DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _))
This is the optimizer rule that checks whether the primary row-level scan supports runtime filtering. A data source only has to implement SupportsRuntimeV2Filtering in its scan to benefit from the new functionality.
Also, the runtime group filter uses the existing framework for runtime filtering in DS V2, meaning we get all the benefits like subquery reuse, etc.
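For connector authors, a minimal sketch of what opting in could look like. The class, field, and the `part` grouping column are illustrative assumptions; the overridden methods follow the SupportsRuntimeV2Filtering contract as I understand it in Spark 3.4.

```scala
import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeV2Filtering}
import org.apache.spark.sql.types.StructType

// Sketch of a scan that opts into runtime filtering. When Spark applies the runtime
// group filter, filter() is called and the source can drop groups (files/partitions)
// that contain no matching rows before the rewrite job runs.
class GroupFilterableScan(schema: StructType) extends Scan with SupportsRuntimeV2Filtering {
  private var runtimePredicates: Array[Predicate] = Array.empty

  override def readSchema(): StructType = schema

  // the grouping column(s) Spark is allowed to filter on at runtime
  override def filterAttributes(): Array[NamedReference] = Array(Expressions.column("part"))

  override def filter(predicates: Array[Predicate]): Unit = {
    runtimePredicates = predicates
  }
}
```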
// use the original table instance that was loaded for this row-level operation
// in order to leverage a regular batch scan in the group filter query
val originalTable = r.relation.table.asRowLevelOperationTable.table
val relation = r.relation.copy(table = originalTable)
We build a DataSourceV2Relation here so that the group filter scan can prune columns and push down filters.
@cloud-fan @rdblue @huaxingao @dongjoon-hyun @sunchao @viirya, I've updated this PR and it should be ready for a detailed review round whenever you have a minute.
}

class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase {
Could you create GroupBasedDeleteFromTableSuite.scala?
@dongjoon-hyun, do you mean move this class into its own file? I can surely do that.
Yes~
+1
Done. I also renamed the original file as DeleteFromTableSuiteBase is the only class now.
| "discard groups that don't have to be rewritten.") | ||
| .version("3.4.0") | ||
| .booleanConf | ||
| .createWithDefault(true) |
I agree with starting with true in this case.
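For completeness, a small sketch of toggling the new flag per session, assuming a local SparkSession; the conf name matches the one added in this PR.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// enabled by default in 3.4.0; sources that prefer to rely only on
// planning-time data skipping can turn it off
spark.conf.set("spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled", "false")
```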
+1 for AS-IS implementation (with only minor comments). Thank you, @aokolnychyi and @rdblue.
+1, this PR looks good to me.
    properties: util.Map[String, String])
  extends InMemoryTable(name, schema, partitioning, properties) with SupportsRowLevelOperations {

  var replacedPartitions: Seq[Seq[Any]] = Seq.empty
Maybe add a comment to mention this is for testing.
Added a comment above.
Merged to master for Apache Spark 3.4.0. Also, cc @cloud-fan
Thank you for reviewing, @dongjoon-hyun @viirya @huaxingao @rdblue! Also, thanks @cloud-fan for the original discussion we had around this feature.
 *
 * Note this rule only applies to group-based row-level operations.
 */
case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
Why do we need to pass the rule as a parameter? Can't we call OptimizeSubqueries directly in this rule?
I also thought about this, but I think it's hard to reference OptimizeSubqueries outside Optimizer, since the former is more like an "inner class" of the latter and references the current instance of the latter in itself (i.e. Optimizer.this.execute(Subquery.fromExpression(s))).
@sunchao is correct. It wasn't easy to call OptimizeSubqueries outside Optimizer. Hence, I had to come up with this workaround.
@cloud-fan, I also considered simply adding OptimizeSubqueries to the batch with runtime partition filtering. However, SPARK-36444 specifically removed it from there.
An alternative idea could be to move OptimizeSubqueries into its own file. However, that's tricky too as it calls the optimizer: Optimizer.this.execute(Subquery.fromExpression(s)).
case r: DataSourceV2Relation if r eq relation =>
  val oldOutput = r.output
  val newOutput = oldOutput.map(_.newInstance())
  r.copy(output = newOutput) -> oldOutput.zip(newOutput)
nit:

  val newRelation = r.newInstance
  newRelation -> r.output.zip(newRelation.output)
}

// optimize subqueries to rewrite them as joins and trigger job planning
replaceData.copy(query = optimizeSubqueries(newQuery))
Does this mean we revert what we did in RewriteDeleteFromTable before?
Not really, @cloud-fan. This rule simply attaches a runtime filter to the plan that was created while rewriting the delete. We do replace the query but it is pretty much the same plan, just with an extra runtime filter.
This is much simpler than I expected, great design! Sorry for the late review.
| Batch("PartitionPruning", Once, | ||
| PartitionPruning) :+ | ||
| PartitionPruning, | ||
| RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+ |
I think another idea is to run OptimizeSubqueries in this batch:
PartitionPruning,
RowLevelOperationRuntimeGroupFiltering,
// The rule above may create subqueries, need to optimize them.
OptimizeSubqueries
This would be much cleaner but SPARK-36444 removed OptimizeSubqueries from that batch.
I will be off until next Monday. I'll address the comments then. Thanks for taking a look, @cloud-fan!
I still remember about following up on this and the other PR. Slowly getting there.
…untimeGroupFiltering

### What changes were proposed in this pull request?
This PR is to address the feedback on PR #36304 after that change was merged.

### Why are the changes needed?
These changes are needed for better code quality.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #38526 from aokolnychyi/spark-38959-follow-up.

Authored-by: aokolnychyi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
…d optimize subqueries

### What changes were proposed in this pull request?
This is a followup to #36304 to simplify `RowLevelOperationRuntimeGroupFiltering`. It does 3 things:
1. run `OptimizeSubqueries` in the batch `PartitionPruning`, so that `RowLevelOperationRuntimeGroupFiltering` does not need to invoke it manually.
2. skip dpp subqueries in `OptimizeSubqueries`, to avoid the issue fixed by #33664
3. `RowLevelOperationRuntimeGroupFiltering` creates `InSubquery` instead of `DynamicPruningSubquery`, so that it can be optimized by `OptimizeSubqueries` later. This also avoids unnecessary planning overhead of `DynamicPruningSubquery`, as there is no join and we can only run it as a subquery.

### Why are the changes needed?
code simplification

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
existing tests

Closes #38557 from cloud-fan/help.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
This PR adds runtime group filtering for group-based row-level operations.
Why are the changes needed?
These changes are needed to avoid rewriting unnecessary groups: data skipping during job planning is limited and can still report false-positive groups to rewrite.
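As an illustration (the catalog, table, and predicate below are made up; this assumes a SparkSession `spark` and a catalog backed by a group-based, copy-on-write source):

```scala
// Deleting a small slice of a large table rewrites whole groups (files/partitions).
// Planning-time stats (e.g. min/max) may still select groups with no matching rows;
// the runtime group filter first runs a cheap query for groups that actually contain
// matches and rewrites only those.
spark.sql("DELETE FROM testcat.db.orders WHERE status = 'CANCELLED'")
```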
Does this PR introduce any user-facing change?
This PR leverages existing APIs.
How was this patch tested?
This PR comes with tests.