[SPARK-24253][SQL] Add DeleteSupport mix-in for DataSourceV2. #21308
Conversation
Does putting the delete method here (as opposed to, say, in DataDeleters or something else parallel to the DataWriters) imply that this is a driver-side operation only? I understand the use case is deleting partitions, which is usually only a file system operation, but will that always be the case?
Yes, this is a driver-side operation. That's why the source can reject the delete. Anything that requires a parallel operation should really be implemented as read, filter, and replace data.
Do you think it would be more clear if this were explicitly a driver-side operation?
Do you think it would be more clear if this were explicitly a driver-side operation?
Possibly. Maybe in the big data world this is already obvious. To me, it looks like a general purpose delete. Maybe deletePartitions? (I am bad at naming things, however).
There aren't necessarily partitions in these data sources, so I wouldn't add partitions to the method name. I think we can make this more clear with better docs though.
nit: is this a duplicate of the above paragraph?
No, these are distinct.
UnsupportedOperationException indicates that the source doesn't understand a filter. For example, it could be date(ts) = '2018-05-13' and the source doesn't support the conversion from timestamp to date.
IllegalArgumentException is thrown when the expression is understood by the source, but the work required to perform the delete is not supported. For example, if you have data partitioned by hour(ts) and the delete expression is ts > '2018-05-13T00:05:00' and ts < '2018-05-13T00:10:00'. Deleting a 5-minute window when data is partitioned by hour probably isn't possible without rewriting data files, so the source can reject it.
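To make the IllegalArgumentException case concrete, here is a minimal sketch of the alignment check a source partitioned by hour(ts) might perform. All names here (DeleteValidation, canDeleteByPartition) are invented for illustration and are not part of the proposed API; a real source would evaluate Spark's Filter objects rather than raw timestamps.

```java
import java.time.LocalDateTime;

// Illustrative sketch: a source partitioned by hour(ts) can only perform a
// metadata-only delete when the requested range lands on hour boundaries.
public class DeleteValidation {

    static boolean alignedToHour(LocalDateTime t) {
        return t.getMinute() == 0 && t.getSecond() == 0 && t.getNano() == 0;
    }

    // Returns true when rows in [start, end) can be deleted by dropping whole
    // hourly partitions; a real source would throw IllegalArgumentException
    // from deleteWhere when this returns false.
    static boolean canDeleteByPartition(LocalDateTime start, LocalDateTime end) {
        return alignedToHour(start) && alignedToHour(end);
    }

    public static void main(String[] args) {
        // Whole-hour window: deletable by dropping one partition.
        System.out.println(canDeleteByPartition(
            LocalDateTime.parse("2018-05-13T00:00:00"),
            LocalDateTime.parse("2018-05-13T01:00:00")));
        // The 5-minute window from the example: must be rejected, because
        // honoring it would require rewriting data files.
        System.out.println(canDeleteByPartition(
            LocalDateTime.parse("2018-05-13T00:05:00"),
            LocalDateTime.parse("2018-05-13T00:10:00")));
    }
}
```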
After updating this to use Filter, the UnsupportedOperationException is no longer needed, so I removed it. That should also cut down on the confusion here.
#21888 shows how this is used to implement DELETE FROM.
@rxin, I've updated this API to use Filter. cc @cloud-fan
Test build #94818 has finished for PR 21308 at commit
I am assuming this API was intended to support the "drop partition" use case. I'm arguing that adding and deleting partitions deal with a concept slightly higher-level than just a bunch of records that match a filter. Backing this up is the fact that partitions are defined independently of any records they may or may not contain: you can add an empty partition and the underlying state of the system will change. Also, as an end user I would be very upset if I meant to drop a partition but, because of a transcription error, accidentally started a delete process with a filter that didn't directly match a partition definition and took a million times as long to execute. Partitions are an implementation optimisation that has leaked into higher-level APIs because they are an extremely useful and performant optimisation. I am wondering if we should represent them in this API as something slightly higher-level than just a filter definition.
@tigerquoll, what we come up with needs to work across a variety of data sources, including those like JDBC that can delete at a lower granularity than a partition. For Hive tables, the partition columns are exposed directly, so users would supply a predicate that matches partition columns. A Hive table source would also be free to reject delete requests -- by throwing the documented exception -- that would require rewriting data. These guarantees avoid the case you're talking about: the predicate must match entire partitions, and the source can reject predicates on non-partition columns or predicates that can't be cleanly deleted with a metadata operation.
@rdblue, what about those data sources that support record deletion and partition dropping as two semantically different operations, Kudu and HBase being two examples? All systems that support partitions have a different API for dealing with partition-level ops. Even file-based table storage systems support the different levels of manipulation (look at the SQL DDL that Impala supports for Parquet partitions for an example: they use a filter, but the command is "this partition op applies to the partition that is defined by this filter", not "apply this op to all records that match this filter"). The difference is subtle, but it is an important one, and every system that supports partitions enforces that difference for a reason.
@tigerquoll, there is currently no support for exposing partitions through the v2 API; that would be a different operation. If you wanted to implement partition operations through this API, then you would need to follow the guarantees specified here: if you need to delete by partition, then the expression must match records at partition boundaries, and the source must reject the delete operation otherwise.
@rdblue I think our debate is whether we should expose an API to represent direct operations on partitions in the new data source API.
@tigerquoll, I'm not debating whether we should or shouldn't expose partitions here. In general, I'm undecided. I don't think that the API proposed here needs to support a first-class partition concept for tables, largely because partitions aren't currently exposed in the v2 API. The issue you linked to, SPARK-22389, exposes Spark's view of partitioning -- as in
@rdblue when you say "you don't think the API proposed here needs to support a first-class partition concept", are you referring to the "DeleteSupport" Interface, or to DataSourceV2 in general?
 * @param filters filter expressions, used to select rows to delete when all expressions match
 * @throws IllegalArgumentException If the delete is rejected due to required effort
 */
void deleteWhere(Filter[] filters);
This seems different from what we discussed on the dev list about the new abstraction. I expect to see

Write newDeleteWrite(Filter[] filters);

Am I missing something?
Maybe it's a little unclear: this delete is not a write. It is a driver-side operation using table metadata, like dropping matching partitions in a Hive table or dropping matching files in an Iceberg table. That way, there are no tasks and we don't need to use the commit protocol.
If we want to filter data files, the overwrite API I've proposed is the right way to do it. Spark could read, filter the rows, and replace all of the files that were read.
If there are files that have both rows that should be removed and rows that should be kept, the source should throw IllegalArgumentException to reject the delete.
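A rough sketch of that driver-side behavior, assuming a source that tracks per-file min/max timestamp statistics in its table metadata: files fully covered by the predicate are dropped as a metadata-only change, and a file that straddles the boundary forces a rejection. MetadataDelete, DataFile, and the timestamp-range predicate are hypothetical stand-ins for a real source's metadata structures and Spark's Filter[].

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical driver-side delete over table metadata: no tasks run, and no
// commit protocol is needed, because only the file list changes.
public class MetadataDelete {

    static final class DataFile {
        final String path;
        final long minTs, maxTs;  // per-file column stats from table metadata
        DataFile(String path, long minTs, long maxTs) {
            this.path = path; this.minTs = minTs; this.maxTs = maxTs;
        }
    }

    final List<DataFile> files = new ArrayList<>();

    // Delete rows with ts in [lo, hi) using metadata only.
    void deleteWhereTsBetween(long lo, long hi) {
        List<DataFile> toDrop = new ArrayList<>();
        for (DataFile f : files) {
            boolean fullyInside = f.minTs >= lo && f.maxTs < hi;
            boolean disjoint = f.maxTs < lo || f.minTs >= hi;
            if (fullyInside) {
                toDrop.add(f);
            } else if (!disjoint) {
                // The file holds both matching and non-matching rows; only a
                // rewrite (the overwrite API) can express this, so reject.
                throw new IllegalArgumentException(
                    "cannot delete without rewriting " + f.path);
            }
        }
        files.removeAll(toDrop);  // commit is a pure metadata change
    }
}
```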
@tigerquoll, I'm talking about the DataSourceV2 API in general. I'm not sure if I think there is value in exposing partitions, but I'd be happy to hear why you think they are valuable and to think through how it would fit with the existing API. I think that partitions that aren't hidden make tables much harder for users to work with, which is why Iceberg hides partitioning and automatically translates from row filters to partition filters. For Kudu, maybe it is different. Could you write up the use case with a bit more context about what empty partitions are used for, and send it to the dev list? If we think that the v2 API should expose a partition concept, then that would definitely include a way to add or drop partitions.
The DELETE support is already merged; closing this.
What changes were proposed in this pull request?
Adds a `DeleteSupport` mix-in for `DataSourceV2`. This mix-in provides a method to delete data with catalyst expressions in support of `delete from` and overwrite logical operations.

How was this patch tested?
No tests, this adds an interface.