diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 93cf12d669..55c34676e3 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -14,11 +14,17 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from typing import Iterator, Optional from pyiceberg.exceptions import ValidationException -from pyiceberg.manifest import ManifestContent, ManifestFile +from pyiceberg.expressions import BooleanExpression +from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator +from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between +from pyiceberg.typedef import Record + +VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE} def validation_history( @@ -69,3 +75,78 @@ def validation_history( raise ValidationException("No matching snapshot found.") return manifests_files, snapshots + + +def _deleted_data_files( + table: Table, + starting_snapshot: Snapshot, + data_filter: Optional[BooleanExpression], + partition_set: Optional[dict[int, set[Record]]], + parent_snapshot: Optional[Snapshot], +) -> Iterator[ManifestEntry]: + """Find deleted data files matching a filter since a starting snapshot. + + Args: + table: Table to validate + starting_snapshot: Snapshot current at the start of the operation + data_filter: Expression used to find deleted data files + partition_set: dict of {spec_id: set[partition]} to filter on + parent_snapshot: Ending snapshot on the branch being validated + + Returns: + List of conflicting manifest-entries + """ + # if there is no current table state, no files have been deleted + if parent_snapshot is None: + return + + manifests, snapshot_ids = validation_history( + table, + parent_snapshot, + starting_snapshot, + VALIDATE_DATA_FILES_EXIST_OPERATIONS, + ManifestContent.DATA, + ) + + if data_filter is not None: + evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter).eval + + for manifest in manifests: + for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False): + if entry.snapshot_id not in snapshot_ids: + continue + + if entry.status != ManifestEntryStatus.DELETED: + continue + + if data_filter is not None and evaluator(entry.data_file) is ROWS_CANNOT_MATCH: + continue + + if partition_set is not None: + spec_id = entry.data_file.spec_id + partition = entry.data_file.partition + if spec_id not in partition_set or partition not in partition_set[spec_id]: + continue + + yield entry + + +def _validate_deleted_data_files( + table: Table, + starting_snapshot: Snapshot, + data_filter: Optional[BooleanExpression], + parent_snapshot: Snapshot, +) -> None: + """Validate that no files matching a filter have been deleted from the table since a starting snapshot. + + Args: + table: Table to validate + starting_snapshot: Snapshot current at the start of the operation + data_filter: Expression used to find deleted data files + parent_snapshot: Ending snapshot on the branch being validated + + """ + conflicting_entries = _deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot) + if any(conflicting_entries): + conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries} + raise ValidationException(f"Deleted data files were found matching the filter for snapshots {conflicting_snapshots}!") diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py index ca7f83badd..74a0b59566 100644 --- a/tests/table/test_validate.py +++ b/tests/table/test_validate.py @@ -22,10 +22,10 @@ from pyiceberg.exceptions import ValidationException from pyiceberg.io import FileIO -from pyiceberg.manifest import ManifestContent, ManifestFile +from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile from pyiceberg.table import Table -from pyiceberg.table.snapshots import Operation, Snapshot -from pyiceberg.table.update.validate import validation_history +from pyiceberg.table.snapshots import Operation, Snapshot, Summary +from pyiceberg.table.update.validate import _deleted_data_files, _validate_deleted_data_files, validation_history @pytest.fixture @@ -136,3 +136,84 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF {Operation.APPEND}, ManifestContent.DATA, ) + + +def test_deleted_data_files( + table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]], +) -> None: + table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests + + oldest_snapshot = table.snapshots()[0] + newest_snapshot = cast(Snapshot, table.current_snapshot()) + + def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]: + """Mock the manifests method to use the snapshot_id for lookup.""" + snapshot_id = self.snapshot_id + if snapshot_id in mock_manifests: + return mock_manifests[snapshot_id] + return [] + + # every snapshot is an append, so we should get nothing! + with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect): + result = list( + _deleted_data_files( + table=table, + starting_snapshot=newest_snapshot, + data_filter=None, + parent_snapshot=oldest_snapshot, + partition_set=None, + ) + ) + + assert result == [] + + # modify second to last snapshot to be a delete + snapshots = table.snapshots() + altered_snapshot = snapshots[-2] + altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=Operation.DELETE)}) + snapshots[-2] = altered_snapshot + + table.metadata = table.metadata.model_copy( + update={"snapshots": snapshots}, + ) + + my_entry = ManifestEntry.from_args( + status=ManifestEntryStatus.DELETED, + snapshot_id=altered_snapshot.snapshot_id, + ) + + with ( + patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect), + patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", return_value=[my_entry]), + ): + result = list( + _deleted_data_files( + table=table, + starting_snapshot=newest_snapshot, + data_filter=None, + parent_snapshot=oldest_snapshot, + partition_set=None, + ) + ) + + assert result == [my_entry] + + +def test_validate_deleted_data_files_raises_on_conflict( + table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]], +) -> None: + table, _ = table_v2_with_extensive_snapshots_and_manifests + oldest_snapshot = table.snapshots()[0] + newest_snapshot = cast(Snapshot, table.current_snapshot()) + + class DummyEntry: + snapshot_id = 123 + + with patch("pyiceberg.table.update.validate._deleted_data_files", return_value=[DummyEntry()]): + with pytest.raises(ValidationException): + _validate_deleted_data_files( + table=table, + starting_snapshot=newest_snapshot, + data_filter=None, + parent_snapshot=oldest_snapshot, + )