From 88a19550ec7537a4b9822ecb7f8736ab2cc5cc90 Mon Sep 17 00:00:00 2001 From: Felix Scherz Date: Mon, 15 Apr 2024 21:08:32 +0200 Subject: [PATCH 01/11] setup scaffolding for RollingManifestWriter --- pyiceberg/manifest.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 960952d02d..c516112ed5 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -23,6 +23,7 @@ from types import TracebackType from typing import ( Any, + Callable, Dict, Iterator, List, @@ -840,6 +841,20 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter: return self +class RollingManifestWriter: + _current_writer: ManifestWriter + _supplier: Callable[[], ManifestWriter] + + def __init__(self, supplier: Callable[[], ManifestWriter], target_file_size_in_bytes, target_number_of_rows) -> None: + pass + + def _should_roll_to_new_file(self) -> bool: ... + + def to_manifest_files(self) -> list[ManifestFile]: ... + + def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter: ... + + class ManifestWriterV1(ManifestWriter): def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int): super().__init__( From 159999002dbac0e53232f8e01fe8be45c40254a7 Mon Sep 17 00:00:00 2001 From: Felix Scherz Date: Sun, 21 Apr 2024 19:11:38 +0200 Subject: [PATCH 02/11] feat: implement RollingManifestWriter --- pyiceberg/manifest.py | 230 +++++++++++++++++++++++++++++++----------- 1 file changed, 169 insertions(+), 61 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index c516112ed5..bd7c847f73 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -21,23 +21,23 @@ from copy import copy from enum import Enum from types import TracebackType -from typing import ( - Any, - Callable, - Dict, - Iterator, - List, - Literal, - Optional, - Type, -) +from typing import Any, Generator +from typing import Callable +from typing import Dict +from typing import Iterator +from typing import List +from typing import Literal +from typing import Optional +from typing import Type from pydantic_core import to_json from pyiceberg.avro.file import AvroFile, AvroOutputFile from pyiceberg.conversions import to_bytes from pyiceberg.exceptions import ValidationError -from pyiceberg.io import FileIO, InputFile, OutputFile +from pyiceberg.io import FileIO +from pyiceberg.io import InputFile +from pyiceberg.io import OutputFile from pyiceberg.partitioning import PartitionSpec from pyiceberg.schema import Schema from pyiceberg.typedef import Record, TableVersion @@ -53,6 +53,7 @@ StringType, StructType, ) +from pyiceberg.typedef import EMPTY_DICT UNASSIGNED_SEQ = -1 DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024 @@ -102,7 +103,9 @@ def __repr__(self) -> str: DATA_FILE_TYPE: Dict[int, StructType] = { 1: StructType( - NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"), + NestedField( + field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme" + ), NestedField( field_id=101, name="file_format", @@ -117,9 +120,15 @@ def __repr__(self) -> str: required=True, doc="Partition data tuple, schema based on the partition spec", ), - NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"), NestedField( - field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes" + field_id=103, 
name="record_count", field_type=LongType(), required=True, doc="Number of records in the file" + ), + NestedField( + field_id=104, + name="file_size_in_bytes", + field_type=LongType(), + required=True, + doc="Total file size in bytes", ), NestedField( field_id=105, @@ -172,7 +181,11 @@ def __repr__(self) -> str: doc="Map of column id to upper bound", ), NestedField( - field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob" + field_id=131, + name="key_metadata", + field_type=BinaryType(), + required=False, + doc="Encryption key metadata blob", ), NestedField( field_id=132, @@ -192,7 +205,9 @@ def __repr__(self) -> str: doc="File format name: avro, orc, or parquet", initial_default=DataFileContent.DATA, ), - NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"), + NestedField( + field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme" + ), NestedField( field_id=101, name="file_format", @@ -207,9 +222,15 @@ def __repr__(self) -> str: required=True, doc="Partition data tuple, schema based on the partition spec", ), - NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"), NestedField( - field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes" + field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file" + ), + NestedField( + field_id=104, + name="file_size_in_bytes", + field_type=LongType(), + required=True, + doc="Total file size in bytes", ), NestedField( field_id=108, @@ -254,7 +275,11 @@ def __repr__(self) -> str: doc="Map of column id to upper bound", ), NestedField( - field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob" + field_id=131, + name="key_metadata", + field_type=BinaryType(), + required=False, + doc="Encryption key metadata blob", ), NestedField( field_id=132, @@ -282,28 +307,34 @@ def __repr__(self) -> str: def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType: - data_file_partition_type = StructType(*[ - NestedField( - field_id=field.field_id, - name=field.name, - field_type=field.field_type, - required=field.required, - ) - for field in partition_type.fields - ]) + data_file_partition_type = StructType( + *[ + NestedField( + field_id=field.field_id, + name=field.name, + field_type=field.field_type, + required=field.required, + ) + for field in partition_type.fields + ] + ) - return StructType(*[ - NestedField( - field_id=102, - name="partition", - field_type=data_file_partition_type, - required=True, - doc="Partition data tuple, schema based on the partition spec", - ) - if field.field_id == 102 - else field - for field in DATA_FILE_TYPE[format_version].fields - ]) + return StructType( + *[ + ( + NestedField( + field_id=102, + name="partition", + field_type=data_file_partition_type, + required=True, + doc="Partition data tuple, schema based on the partition spec", + ) + if field.field_id == 102 + else field + ) + for field in DATA_FILE_TYPE[format_version].fields + ] + ) class DataFile(Record): @@ -384,14 +415,18 @@ def __eq__(self, other: Any) -> bool: ), } -MANIFEST_ENTRY_SCHEMAS_STRUCT = {format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items()} +MANIFEST_ENTRY_SCHEMAS_STRUCT = { + format_version: 
schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items() +} def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file: StructType) -> Schema: - return Schema(*[ - NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field - for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields - ]) + return Schema( + *[ + NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field + for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields + ] + ) class ManifestEntry(Record): @@ -499,7 +534,9 @@ def update(self, value: Any) -> None: self._min = min(self._min, value) -def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partitions: List[Record]) -> List[PartitionFieldSummary]: +def construct_partition_summaries( + spec: PartitionSpec, schema: Schema, partitions: List[Record] +) -> List[PartitionFieldSummary]: types = [field.field_type for field in spec.partition_type(schema).fields] field_stats = [PartitionFieldStats(field_type) for field_type in types] for partition_keys in partitions: @@ -523,7 +560,9 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition NestedField(512, "added_rows_count", LongType(), required=False), NestedField(513, "existing_rows_count", LongType(), required=False), NestedField(514, "deleted_rows_count", LongType(), required=False), - NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False), + NestedField( + 507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False + ), NestedField(519, "key_metadata", BinaryType(), required=False), ), 2: Schema( @@ -540,12 +579,16 @@ def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partition NestedField(512, "added_rows_count", LongType(), required=True), NestedField(513, "existing_rows_count", LongType(), required=True), NestedField(514, "deleted_rows_count", LongType(), required=True), - NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False), + NestedField( + 507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False + ), NestedField(519, "key_metadata", BinaryType(), required=False), ), } -MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()} +MANIFEST_LIST_FILE_STRUCTS = { + format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items() +} POSITIONAL_DELETE_SCHEMA = Schema( @@ -669,7 +712,9 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani # in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0 # in v2 tables, the file sequence number should be inherited iff the entry status is ADDED - if entry.file_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED): + if entry.file_sequence_number is None and ( + manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED + ): # Only available in V2, always 0 in V1 entry.file_sequence_number = manifest.sequence_number @@ -842,17 +887,74 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter: class RollingManifestWriter: - _current_writer: ManifestWriter - _supplier: Callable[[], ManifestWriter] - - def __init__(self, supplier: Callable[[], ManifestWriter], 
target_file_size_in_bytes, target_number_of_rows) -> None: - pass - - def _should_roll_to_new_file(self) -> bool: ... + closed: bool + _supplier: Generator[ManifestWriter, None, None] + _manifest_files: list[ManifestFile] + _target_file_size_in_bytes: int + _target_number_of_rows: int + _current_writer: Optional[ManifestWriter] + _current_file_rows: int + + def __init__( + self, supplier: Generator[ManifestWriter, None, None], target_file_size_in_bytes, target_number_of_rows + ) -> None: + self._closed = False + self._manifest_files = [] + self._supplier = supplier + self._target_file_size_in_bytes = target_file_size_in_bytes + self._target_number_of_rows = target_number_of_rows + self._current_writer = None + self._current_file_rows = 0 + + def __enter__(self) -> RollingManifestWriter: + self._get_current_writer().__enter__() + return self - def to_manifest_files(self) -> list[ManifestFile]: ... + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: + self.closed = True + if self._current_writer: + self._current_writer.__exit__(exc_type, exc_value, traceback) + + def _get_current_writer(self) -> ManifestWriter: + if not self._current_writer: + self._current_writer = next(self._supplier) + self._current_writer.__enter__() + return self._current_writer + if self._should_roll_to_new_file(): + self._close_current_writer() + return self._current_writer + + def _should_roll_to_new_file(self) -> bool: + if not self._current_writer: + return False + return ( + self._current_file_rows >= self._target_number_of_rows + or len(self._current_writer._output_file) >= self._target_file_size_in_bytes + ) - def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter: ... 
+ def _close_current_writer(self): + if self._current_writer: + self._current_writer.__exit__(None, None, None) + current_file = self._current_writer.to_manifest_file() + self._manifest_files.append(current_file) + self._current_writer = None + self._current_file_rows = 0 + + def to_manifest_files(self) -> list[ManifestFile]: + self._close_current_writer() + self._closed = True + return self._manifest_files + + def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter: + if self._closed: + raise RuntimeError("Cannot add entry to closed manifest writer") + self._get_current_writer().add_entry(entry) + return self class ManifestWriterV1(ManifestWriter): @@ -962,7 +1064,11 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id super().__init__( format_version=1, output_file=output_file, - meta={"snapshot-id": str(snapshot_id), "parent-snapshot-id": str(parent_snapshot_id), "format-version": "1"}, + meta={ + "snapshot-id": str(snapshot_id), + "parent-snapshot-id": str(parent_snapshot_id), + "format-version": "1", + }, ) def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: @@ -975,7 +1081,9 @@ class ManifestListWriterV2(ManifestListWriter): _commit_snapshot_id: int _sequence_number: int - def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int): + def __init__( + self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int + ): super().__init__( format_version=2, output_file=output_file, From b2160f52478cef9e5538e54b519946c2b1b59591 Mon Sep 17 00:00:00 2001 From: Felix Scherz Date: Sun, 21 Apr 2024 19:40:31 +0200 Subject: [PATCH 03/11] test: dummy test without rollover --- pyiceberg/manifest.py | 2 + tests/utils/test_manifest.py | 179 +++++++++++++++++++++++++++++++++++ 2 files changed, 181 insertions(+) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index bd7c847f73..b2803e9d32 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -927,6 +927,7 @@ def _get_current_writer(self) -> ManifestWriter: return self._current_writer if self._should_roll_to_new_file(): self._close_current_writer() + return self._get_current_writer() return self._current_writer def _should_roll_to_new_file(self) -> bool: @@ -954,6 +955,7 @@ def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter: if self._closed: raise RuntimeError("Cannot add entry to closed manifest writer") self._get_current_writer().add_entry(entry) + self._current_file_rows += entry.data_file.record_count return self diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index ef33b16b00..2c299acbec 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -31,6 +31,7 @@ ManifestEntryStatus, ManifestFile, PartitionFieldSummary, + RollingManifestWriter, read_manifest_list, write_manifest, write_manifest_list, @@ -493,6 +494,184 @@ def test_write_manifest( assert data_file.sort_order_id == 0 +@pytest.mark.parametrize("format_version", [1, 2]) +def test_rolling_manifest_writer( + generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion +) -> None: + io = load_file_io() + snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + timestamp_ms=1602638573590, + manifest_list=generated_manifest_file_file_v1 if format_version == 1 else generated_manifest_file_file_v2, + summary=Summary(Operation.APPEND), + schema_id=3, + ) + demo_manifest_file = 
snapshot.manifests(io)[0] + manifest_entries = demo_manifest_file.fetch_manifest_entry(io) + test_schema = Schema( + NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", IntegerType(), False) + ) + test_spec = PartitionSpec( + PartitionField(source_id=1, field_id=1, transform=IdentityTransform(), name="VendorID"), + PartitionField(source_id=2, field_id=2, transform=IdentityTransform(), name="tpep_pickup_datetime"), + spec_id=demo_manifest_file.partition_spec_id, + ) + with TemporaryDirectory() as tmpdir: + tmp_avro_file = tmpdir + "/test_write_manifest.avro" + tmp_avro_file = tmpdir + "/test_write_manifest-1.avro" + output = io.new_output(tmp_avro_file) + def supplier(): + i = 0 + while True: + i += 1 + tmp_avro_file = tmpdir + f"/test_write_manifest-{i}.avro" + output = io.new_output(tmp_avro_file) + yield write_manifest( + format_version=format_version, + spec=test_spec, + schema=test_schema, + output_file=output, + snapshot_id=8744736658442914487, + ) + with RollingManifestWriter(supplier=supplier(), target_file_size_in_bytes=388872 + 1, target_number_of_rows=20000) as writer: + for entry in manifest_entries: + writer.add_entry(entry) + new_manifest = writer.to_manifest_files()[0] + with pytest.raises(RuntimeError): + # It is already closed + writer.add_entry(manifest_entries[0]) + + expected_metadata = { + "schema": test_schema.model_dump_json(), + "partition-spec": test_spec.model_dump_json(), + "partition-spec-id": str(test_spec.spec_id), + "format-version": str(format_version), + } + _verify_metadata_with_fastavro( + tmp_avro_file, + expected_metadata, + ) + new_manifest_entries = new_manifest.fetch_manifest_entry(io) + + manifest_entry = new_manifest_entries[0] + + assert manifest_entry.status == ManifestEntryStatus.ADDED + assert manifest_entry.snapshot_id == 8744736658442914487 + assert manifest_entry.data_sequence_number == -1 if format_version == 1 else 3 + assert isinstance(manifest_entry.data_file, DataFile) + + data_file = manifest_entry.data_file + + assert data_file.content is DataFileContent.DATA + assert ( + data_file.file_path + == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet" + ) + assert data_file.file_format == FileFormat.PARQUET + assert data_file.partition == Record(VendorID=1, tpep_pickup_datetime=1925) + assert data_file.record_count == 19513 + assert data_file.file_size_in_bytes == 388872 + assert data_file.column_sizes == { + 1: 53, + 2: 98153, + 3: 98693, + 4: 53, + 5: 53, + 6: 53, + 7: 17425, + 8: 18528, + 9: 53, + 10: 44788, + 11: 35571, + 12: 53, + 13: 1243, + 14: 2355, + 15: 12750, + 16: 4029, + 17: 110, + 18: 47194, + 19: 2948, + } + assert data_file.value_counts == { + 1: 19513, + 2: 19513, + 3: 19513, + 4: 19513, + 5: 19513, + 6: 19513, + 7: 19513, + 8: 19513, + 9: 19513, + 10: 19513, + 11: 19513, + 12: 19513, + 13: 19513, + 14: 19513, + 15: 19513, + 16: 19513, + 17: 19513, + 18: 19513, + 19: 19513, + } + assert data_file.null_value_counts == { + 1: 19513, + 2: 0, + 3: 0, + 4: 19513, + 5: 19513, + 6: 19513, + 7: 0, + 8: 0, + 9: 19513, + 10: 0, + 11: 0, + 12: 19513, + 13: 0, + 14: 0, + 15: 0, + 16: 0, + 17: 0, + 18: 0, + 19: 0, + } + assert data_file.nan_value_counts == {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0} + assert data_file.lower_bounds == { + 2: b"2020-04-01 00:00", + 3: b"2020-04-01 00:12", + 7: b"\x03\x00\x00\x00", + 8: b"\x01\x00\x00\x00", + 10: b"\xf6(\\\x8f\xc2\x05S\xc0", + 11: 
b"\x00\x00\x00\x00\x00\x00\x00\x00", + 13: b"\x00\x00\x00\x00\x00\x00\x00\x00", + 14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf", + 15: b")\\\x8f\xc2\xf5(\x08\xc0", + 16: b"\x00\x00\x00\x00\x00\x00\x00\x00", + 17: b"\x00\x00\x00\x00\x00\x00\x00\x00", + 18: b"\xf6(\\\x8f\xc2\xc5S\xc0", + 19: b"\x00\x00\x00\x00\x00\x00\x04\xc0", + } + assert data_file.upper_bounds == { + 2: b"2020-04-30 23:5:", + 3: b"2020-05-01 00:41", + 7: b"\t\x01\x00\x00", + 8: b"\t\x01\x00\x00", + 10: b"\xcd\xcc\xcc\xcc\xcc,_@", + 11: b"\x1f\x85\xebQ\\\xe2\xfe@", + 13: b"\x00\x00\x00\x00\x00\x00\x12@", + 14: b"\x00\x00\x00\x00\x00\x00\xe0?", + 15: b"q=\n\xd7\xa3\xf01@", + 16: b"\x00\x00\x00\x00\x00`B@", + 17: b"333333\xd3?", + 18: b"\x00\x00\x00\x00\x00\x18b@", + 19: b"\x00\x00\x00\x00\x00\x00\x04@", + } + assert data_file.key_metadata is None + assert data_file.split_offsets == [4] + assert data_file.equality_ids is None + assert data_file.sort_order_id == 0 + + @pytest.mark.parametrize("format_version", [1, 2]) def test_write_manifest_list( generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion From ff58a53d30c233d499f1efac18815599661faea3 Mon Sep 17 00:00:00 2001 From: Felix Scherz Date: Tue, 23 Apr 2024 18:01:38 +0200 Subject: [PATCH 04/11] test: test different rollover scenarios --- pyiceberg/manifest.py | 4 +- tests/utils/test_manifest.py | 166 ++++++----------------------------- 2 files changed, 30 insertions(+), 140 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index b2803e9d32..41fbe01854 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -927,7 +927,6 @@ def _get_current_writer(self) -> ManifestWriter: return self._current_writer if self._should_roll_to_new_file(): self._close_current_writer() - return self._get_current_writer() return self._current_writer def _should_roll_to_new_file(self) -> bool: @@ -955,7 +954,6 @@ def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter: if self._closed: raise RuntimeError("Cannot add entry to closed manifest writer") self._get_current_writer().add_entry(entry) - self._current_file_rows += entry.data_file.record_count return self @@ -1137,3 +1135,5 @@ def write_manifest_list( return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number) else: raise ValueError(f"Cannot write manifest list for table version: {format_version}") + + diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 2c299acbec..b82f0713c3 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -16,7 +16,7 @@ # under the License. 
# pylint: disable=redefined-outer-name,arguments-renamed,fixme
 from tempfile import TemporaryDirectory
-from typing import Dict
+from typing import Dict, Generator
 
 import fastavro
 import pytest
@@ -30,6 +30,7 @@
     ManifestContent,
     ManifestEntryStatus,
     ManifestFile,
+    ManifestWriter,
     PartitionFieldSummary,
     RollingManifestWriter,
     read_manifest_list,
@@ -495,8 +496,22 @@ def test_write_manifest(
 
 
 @pytest.mark.parametrize("format_version", [1, 2])
+@pytest.mark.parametrize(
+    "target_number_of_rows,target_file_size_in_bytes,expected_number_of_files",
+    [
+        (19514, 388873, 1),  # should not roll over
+        (19513, 388873, 2),  # should roll over due to target_rows
+        (19514, 388872, 2),  # should roll over due to target_bytes
+        (19513, 388872, 2),  # should roll over due to target_rows and target_bytes
+    ],
+)
 def test_rolling_manifest_writer(
-    generated_manifest_file_file_v1: str, generated_manifest_file_file_v2: str, format_version: TableVersion
+    generated_manifest_file_file_v1: str,
+    generated_manifest_file_file_v2: str,
+    format_version: TableVersion,
+    target_number_of_rows: int,
+    target_file_size_in_bytes: int,
+    expected_number_of_files: int,
 ) -> None:
     io = load_file_io()
     snapshot = Snapshot(
@@ -518,13 +533,10 @@ def test_rolling_manifest_writer(
         spec_id=demo_manifest_file.partition_spec_id,
     )
     with TemporaryDirectory() as tmpdir:
-        tmp_avro_file = tmpdir + "/test_write_manifest.avro"
-        tmp_avro_file = tmpdir + "/test_write_manifest-1.avro"
-        output = io.new_output(tmp_avro_file)
-        def supplier():
+
+        def supplier() -> Generator[ManifestWriter, None, None]:
             i = 0
             while True:
-                i += 1
                 tmp_avro_file = tmpdir + f"/test_write_manifest-{i}.avro"
                 output = io.new_output(tmp_avro_file)
                 yield write_manifest(
@@ -534,143 +546,21 @@ def supplier():
                     output_file=output,
                     snapshot_id=8744736658442914487,
                 )
-        with RollingManifestWriter(supplier=supplier(), target_file_size_in_bytes=388872 + 1, target_number_of_rows=20000) as writer:
+                i += 1
+
+        with RollingManifestWriter(
+            supplier=supplier(),
+            target_file_size_in_bytes=target_file_size_in_bytes,
+            target_number_of_rows=target_number_of_rows,
+        ) as writer:
             for entry in manifest_entries:
                 writer.add_entry(entry)
-            new_manifest = writer.to_manifest_files()[0]
+        manifest_files = writer.to_manifest_files()
+        assert len(manifest_files) == expected_number_of_files
         with pytest.raises(RuntimeError):
             # It is already closed
             writer.add_entry(manifest_entries[0])
 
-        expected_metadata = {
-            "schema": test_schema.model_dump_json(),
-            "partition-spec": test_spec.model_dump_json(),
-            "partition-spec-id": str(test_spec.spec_id),
-            "format-version": str(format_version),
-        }
-        _verify_metadata_with_fastavro(
-            tmp_avro_file,
-            expected_metadata,
-        )
-        new_manifest_entries = new_manifest.fetch_manifest_entry(io)
-
-        manifest_entry = new_manifest_entries[0]
-
-        assert manifest_entry.status == ManifestEntryStatus.ADDED
-        assert manifest_entry.snapshot_id == 8744736658442914487
-        assert manifest_entry.data_sequence_number == -1 if format_version == 1 else 3
-        assert isinstance(manifest_entry.data_file, DataFile)
-
-        data_file = manifest_entry.data_file
-
-        assert data_file.content is DataFileContent.DATA
-        assert (
-            data_file.file_path
-            == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
-        )
-        assert data_file.file_format == FileFormat.PARQUET
-        assert data_file.partition == Record(VendorID=1, tpep_pickup_datetime=1925)
-        assert data_file.record_count == 19513
-        assert data_file.file_size_in_bytes == 
388872 - assert data_file.column_sizes == { - 1: 53, - 2: 98153, - 3: 98693, - 4: 53, - 5: 53, - 6: 53, - 7: 17425, - 8: 18528, - 9: 53, - 10: 44788, - 11: 35571, - 12: 53, - 13: 1243, - 14: 2355, - 15: 12750, - 16: 4029, - 17: 110, - 18: 47194, - 19: 2948, - } - assert data_file.value_counts == { - 1: 19513, - 2: 19513, - 3: 19513, - 4: 19513, - 5: 19513, - 6: 19513, - 7: 19513, - 8: 19513, - 9: 19513, - 10: 19513, - 11: 19513, - 12: 19513, - 13: 19513, - 14: 19513, - 15: 19513, - 16: 19513, - 17: 19513, - 18: 19513, - 19: 19513, - } - assert data_file.null_value_counts == { - 1: 19513, - 2: 0, - 3: 0, - 4: 19513, - 5: 19513, - 6: 19513, - 7: 0, - 8: 0, - 9: 19513, - 10: 0, - 11: 0, - 12: 19513, - 13: 0, - 14: 0, - 15: 0, - 16: 0, - 17: 0, - 18: 0, - 19: 0, - } - assert data_file.nan_value_counts == {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0} - assert data_file.lower_bounds == { - 2: b"2020-04-01 00:00", - 3: b"2020-04-01 00:12", - 7: b"\x03\x00\x00\x00", - 8: b"\x01\x00\x00\x00", - 10: b"\xf6(\\\x8f\xc2\x05S\xc0", - 11: b"\x00\x00\x00\x00\x00\x00\x00\x00", - 13: b"\x00\x00\x00\x00\x00\x00\x00\x00", - 14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf", - 15: b")\\\x8f\xc2\xf5(\x08\xc0", - 16: b"\x00\x00\x00\x00\x00\x00\x00\x00", - 17: b"\x00\x00\x00\x00\x00\x00\x00\x00", - 18: b"\xf6(\\\x8f\xc2\xc5S\xc0", - 19: b"\x00\x00\x00\x00\x00\x00\x04\xc0", - } - assert data_file.upper_bounds == { - 2: b"2020-04-30 23:5:", - 3: b"2020-05-01 00:41", - 7: b"\t\x01\x00\x00", - 8: b"\t\x01\x00\x00", - 10: b"\xcd\xcc\xcc\xcc\xcc,_@", - 11: b"\x1f\x85\xebQ\\\xe2\xfe@", - 13: b"\x00\x00\x00\x00\x00\x00\x12@", - 14: b"\x00\x00\x00\x00\x00\x00\xe0?", - 15: b"q=\n\xd7\xa3\xf01@", - 16: b"\x00\x00\x00\x00\x00`B@", - 17: b"333333\xd3?", - 18: b"\x00\x00\x00\x00\x00\x18b@", - 19: b"\x00\x00\x00\x00\x00\x00\x04@", - } - assert data_file.key_metadata is None - assert data_file.split_offsets == [4] - assert data_file.equality_ids is None - assert data_file.sort_order_id == 0 - @pytest.mark.parametrize("format_version", [1, 2]) def test_write_manifest_list( From 1cc36a535b4a79b743a2f1a5c22880de9f16f1b9 Mon Sep 17 00:00:00 2001 From: Felix Scherz Date: Sat, 4 May 2024 14:09:00 +0200 Subject: [PATCH 05/11] feat: extend `OutputStream`, `AvroOutputFile` and `ManifestWriter` with `__len__` method --- pyiceberg/avro/file.py | 9 +++++++++ pyiceberg/io/__init__.py | 4 ++++ pyiceberg/manifest.py | 7 ++++++- tests/utils/test_manifest.py | 4 ++-- 4 files changed, 21 insertions(+), 3 deletions(-) diff --git a/pyiceberg/avro/file.py b/pyiceberg/avro/file.py index d0da7651b7..eb09b5077b 100644 --- a/pyiceberg/avro/file.py +++ b/pyiceberg/avro/file.py @@ -228,6 +228,7 @@ class AvroOutputFile(Generic[D]): encoder: BinaryEncoder sync_bytes: bytes writer: Writer + closed: bool def __init__( self, @@ -247,6 +248,7 @@ def __init__( else resolve_writer(record_schema=record_schema, file_schema=self.file_schema) ) self.metadata = metadata + self.closed = False def __enter__(self) -> AvroOutputFile[D]: """ @@ -267,6 +269,7 @@ def __exit__( ) -> None: """Perform cleanup when exiting the scope of a 'with' statement.""" self.output_stream.close() + self.closed = True def _write_header(self) -> None: json_schema = json.dumps(AvroSchemaConversion().iceberg_to_avro(self.file_schema, schema_name=self.schema_name)) @@ -285,3 +288,9 @@ def write_block(self, objects: List[D]) -> None: self.encoder.write_int(len(block_content)) self.encoder.write(block_content) self.encoder.write(self.sync_bytes) + + def __len__(self) -> int: + 
"""Returns the total number number of bytes written.""" + if self.closed: + return len(self.output_file) + return len(self.output_stream) diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py index d200874741..da674da71f 100644 --- a/pyiceberg/io/__init__.py +++ b/pyiceberg/io/__init__.py @@ -136,6 +136,10 @@ def __exit__( ) -> None: """Perform cleanup when exiting the scope of a 'with' statement.""" + @abstractmethod + def __len__(self) -> int: + """Returns the total number number of bytes written to the stream.""" + class InputFile(ABC): """A base class for InputFile implementations. diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 41fbe01854..9090aef35e 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -886,6 +886,11 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter: return self + def __len__(self) -> int: + """Returns the total number number of bytes written.""" + return len(self._writer) + + class RollingManifestWriter: closed: bool _supplier: Generator[ManifestWriter, None, None] @@ -934,7 +939,7 @@ def _should_roll_to_new_file(self) -> bool: return False return ( self._current_file_rows >= self._target_number_of_rows - or len(self._current_writer._output_file) >= self._target_file_size_in_bytes + or len(self._current_writer) >= self._target_file_size_in_bytes ) def _close_current_writer(self): diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index b82f0713c3..81c52e6e1d 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -501,8 +501,8 @@ def test_write_manifest( [ (19514, 388873, 1), # should not roll over (19513, 388873, 2), # should roll over due to target_rows - (19514, 388872, 2), # should roll over due target_bytes - (19513, 388872, 2), # should roll over due to target_rows and target_bytes + (4000, 388872, 2), # should roll over due target_bytes + (4000, 388872, 2), # should roll over due to target_rows and target_bytes ], ) def test_rolling_manifest_writer( From c7f9611b16e13477b373c359a4dfcd8733488c31 Mon Sep 17 00:00:00 2001 From: Felix Scherz Date: Sat, 4 May 2024 14:14:21 +0200 Subject: [PATCH 06/11] test: patch tests temporarily --- tests/utils/test_manifest.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 81c52e6e1d..f6a6574d80 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -532,6 +532,14 @@ def test_rolling_manifest_writer( PartitionField(source_id=2, field_id=2, transform=IdentityTransform(), name="tpep_pickup_datetime"), spec_id=demo_manifest_file.partition_spec_id, ) + + # The tests are using `PyArrowFileIO` where `OutputStream` is implemented as `pyarrow.lib.BufferedOutputStream` + # this is just to show the tests passing if `pyarrow.lib.BufferedOutputStream` would implement the + # new `OutputStream` protocol that includes a `__len__` method + from pyiceberg.avro.file import AvroOutputFile + AvroOutputFile.__len__ = lambda self: self.output_stream.tell() + + with TemporaryDirectory() as tmpdir: def supplier() -> Generator[ManifestWriter, None, None]: From 70c37efde1dddcbfd85698f5f14b05bac8a9d1ab Mon Sep 17 00:00:00 2001 From: Felix Scherz Date: Sat, 4 May 2024 14:26:15 +0200 Subject: [PATCH 07/11] feat: use `tell()` instead of `__len__` --- pyiceberg/avro/file.py | 4 ++-- pyiceberg/io/__init__.py | 4 ++-- pyiceberg/manifest.py | 5 ++--- tests/utils/test_manifest.py | 7 ------- 4 files changed, 6 insertions(+), 14 deletions(-) diff --git 
a/pyiceberg/avro/file.py b/pyiceberg/avro/file.py index eb09b5077b..1e3dbfd3df 100644 --- a/pyiceberg/avro/file.py +++ b/pyiceberg/avro/file.py @@ -290,7 +290,7 @@ def write_block(self, objects: List[D]) -> None: self.encoder.write(self.sync_bytes) def __len__(self) -> int: - """Returns the total number number of bytes written.""" + """Return the total number number of bytes written.""" if self.closed: return len(self.output_file) - return len(self.output_stream) + return self.output_stream.tell() diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py index da674da71f..d8a264ab29 100644 --- a/pyiceberg/io/__init__.py +++ b/pyiceberg/io/__init__.py @@ -137,8 +137,8 @@ def __exit__( """Perform cleanup when exiting the scope of a 'with' statement.""" @abstractmethod - def __len__(self) -> int: - """Returns the total number number of bytes written to the stream.""" + def tell(self) -> int: + """Return the total number number of bytes written to the stream.""" class InputFile(ABC): diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 9090aef35e..927ba1790d 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -887,7 +887,7 @@ def existing(self, entry: ManifestEntry) -> ManifestWriter: def __len__(self) -> int: - """Returns the total number number of bytes written.""" + """Return the total number number of bytes written.""" return len(self._writer) @@ -938,8 +938,7 @@ def _should_roll_to_new_file(self) -> bool: if not self._current_writer: return False return ( - self._current_file_rows >= self._target_number_of_rows - or len(self._current_writer) >= self._target_file_size_in_bytes + self._current_file_rows >= self._target_number_of_rows or len(self._current_writer) >= self._target_file_size_in_bytes ) def _close_current_writer(self): diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index f6a6574d80..2c4b24a0be 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -533,13 +533,6 @@ def test_rolling_manifest_writer( spec_id=demo_manifest_file.partition_spec_id, ) - # The tests are using `PyArrowFileIO` where `OutputStream` is implemented as `pyarrow.lib.BufferedOutputStream` - # this is just to show the tests passing if `pyarrow.lib.BufferedOutputStream` would implement the - # new `OutputStream` protocol that includes a `__len__` method - from pyiceberg.avro.file import AvroOutputFile - AvroOutputFile.__len__ = lambda self: self.output_stream.tell() - - with TemporaryDirectory() as tmpdir: def supplier() -> Generator[ManifestWriter, None, None]: From 05270720f7f91f56cab0991e09773f2a3aef3eef Mon Sep 17 00:00:00 2001 From: Felix Scherz Date: Sat, 6 Jul 2024 14:48:43 +0200 Subject: [PATCH 08/11] fix: issue with unset writer --- pyiceberg/manifest.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 927ba1790d..3af2c04ae1 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -926,12 +926,12 @@ def __exit__( self._current_writer.__exit__(exc_type, exc_value, traceback) def _get_current_writer(self) -> ManifestWriter: + if self._should_roll_to_new_file(): + self._close_current_writer() if not self._current_writer: self._current_writer = next(self._supplier) self._current_writer.__enter__() return self._current_writer - if self._should_roll_to_new_file(): - self._close_current_writer() return self._current_writer def _should_roll_to_new_file(self) -> bool: @@ -958,6 +958,7 @@ def add_entry(self, entry: ManifestEntry) -> 
RollingManifestWriter: if self._closed: raise RuntimeError("Cannot add entry to closed manifest writer") self._get_current_writer().add_entry(entry) + self._current_file_rows += entry.data_file.record_count return self From 7d369ee5439ea0ffbc47de982d284de226f76964 Mon Sep 17 00:00:00 2001 From: Felix Scherz Date: Tue, 9 Jul 2024 07:59:54 +0200 Subject: [PATCH 09/11] fix: formatting --- pyiceberg/manifest.py | 156 ++++++++++++++---------------------------- 1 file changed, 51 insertions(+), 105 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 3af2c04ae1..d56a3166a5 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -21,23 +21,22 @@ from copy import copy from enum import Enum from types import TracebackType -from typing import Any, Generator -from typing import Callable -from typing import Dict -from typing import Iterator -from typing import List -from typing import Literal -from typing import Optional -from typing import Type +from typing import ( + Any, + Dict, + Iterator, + List, + Literal, + Optional, + Type, +) from pydantic_core import to_json from pyiceberg.avro.file import AvroFile, AvroOutputFile from pyiceberg.conversions import to_bytes from pyiceberg.exceptions import ValidationError -from pyiceberg.io import FileIO -from pyiceberg.io import InputFile -from pyiceberg.io import OutputFile +from pyiceberg.io import FileIO, InputFile, OutputFile from pyiceberg.partitioning import PartitionSpec from pyiceberg.schema import Schema from pyiceberg.typedef import Record, TableVersion @@ -53,7 +52,6 @@ StringType, StructType, ) -from pyiceberg.typedef import EMPTY_DICT UNASSIGNED_SEQ = -1 DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024 @@ -103,9 +101,7 @@ def __repr__(self) -> str: DATA_FILE_TYPE: Dict[int, StructType] = { 1: StructType( - NestedField( - field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme" - ), + NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"), NestedField( field_id=101, name="file_format", @@ -120,15 +116,9 @@ def __repr__(self) -> str: required=True, doc="Partition data tuple, schema based on the partition spec", ), + NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"), NestedField( - field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file" - ), - NestedField( - field_id=104, - name="file_size_in_bytes", - field_type=LongType(), - required=True, - doc="Total file size in bytes", + field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes" ), NestedField( field_id=105, @@ -181,11 +171,7 @@ def __repr__(self) -> str: doc="Map of column id to upper bound", ), NestedField( - field_id=131, - name="key_metadata", - field_type=BinaryType(), - required=False, - doc="Encryption key metadata blob", + field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob" ), NestedField( field_id=132, @@ -205,9 +191,7 @@ def __repr__(self) -> str: doc="File format name: avro, orc, or parquet", initial_default=DataFileContent.DATA, ), - NestedField( - field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme" - ), + NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"), NestedField( field_id=101, 
name="file_format", @@ -222,15 +206,9 @@ def __repr__(self) -> str: required=True, doc="Partition data tuple, schema based on the partition spec", ), + NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"), NestedField( - field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file" - ), - NestedField( - field_id=104, - name="file_size_in_bytes", - field_type=LongType(), - required=True, - doc="Total file size in bytes", + field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes" ), NestedField( field_id=108, @@ -275,11 +253,7 @@ def __repr__(self) -> str: doc="Map of column id to upper bound", ), NestedField( - field_id=131, - name="key_metadata", - field_type=BinaryType(), - required=False, - doc="Encryption key metadata blob", + field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob" ), NestedField( field_id=132, @@ -307,34 +281,28 @@ def __repr__(self) -> str: def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType: - data_file_partition_type = StructType( - *[ - NestedField( - field_id=field.field_id, - name=field.name, - field_type=field.field_type, - required=field.required, - ) - for field in partition_type.fields - ] - ) + data_file_partition_type = StructType(*[ + NestedField( + field_id=field.field_id, + name=field.name, + field_type=field.field_type, + required=field.required, + ) + for field in partition_type.fields + ]) - return StructType( - *[ - ( - NestedField( - field_id=102, - name="partition", - field_type=data_file_partition_type, - required=True, - doc="Partition data tuple, schema based on the partition spec", - ) - if field.field_id == 102 - else field - ) - for field in DATA_FILE_TYPE[format_version].fields - ] - ) + return StructType(*[ + NestedField( + field_id=102, + name="partition", + field_type=data_file_partition_type, + required=True, + doc="Partition data tuple, schema based on the partition spec", + ) + if field.field_id == 102 + else field + for field in DATA_FILE_TYPE[format_version].fields + ]) class DataFile(Record): @@ -415,18 +383,14 @@ def __eq__(self, other: Any) -> bool: ), } -MANIFEST_ENTRY_SCHEMAS_STRUCT = { - format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items() -} +MANIFEST_ENTRY_SCHEMAS_STRUCT = {format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items()} def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file: StructType) -> Schema: - return Schema( - *[ - NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field - for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields - ] - ) + return Schema(*[ + NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field + for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields + ]) class ManifestEntry(Record): @@ -534,9 +498,7 @@ def update(self, value: Any) -> None: self._min = min(self._min, value) -def construct_partition_summaries( - spec: PartitionSpec, schema: Schema, partitions: List[Record] -) -> List[PartitionFieldSummary]: +def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partitions: List[Record]) -> List[PartitionFieldSummary]: types = [field.field_type for field in spec.partition_type(schema).fields] field_stats = 
[PartitionFieldStats(field_type) for field_type in types] for partition_keys in partitions: @@ -560,9 +522,7 @@ def construct_partition_summaries( NestedField(512, "added_rows_count", LongType(), required=False), NestedField(513, "existing_rows_count", LongType(), required=False), NestedField(514, "deleted_rows_count", LongType(), required=False), - NestedField( - 507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False - ), + NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False), NestedField(519, "key_metadata", BinaryType(), required=False), ), 2: Schema( @@ -579,16 +539,12 @@ def construct_partition_summaries( NestedField(512, "added_rows_count", LongType(), required=True), NestedField(513, "existing_rows_count", LongType(), required=True), NestedField(514, "deleted_rows_count", LongType(), required=True), - NestedField( - 507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False - ), + NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False), NestedField(519, "key_metadata", BinaryType(), required=False), ), } -MANIFEST_LIST_FILE_STRUCTS = { - format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items() -} +MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()} POSITIONAL_DELETE_SCHEMA = Schema( @@ -712,9 +668,7 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani # in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0 # in v2 tables, the file sequence number should be inherited iff the entry status is ADDED - if entry.file_sequence_number is None and ( - manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED - ): + if entry.file_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED): # Only available in V2, always 0 in V1 entry.file_sequence_number = manifest.sequence_number @@ -1069,11 +1023,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id super().__init__( format_version=1, output_file=output_file, - meta={ - "snapshot-id": str(snapshot_id), - "parent-snapshot-id": str(parent_snapshot_id), - "format-version": "1", - }, + meta={"snapshot-id": str(snapshot_id), "parent-snapshot-id": str(parent_snapshot_id), "format-version": "1"}, ) def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: @@ -1086,9 +1036,7 @@ class ManifestListWriterV2(ManifestListWriter): _commit_snapshot_id: int _sequence_number: int - def __init__( - self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int - ): + def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int): super().__init__( format_version=2, output_file=output_file, @@ -1140,5 +1088,3 @@ def write_manifest_list( return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number) else: raise ValueError(f"Cannot write manifest list for table version: {format_version}") - - From d010fd453c4376e0a3918ba05e73daadecc935c9 Mon Sep 17 00:00:00 2001 From: Felix Scherz Date: Tue, 9 Jul 2024 17:57:12 +0200 Subject: [PATCH 10/11] refactor: use _close_current_writer on exit --- pyiceberg/manifest.py | 3 +-- 1 file changed, 1 
insertion(+), 2 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index d56a3166a5..6194fb45c4 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -875,9 +875,8 @@ def __exit__( exc_value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: + self._close_current_writer() self.closed = True - if self._current_writer: - self._current_writer.__exit__(exc_type, exc_value, traceback) def _get_current_writer(self) -> ManifestWriter: if self._should_roll_to_new_file(): From 9f01e5a413851b0625c7b6bb4980b0561110f7a1 Mon Sep 17 00:00:00 2001 From: Felix Scherz Date: Tue, 9 Jul 2024 19:42:57 +0200 Subject: [PATCH 11/11] refactor: expect writer to be closed when creating manifest files --- pyiceberg/manifest.py | 6 +++--- tests/utils/test_manifest.py | 13 ++++++++----- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 6194fb45c4..f125221ef6 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -876,7 +876,7 @@ def __exit__( traceback: Optional[TracebackType], ) -> None: self._close_current_writer() - self.closed = True + self._closed = True def _get_current_writer(self) -> ManifestWriter: if self._should_roll_to_new_file(): @@ -903,8 +903,8 @@ def _close_current_writer(self): self._current_file_rows = 0 def to_manifest_files(self) -> list[ManifestFile]: - self._close_current_writer() - self._closed = True + if not self._closed: + raise RuntimeError("Cannot create manifest files from unclosed writer") return self._manifest_files def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter: diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 2c4b24a0be..82750fe871 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -556,11 +556,12 @@ def supplier() -> Generator[ManifestWriter, None, None]: ) as writer: for entry in manifest_entries: writer.add_entry(entry) - manifest_files = writer.to_manifest_files() - assert len(manifest_files) == expected_number_of_files - with pytest.raises(RuntimeError): - # It is already closed - writer.add_entry(manifest_entries[0]) + + manifest_files = writer.to_manifest_files() + assert len(manifest_files) == expected_number_of_files + with pytest.raises(RuntimeError): + # It is already closed + writer.add_entry(manifest_entries[0]) @pytest.mark.parametrize("format_version", [1, 2]) @@ -630,3 +631,5 @@ def test_write_manifest_list( assert entry.file_sequence_number == 0 if format_version == 1 else 3 assert entry.snapshot_id == 8744736658442914487 assert entry.status == ManifestEntryStatus.ADDED + +
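
Taken together, the series leaves `RollingManifestWriter` with the following contract: `ManifestWriter` instances are obtained lazily from a supplier generator, the writer rolls to a new file once the current one crosses the row-count or byte-size threshold, and (as of PATCH 11/11) `to_manifest_files()` may only be called after the writer has been closed by leaving the `with` block. The sketch below is a minimal illustration of that usage, modeled on the tests in this series; the schema, partition spec, snapshot id, thresholds, and the empty `entries` list are illustrative placeholders, not values taken from the patches themselves.

    from tempfile import TemporaryDirectory
    from typing import Generator, List

    from pyiceberg.io import load_file_io
    from pyiceberg.manifest import ManifestEntry, ManifestWriter, RollingManifestWriter, write_manifest
    from pyiceberg.partitioning import PartitionField, PartitionSpec
    from pyiceberg.schema import Schema
    from pyiceberg.transforms import IdentityTransform
    from pyiceberg.types import IntegerType, NestedField

    schema = Schema(
        NestedField(1, "VendorID", IntegerType(), False),
        NestedField(2, "tpep_pickup_datetime", IntegerType(), False),
    )
    spec = PartitionSpec(
        PartitionField(source_id=1, field_id=1, transform=IdentityTransform(), name="VendorID"),
        PartitionField(source_id=2, field_id=2, transform=IdentityTransform(), name="tpep_pickup_datetime"),
        spec_id=0,
    )
    entries: List[ManifestEntry] = []  # placeholder: the manifest entries to be written

    io = load_file_io()
    with TemporaryDirectory() as tmpdir:

        def supplier() -> Generator[ManifestWriter, None, None]:
            # Yield a fresh ManifestWriter each time the rolling writer rolls over;
            # the caller controls output allocation, so rolled files can be numbered.
            i = 0
            while True:
                output = io.new_output(tmpdir + f"/manifest-{i}.avro")
                yield write_manifest(
                    format_version=2,
                    spec=spec,
                    schema=schema,
                    output_file=output,
                    snapshot_id=8744736658442914487,
                )
                i += 1

        with RollingManifestWriter(
            supplier=supplier(),
            target_file_size_in_bytes=8 * 1024 * 1024,  # example threshold, not a pyiceberg default
            target_number_of_rows=1_000_000,  # example threshold, not a pyiceberg default
        ) as writer:
            for entry in entries:
                writer.add_entry(entry)

        # Leaving the `with` block closes the writer; only then may the
        # resulting manifest files be collected (enforced by PATCH 11/11).
        manifest_files = writer.to_manifest_files()

Driving the writer through a generator keeps file naming and output creation entirely in the caller's hands, which is why the tests can number the rolled files sequentially and assert on exactly how many manifest files were produced.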