-
Notifications
You must be signed in to change notification settings - Fork 392
feat: validate_deleted_data_files
#1938
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: validate_deleted_data_files
#1938
Conversation
pyiceberg/table/update/validate.py
Outdated
| if entry.snapshot_id not in new_snapshot_ids: | ||
| continue | ||
|
|
||
| if entry.status != ManifestEntryStatus.DELETED: | ||
| continue | ||
|
|
||
| if data_filter is not None and not evaluator(entry.data_file): | ||
| continue | ||
|
|
||
| if partition_set is not None and (entry.data_file.spec_id, entry.data_file.partition) not in partition_set: | ||
| continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably cleaner to put these each on a line and wrap in an any call
validate_deleted_data_filesvalidate_deleted_data_files
pyiceberg/table/update/validate.py
Outdated
| parent_snapshot: Optional[Snapshot], | ||
| partition_set: Optional[set[Record]], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks out of order:
| parent_snapshot: Optional[Snapshot], | |
| partition_set: Optional[set[Record]], | |
| partition_set: Optional[set[Record]], | |
| parent_snapshot: Optional[Snapshot], |
It looks like the function call in validate_deleted_data_files is assuming that parent_snapshot is the last argument
conflicting_entries = deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs some more work. Next to @sungwy's comment, we in the code below:
(entry.data_file.spec_id, entry.data_file.partition) not in partition_setWhere it checks if a tuple is in a partition_set, but the partition_set only contains the Record according to the signature.
This triggered me, because if you do:
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(pickup_timestamp);
-- and then
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(dropoff_timestamp);Both of the partitioning strategies will produce a Record[int] because it will contain the number of days since epoch. But the meaning is completely different.
pyiceberg/table/update/validate.py
Outdated
| starting_snapshot, | ||
| parent_snapshot, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks out of order to, because validation_history has to_snapshot as the second argument, and from_snapshot as the third argument.
| starting_snapshot, | |
| parent_snapshot, | |
| starting_snapshot, | |
| parent_snapshot, |
Do you think it would be better to update validation_history function to use the following function signature instead? I think it's a lot more expected to have from_snapshot then to_snapshot
def validation_history(
table: Table,
from_snapshot: Snapshot,
to_snapshot: Snapshot,
matching_operations: set[Operation],
manifest_content_filter: ManifestContent,
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you clarify a little more? Are you saying we should move to the terms starting_snapshot (which is from_snapshot) and parent_snapshot (to_snapshot)
pyiceberg/table/update/validate.py
Outdated
| ) | ||
|
|
||
| if data_filter is not None: | ||
| evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not too sure if this is correct, because ManifestGroup.entries seems to be using inclusive projection.
Should we be using inclusive_projection here instead?
Summoning @Fokko for a second review
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I agree that this should be inclusive projection, since we want to know if there are any matches. Inclusive projection returns rows_might_match and rows_cannot_match. If they cannot be matched, then we can skip it :)
Fokko
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for following up on the previous PR @jayceslesar. This looks like a great start, but there are some missing elements. It would be good to add some more tests as well, since this is pretty critical code since it might affect correctness.
pyiceberg/table/update/validate.py
Outdated
| parent_snapshot: Optional[Snapshot], | ||
| partition_set: Optional[set[Record]], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs some more work. Next to @sungwy's comment, we in the code below:
(entry.data_file.spec_id, entry.data_file.partition) not in partition_setWhere it checks if a tuple is in a partition_set, but the partition_set only contains the Record according to the signature.
This triggered me, because if you do:
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(pickup_timestamp);
-- and then
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(dropoff_timestamp);Both of the partitioning strategies will produce a Record[int] because it will contain the number of days since epoch. But the meaning is completely different.
pyiceberg/table/update/validate.py
Outdated
| ) | ||
|
|
||
| if data_filter is not None: | ||
| evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I agree that this should be inclusive projection, since we want to know if there are any matches. Inclusive projection returns rows_might_match and rows_cannot_match. If they cannot be matched, then we can skip it :)
Co-authored-by: Fokko Driesprong <[email protected]>
sungwy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @jayceslesar - thanks for adopting all of the review feedback!
I think this is good to merge, but I'd prefer to make validate_deleted_data_files a private function
|
Waiting for the CI to pass |
| parent_snapshot: Ending snapshot on the branch being validated | ||
| """ | ||
| conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, I think style wise it is more elegant to switch over to keyword-arguments:
| conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) | |
| conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, parent_snapshot=parent_snapshot) |
Fokko
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, thanks @jayceslesar for working on this, and thanks @sungwy for the review 🙌
Closes apache#1928 # Rationale for this change Add `validate_deleted_data_files` which depends on apache#1935 # Are these changes tested? Added a test! # References Java `deletedDataFiles` impl: https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L678 Java `ManifestGroup.entries` impl: https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/ManifestGroup.java#L242 --------- Co-authored-by: Fokko Driesprong <[email protected]>
Closes apache#1928 # Rationale for this change Add `validate_deleted_data_files` which depends on apache#1935 # Are these changes tested? Added a test! # References Java `deletedDataFiles` impl: https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L678 Java `ManifestGroup.entries` impl: https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/ManifestGroup.java#L242 --------- Co-authored-by: Fokko Driesprong <[email protected]>
Closes #1928
Rationale for this change
Add
validate_deleted_data_fileswhich depends on #1935Are these changes tested?
Added a test!
References
Java
deletedDataFilesimpl:https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L678
Java
ManifestGroup.entriesimpl:https://github.com/apache/iceberg/blob/3a29199e73f2e9ae0f8f92a1a0732a338c66aa0d/core/src/main/java/org/apache/iceberg/ManifestGroup.java#L242