1717from __future__ import annotations
1818
1919import math
20- from abc import ABC
21- from abc import abstractmethod
20+ from abc import ABC , abstractmethod
2221from enum import Enum
2322from types import TracebackType
24- from typing import Any , Generator
25- from typing import Callable
26- from typing import Dict
27- from typing import Iterator
28- from typing import List
29- from typing import Literal
30- from typing import Optional
31- from typing import Type
32-
33- from pyiceberg .avro .file import AvroFile
34- from pyiceberg .avro .file import AvroOutputFile
23+ from typing import Any , Dict , Generator , Iterator , List , Literal , Optional , Type
24+
25+ from pyiceberg .avro .file import AvroFile , AvroOutputFile
3526from pyiceberg .conversions import to_bytes
3627from pyiceberg .exceptions import ValidationError
37- from pyiceberg .io import FileIO
38- from pyiceberg .io import InputFile
39- from pyiceberg .io import OutputFile
28+ from pyiceberg .io import FileIO , InputFile , OutputFile
4029from pyiceberg .partitioning import PartitionSpec
4130from pyiceberg .schema import Schema
42- from pyiceberg .typedef import EMPTY_DICT
43- from pyiceberg .typedef import Record
44- from pyiceberg . typedef import TableVersion
45- from pyiceberg . types import BinaryType
46- from pyiceberg . types import BooleanType
47- from pyiceberg . types import IntegerType
48- from pyiceberg . types import ListType
49- from pyiceberg . types import LongType
50- from pyiceberg . types import MapType
51- from pyiceberg . types import NestedField
52- from pyiceberg . types import PrimitiveType
53- from pyiceberg . types import StringType
54- from pyiceberg . types import StructType
31+ from pyiceberg .typedef import EMPTY_DICT , Record , TableVersion
32+ from pyiceberg .types import (
33+ BinaryType ,
34+ BooleanType ,
35+ IntegerType ,
36+ ListType ,
37+ LongType ,
38+ MapType ,
39+ NestedField ,
40+ PrimitiveType ,
41+ StringType ,
42+ StructType ,
43+ )
5544
UNASSIGNED_SEQ = -1
DEFAULT_BLOCK_SIZE = 67108864  # 64 * 1024 * 1024
@@ -101,9 +90,7 @@ def __repr__(self) -> str:
10190
10291DATA_FILE_TYPE : Dict [int , StructType ] = {
10392 1 : StructType (
104- NestedField (
105- field_id = 100 , name = "file_path" , field_type = StringType (), required = True , doc = "Location URI with FS scheme"
106- ),
93+ NestedField (field_id = 100 , name = "file_path" , field_type = StringType (), required = True , doc = "Location URI with FS scheme" ),
10794 NestedField (
10895 field_id = 101 ,
10996 name = "file_format" ,
@@ -118,9 +105,7 @@ def __repr__(self) -> str:
118105 required = True ,
119106 doc = "Partition data tuple, schema based on the partition spec" ,
120107 ),
121- NestedField (
122- field_id = 103 , name = "record_count" , field_type = LongType (), required = True , doc = "Number of records in the file"
123- ),
108+ NestedField (field_id = 103 , name = "record_count" , field_type = LongType (), required = True , doc = "Number of records in the file" ),
124109 NestedField (
125110 field_id = 104 ,
126111 name = "file_size_in_bytes" ,
@@ -203,9 +188,7 @@ def __repr__(self) -> str:
203188 doc = "File format name: avro, orc, or parquet" ,
204189 initial_default = DataFileContent .DATA ,
205190 ),
206- NestedField (
207- field_id = 100 , name = "file_path" , field_type = StringType (), required = True , doc = "Location URI with FS scheme"
208- ),
191+ NestedField (field_id = 100 , name = "file_path" , field_type = StringType (), required = True , doc = "Location URI with FS scheme" ),
209192 NestedField (
210193 field_id = 101 ,
211194 name = "file_format" ,
@@ -220,9 +203,7 @@ def __repr__(self) -> str:
220203 required = True ,
221204 doc = "Partition data tuple, schema based on the partition spec" ,
222205 ),
223- NestedField (
224- field_id = 103 , name = "record_count" , field_type = LongType (), required = True , doc = "Number of records in the file"
225- ),
206+ NestedField (field_id = 103 , name = "record_count" , field_type = LongType (), required = True , doc = "Number of records in the file" ),
226207 NestedField (
227208 field_id = 104 ,
228209 name = "file_size_in_bytes" ,
@@ -305,34 +286,30 @@ def __repr__(self) -> str:
305286
306287
def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType:
    """Return the data-file struct for ``format_version`` with a concrete partition type.

    Args:
        partition_type: Struct whose fields describe the partition tuple.
        format_version: Key into ``DATA_FILE_TYPE`` selecting the base struct.

    Returns:
        A new ``StructType`` holding the fields of ``DATA_FILE_TYPE[format_version]``,
        except that the field with id 102 is replaced by a required ``partition``
        field typed with the rebuilt partition struct.

    Raises:
        KeyError: If ``format_version`` is not a key of ``DATA_FILE_TYPE``.
    """
    # Only id, name, type and required-ness are carried over from each partition field.
    data_file_partition_type = StructType(*[
        NestedField(
            field_id=field.field_id,
            name=field.name,
            field_type=field.field_type,
            required=field.required,
        )
        for field in partition_type.fields
    ])

    # Swap in the concrete partition struct for field 102; every other field is reused as-is.
    return StructType(*[
        (
            NestedField(
                field_id=102,
                name="partition",
                field_type=data_file_partition_type,
                required=True,
                doc="Partition data tuple, schema based on the partition spec",
            )
            if field.field_id == 102
            else field
        )
        for field in DATA_FILE_TYPE[format_version].fields
    ])
336313
337314
338315class DataFile (Record ):
@@ -413,18 +390,14 @@ def __eq__(self, other: Any) -> bool:
413390 ),
414391}
415392
# Struct form of each manifest-entry schema, keyed by format version.
MANIFEST_ENTRY_SCHEMAS_STRUCT = {format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items()}
419394
420395
def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file: StructType) -> Schema:
    """Return the manifest-entry schema for ``format_version`` with ``data_file`` as field 2.

    Args:
        format_version: Key into ``MANIFEST_ENTRY_SCHEMAS`` selecting the base schema.
        data_file: Struct type to use for the ``data_file`` field.

    Returns:
        A new ``Schema`` with the same fields as the base schema, except that the
        field with id 2 is replaced by a required ``data_file`` field of the given type.
    """
    return Schema(*[
        NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field
        for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields
    ])
