[SPARK-38085][SQL] DataSource V2: Handle DELETE commands for group-based sources #35395
Conversation
Force-pushed a7cf002 to cac1449
The test failure seems unrelated.
I see that there is a revert commit in the upstream: e34d8ee
Force-pushed cac1449 to 0be11b3
Alright, the tests are green and the PR is ready for a detailed review.
Force-pushed 0be11b3 to 0b9165c
is it possible to push down the negated filter in the rewrite plan?
We actually have to prevent that (added the new rule to the list of rules that cannot be excluded).
Here is what a DELETE command may look like:
== Analyzed Logical Plan ==
DeleteFromTable (id#88 <= 1)
:- RelationV2[id#88, dep#89] cat.ns1.test_table
+- ReplaceData RelationV2[id#88, dep#89] cat.ns1.test_table
+- Filter NOT ((id#88 <= 1) <=> true)
+- RelationV2[id#88, dep#89, _partition#91] cat.ns1.test_table
The condition we should push down to the source is the DELETE condition (id <= 1), not the condition in the filter on top of the scan. Suppose we have a data source that can replace files and there are two files: File A contains IDs 1, 2, 3 and File B contains IDs 5, 6, 7. If we want to delete the record with ID = 1, we should push down the actual delete condition (ID = 1) for correct file pruning. Once the data source determines that only File A contains records to delete, we need to read the entire file and determine which records did not match the condition (that is what the filter on top of the scan is doing). Those records (IDs 2 and 3 in our example) have to be written back to the data source, as it can only replace files. That's why pushing down the filter condition would actually be wrong and we have to prevent it.
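To make the distinction concrete, here is a minimal sketch (not the actual RewriteDeleteFromTable rule; the object and method names are illustrative) of how the remaining rows are selected: keep every row for which the DELETE condition did not evaluate to true, using a null-safe comparison so rows where the condition evaluates to NULL are kept as well.

import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression, Literal, Not}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

object DeleteRewriteSketch {
  // Builds the Filter NOT ((cond) <=> true) node shown in the plan above:
  // rows that did not match the DELETE condition must be written back.
  def remainingRows(deleteCond: Expression, readRelation: LogicalPlan): LogicalPlan =
    Filter(Not(EqualNullSafe(deleteCond, Literal.TrueLiteral)), readRelation)
}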
Because we need to push down the command condition, I couldn't use the existing rule. If anyone has any ideas on how to avoid a separate rule, I'll be glad to do that.
This is okay. I think you could probably add a pushdown function in the existing pushdown class that uses the RewrittenRowLevelCommand matcher but returns the ScanBuilderHolder that is now used. But since pushdown for the row-level rewrite commands is so specific, I think it's probably more readable and maintainable over time to use a separate rule like this.
I took another look at V2ScanRelationPushDown. I think we could make filter pushdown work there by adding separate branches for RewrittenRowLevelCommand, but it does not seem to help. Instead, it would make the existing rule even more complicated. Apart from that, we also can't apply the regular logic for aggregate pushdown, as we have to look at the condition in the row-level operation. Essentially, we have to make sure that none of the logic in the existing rule applies to row-level operations. At this point, I agree that keeping a separate rule seems cleaner.
+1 for separate rules. The other one is complicated in order to allow extra pushdown that isn't needed here.
Following the previous question: how do we know whether the data source can replace files? If it cannot, should we still push down the command filter?
If a data source does not support replacing groups, it won't extend SupportsRowLevelOperations and we will fail in the analyzer.
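For illustration, a hedged sketch of that capability check (checkRowLevelSupport and the error message are illustrative, not Spark's actual analyzer code):

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.catalog.{SupportsRowLevelOperations, Table}

object RowLevelSupportCheckSketch {
  def checkRowLevelSupport(table: Table): Unit = table match {
    case _: SupportsRowLevelOperations => () // eligible for the group-based DELETE rewrite (ReplaceData)
    case _ =>
      throw new AnalysisException(s"Table ${table.name} does not support row-level operations")
  }
}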
The import looks a bit weird. I can do an aliased import if that's any better.
I would probably move DeleteFromTableWithFilters to a follow-up commit since it is an optimization and not needed for correctness.
Well, Spark can already plan filter-based DELETE today, so not supporting it would be a regression.
@cloud-fan, DeleteFromTableWithFilters is an optimization for SupportsRowLevelOperations. Existing deletes with filters would be unaffected. That being said, I am going to combine the existing logic in DataSourceV2Strategy with the optimizer rule I added, as discussed here. That way, we will have the filter conversion logic in just one place. Let me know if you agree with that.
+1
This optimizer rule contains logic similar to what we have in DataSourceV2Strategy. However, it is done in the optimizer to avoid building Scan and Write if a DELETE operation can be handled using filters.
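A minimal sketch of that idea, assuming a SupportsDelete table (the object and method names are illustrative, not the actual rule): if every conjunct of the DELETE condition translates into a data source Filter and the table accepts those filters, the delete can run as a metadata-only operation; otherwise we fall back to the group-based rewrite.

import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}
import org.apache.spark.sql.connector.catalog.SupportsDelete
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.Filter

object MetadataOnlyDeleteSketch extends PredicateHelper {
  def tryDeleteWithFilters(table: SupportsDelete, cond: Expression): Boolean = {
    // translate each conjunct; give up if any part cannot be expressed as a source Filter
    val translated: Seq[Option[Filter]] = splitConjunctivePredicates(cond)
      .map(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown = true))
    if (translated.forall(_.isDefined) && table.canDeleteWhere(translated.flatten.toArray)) {
      table.deleteWhere(translated.flatten.toArray) // no Scan or Write needs to be built
      true
    } else {
      false // fall back to the group-based rewrite (ReplaceData)
    }
  }
}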
It would be good to mention why this always finds the read relation rather than constructing the RowLevelCommand with a hard reference to it. My understanding is that it may be changed by the optimizer. It could be removed based on the condition and there may be more than one depending on the planning for UPDATE queries. Is that right?
I kept the minimum required logic for group-based deletes for now. You are right, this extractor will change to support UPDATE and delta-based sources. What about updating the description once we make those changes? For now, there will be exactly one read relation.
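Illustrative only (not the actual extractor): since a group-based DELETE produces exactly one read relation in the replacement query, a simple traversal is enough to locate it; the real code additionally checks that the relation is backed by the row-level operation table.

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

object ReadRelationSketch {
  // Find the (single) V2 relation that the rewritten DELETE plan reads from.
  def findReadRelation(replacementQuery: LogicalPlan): Option[DataSourceV2Relation] =
    replacementQuery.collectFirst { case r: DataSourceV2Relation => r }
}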
Is it possible to merge RowLevelCommandScanRelationPushDown into V2ScanRelationPushDown so they're in one place?
Keeping them separate for now as V2ScanRelationPushDown is already complicated and none of that logic applies.
After looking at this more, I agree with this direction. There is no need to overcomplicate either case.
Force-pushed 3245703 to aa6121c
Force-pushed aa6121c to b05f2c2
Force-pushed b05f2c2 to 0983997
@dongjoon-hyun @HyukjinKwon, do you have any idea what the GA workflow error is?
Hi, @viirya. It is happening on multiple PRs, so I don't think it's our issue. However, although I looked at their status, there is no clue there either.
Thanks, @dongjoon-hyun. Then it's weird. Keeping an eye on it...
FYI, this is the code path for the error: spark/.github/workflows/notify_test_workflow.yml, lines 110 to 112 in 7a6b989
@cloud-fan, I think discarding entire row groups is possible only for DELETEs when the whole condition was successfully translated into data source filters. This isn’t something we can support for other commands like UPDATE or when certain parts of the condition couldn’t be converted to a data source filter (e.g. subquery). A few points on my mind right now:
Technically, if we simply extend the scan builder API to indicate that the entire condition is being pushed down, it should be sufficient for data sources to discard entire row groups of deleted records. We already pass the SQL command and the condition. Data sources just don't know whether it is the entire condition and whether row groups can be discarded.
override lazy val isByName: Boolean = false
override lazy val references: AttributeSet = query.outputSet
override lazy val stringArgs: Iterator[Any] = Iterator(table, query, write)
nit: these can be val as they are just constants.
Replaced isByName and stringArgs with val. Kept references lazy to avoid computing the attribute set eagerly.
// metadata columns may be needed to request a correct distribution or ordering
// but are not passed back to the data source during writes

table.skipSchemaResolution || (dataInput.size == table.output.size &&
Do we really need to check this? The input query is built by Spark and is directly reading the table.
It may be redundant in the case of DELETE, but it will be required for UPDATE and MERGE, where the incoming values no longer solely depend on what was read. This will prevent setting nullable values for non-nullable attributes, for instance.
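As a small illustration of the nullability point (the helper is hypothetical, not Spark's code): a value written back to the table may be nullable only if the corresponding table attribute is nullable.

import org.apache.spark.sql.catalyst.expressions.Attribute

object NullabilityCheckSketch {
  // Each incoming attribute may be nullable only if the table attribute it maps to is nullable.
  def compatible(dataInput: Seq[Attribute], tableOutput: Seq[Attribute]): Boolean =
    dataInput.zip(tableOutput).forall { case (in, out) => out.nullable || !in.nullable }
}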
pushedFilters.right.get.mkString(", ")
}

val (scan, output) = PushDownUtils.pruneColumns(scanBuilder, relation, relation.output, Nil)
This means we don't do column pruning at all. We can make the code a bit simpler:
val scan = scanBuilder.scan
...
DataSourceV2ScanRelation(r, scan, r.output)
You are right that we don't do column pruning, but this makes sure metadata columns are projected. Otherwise, the scan would report only the table attributes.
WriteToDataSourceV2(relation, microBatchWrite, newQuery, customMetrics)

case rd @ ReplaceData(r: DataSourceV2Relation, _, query, _, None) =>
  val rowSchema = StructType.fromAttributes(rd.dataInput)
Can we simply use rd.originalTable.output?
We have to use dataInput as it will hold the correct nullability info for UPDATE and MERGE.
cloud-fan left a comment:
Did one more round of review and left a few more minor comments. Great job!
Thanks for reviewing, @cloud-fan! Could you take one more look? I either addressed comments or replied. I don't know what happened, but the notify test workflow keeps failing for this PR and tests are not triggered. I tried updating the branch and reopening the PR, but that did not work.
Currently we can check the PR test results at https://github.com/aokolnychyi/spark/actions/workflows/build_and_test.yml
thanks, merging to master/3.3!
…sed sources

This PR contains changes to rewrite DELETE operations for V2 data sources that can replace groups of data (e.g. files, partitions). These changes are needed to support row-level operations in Spark per SPIP SPARK-35801.

No.

This PR comes with tests.

Closes #35395 from aokolnychyi/spark-38085.

Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 5a92ecc)
Signed-off-by: Wenchen Fan <[email protected]>
Appreciate all the reviews, @cloud-fan @viirya @huaxingao @rdblue @sunchao @dongjoon-hyun!
Thanks @aokolnychyi and all, great work!

What changes were proposed in this pull request?
This PR contains changes to rewrite DELETE operations for V2 data sources that can replace groups of data (e.g. files, partitions).
Why are the changes needed?
These changes are needed to support row-level operations in Spark per SPIP SPARK-35801.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
This PR comes with tests.