428401
429402
430403class ManifestEntry (Record ):
@@ -494,9 +467,7 @@ def update(self, value: Any) -> None:
494467 self ._min = min (self ._min , value )
495468
496469
497- def construct_partition_summaries (
498- spec : PartitionSpec , schema : Schema , partitions : List [Record ]
499- ) -> List [PartitionFieldSummary ]:
470+ def construct_partition_summaries (spec : PartitionSpec , schema : Schema , partitions : List [Record ]) -> List [PartitionFieldSummary ]:
500471 types = [field .field_type for field in spec .partition_type (schema ).fields ]
501472 field_stats = [PartitionFieldStats (field_type ) for field_type in types ]
502473 for partition_keys in partitions :
@@ -520,9 +491,7 @@ def construct_partition_summaries(
520491 NestedField (512 , "added_rows_count" , LongType (), required = False ),
521492 NestedField (513 , "existing_rows_count" , LongType (), required = False ),
522493 NestedField (514 , "deleted_rows_count" , LongType (), required = False ),
523- NestedField (
524- 507 , "partitions" , ListType (508 , PARTITION_FIELD_SUMMARY_TYPE , element_required = True ), required = False
525- ),
494+ NestedField (507 , "partitions" , ListType (508 , PARTITION_FIELD_SUMMARY_TYPE , element_required = True ), required = False ),
526495 NestedField (519 , "key_metadata" , BinaryType (), required = False ),
527496 ),
528497 2 : Schema (
@@ -539,16 +508,12 @@ def construct_partition_summaries(
539508 NestedField (512 , "added_rows_count" , LongType (), required = True ),
540509 NestedField (513 , "existing_rows_count" , LongType (), required = True ),
541510 NestedField (514 , "deleted_rows_count" , LongType (), required = True ),
542- NestedField (
543- 507 , "partitions" , ListType (508 , PARTITION_FIELD_SUMMARY_TYPE , element_required = True ), required = False
544- ),
511+ NestedField (507 , "partitions" , ListType (508 , PARTITION_FIELD_SUMMARY_TYPE , element_required = True ), required = False ),
545512 NestedField (519 , "key_metadata" , BinaryType (), required = False ),
546513 ),
547514}
548515
# Struct form of each manifest-list file schema, keyed by format version.
MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()}
552517
553518
554519POSITIONAL_DELETE_SCHEMA = Schema (
@@ -667,16 +632,12 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani
667632
668633 # in v1 tables, the data sequence number is not persisted and can be safely defaulted to 0
669634 # in v2 tables, the data sequence number should be inherited iff the entry status is ADDED
670- if entry .data_sequence_number is None and (
671- manifest .sequence_number == 0 or entry .status == ManifestEntryStatus .ADDED
672- ):
635+ if entry .data_sequence_number is None and (manifest .sequence_number == 0 or entry .status == ManifestEntryStatus .ADDED ):
673636 entry .data_sequence_number = manifest .sequence_number
674637
675638 # in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0
676639 # in v2 tables, the file sequence number should be inherited iff the entry status is ADDED
677- if entry .file_sequence_number is None and (
678- manifest .sequence_number == 0 or entry .status == ManifestEntryStatus .ADDED
679- ):
640+ if entry .file_sequence_number is None and (manifest .sequence_number == 0 or entry .status == ManifestEntryStatus .ADDED ):
680641 # Only available in V2, always 0 in V1
681642 entry .file_sequence_number = manifest .sequence_number
682643
@@ -827,7 +788,7 @@ class RollingManifestWriter:
827788 _current_file_rows : int
828789
829790 def __init__ (
830- self , supplier : Generator [ManifestWriter , None , None ], target_file_size_in_bytes , target_number_of_rows
791+ self , supplier : Generator [ManifestWriter , None , None ], target_file_size_in_bytes : int , target_number_of_rows : int
831792 ) -> None :
832793 self ._closed = False
833794 self ._manifest_files = []
@@ -838,6 +799,7 @@ def __init__(
838799 self ._current_file_rows = 0
839800
def __enter__(self) -> RollingManifestWriter:
    """Open the writer."""
    # Entering the rolling writer enters whatever writer _get_current_writer() returns.
    self._get_current_writer().__enter__()
    return self
843805
@@ -847,6 +809,7 @@ def __exit__(
847809 exc_value : Optional [BaseException ],
848810 traceback : Optional [TracebackType ],
849811 ) -> None :
812+ """Close the writer."""
850813 self .closed = True
851814 if self ._current_writer :
852815 self ._current_writer .__exit__ (exc_type , exc_value , traceback )
@@ -869,7 +832,7 @@ def _should_roll_to_new_file(self) -> bool:
869832 or len (self ._current_writer ._output_file ) >= self ._target_file_size_in_bytes
870833 )
871834
872- def _close_current_writer (self ):
835+ def _close_current_writer (self ) -> None :
873836 if self ._current_writer :
874837 self ._current_writer .__exit__ (None , None , None )
875838 current_file = self ._current_writer .to_manifest_file ()
@@ -887,6 +850,7 @@ def add_entry(self, entry: ManifestEntry) -> RollingManifestWriter:
887850 raise RuntimeError ("Cannot add entry to closed manifest writer" )
888851 self ._get_current_writer ().add_entry (entry )
889852 self ._current_file_rows += entry .data_file .record_count
853+
890854 return self
891855
892856
@@ -1025,9 +989,7 @@ class ManifestListWriterV2(ManifestListWriter):
1025989 _commit_snapshot_id : int
1026990 _sequence_number : int
1027991
1028- def __init__ (
1029- self , output_file : OutputFile , snapshot_id : int , parent_snapshot_id : Optional [int ], sequence_number : int
1030- ):
992+ def __init__ (self , output_file : OutputFile , snapshot_id : int , parent_snapshot_id : Optional [int ], sequence_number : int ):
1031993 super ().__init__ (
1032994 format_version = 2 ,
1033995 output_file = output_file ,
0 commit comments