From 5eca5740f03ddced5220f157fcc4aef1f56393bb Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 19 Dec 2023 20:16:49 +0100 Subject: [PATCH 1/5] Arrow: Set field-id with prefix While working on the write support, I started to understand the meaning of the Arrow metadata. The `PARQUET:` prefix means that it is specific to Parquet, and setting `PARQUET:field_id` will actually not be set as metadata, but as the `field_id`. I think we should remove the other option without the prefix. I also did the same for the doc. I checked, in Java we don't check for the documentation, but I think it is a nice addition. However, I think we should only check for the key that's the same as the Iceberg field (`doc`). --- pyiceberg/io/pyarrow.py | 21 ++++--------- tests/io/test_pyarrow.py | 54 ++++++++++++++++---------------- tests/io/test_pyarrow_visitor.py | 14 ++++----- 3 files changed, 40 insertions(+), 49 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 67a16ebefb..550a6f4712 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -154,10 +154,9 @@ ONE_MEGABYTE = 1024 * 1024 BUFFER_SIZE = "buffer-size" ICEBERG_SCHEMA = b"iceberg.schema" -FIELD_ID = "field_id" -DOC = "doc" -PYARROW_FIELD_ID_KEYS = [b"PARQUET:field_id", b"field_id"] -PYARROW_FIELD_DOC_KEYS = [b"PARQUET:field_doc", b"field_doc", b"doc"] +# The PARQUET: in front means that it is Parquet specific, in this case the field_id +PYARROW_FIELD_ID_KEY = b"PARQUET:field_id" +PYARROW_FIELD_DOC_KEY = b"doc" T = TypeVar("T") @@ -456,7 +455,7 @@ def field(self, field: NestedField, field_result: pa.DataType) -> pa.Field: name=field.name, type=field_result, nullable=field.optional, - metadata={DOC: field.doc, FIELD_ID: str(field.field_id)} if field.doc else {FIELD_ID: str(field.field_id)}, + metadata={PYARROW_FIELD_DOC_KEY: field.doc, PYARROW_FIELD_ID_KEY: str(field.field_id)} if field.doc else {PYARROW_FIELD_ID_KEY: str(field.field_id)}, ) def list(self, list_type: ListType, element_result: pa.DataType) -> pa.DataType: @@ -720,17 +719,9 @@ def primitive(self, primitive: pa.DataType) -> Optional[T]: def _get_field_id(field: pa.Field) -> Optional[int]: - for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS: - if field_id_str := field.metadata.get(pyarrow_field_id_key): - return int(field_id_str.decode()) - return None + return int(field_id_str.decode()) if (field_id_str := field.metadata.get(PYARROW_FIELD_ID_KEY)) else None -def _get_field_doc(field: pa.Field) -> Optional[str]: - for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS: - if doc_str := field.metadata.get(pyarrow_doc_key): - return doc_str.decode() - return None class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): @@ -738,7 +729,7 @@ def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: List[ fields = [] for i, field in enumerate(arrow_fields): field_id = _get_field_id(field) - field_doc = _get_field_doc(field) + field_doc = doc_str.decode() if (doc_str := field.metadata.get(PYARROW_FIELD_DOC_KEY)) else None field_type = field_results[i] if field_type is not None and field_id is not None: fields.append(NestedField(field_id, field.name, field_type, required=not field.nullable, doc=field_doc)) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index be5f68e429..5189e77a38 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -324,57 +324,57 @@ def test_schema_to_pyarrow_schema(table_schema_nested: Schema) -> None: actual = schema_to_pyarrow(table_schema_nested) expected = """foo: string -- field metadata -- - field_id: '1' + PARQUET:field_id: '1' bar: int32 not null -- field metadata -- - field_id: '2' + PARQUET:field_id: '2' baz: bool -- field metadata -- - field_id: '3' + PARQUET:field_id: '3' qux: list not null child 0, element: string not null -- field metadata -- - field_id: '5' + PARQUET:field_id: '5' -- field metadata -- - field_id: '4' + PARQUET:field_id: '4' quux: map> not null child 0, entries: struct not null> not null child 0, key: string not null -- field metadata -- - field_id: '7' + PARQUET:field_id: '7' child 1, value: map not null child 0, entries: struct not null child 0, key: string not null -- field metadata -- - field_id: '9' + PARQUET:field_id: '9' child 1, value: int32 not null -- field metadata -- - field_id: '10' + PARQUET:field_id: '10' -- field metadata -- - field_id: '8' + PARQUET:field_id: '8' -- field metadata -- - field_id: '6' + PARQUET:field_id: '6' location: list not null> not null child 0, element: struct not null child 0, latitude: float -- field metadata -- - field_id: '13' + PARQUET:field_id: '13' child 1, longitude: float -- field metadata -- - field_id: '14' + PARQUET:field_id: '14' -- field metadata -- - field_id: '12' + PARQUET:field_id: '12' -- field metadata -- - field_id: '11' + PARQUET:field_id: '11' person: struct child 0, name: string -- field metadata -- - field_id: '16' + PARQUET:field_id: '16' child 1, age: int32 not null -- field metadata -- - field_id: '17' + PARQUET:field_id: '17' -- field metadata -- - field_id: '15'""" + PARQUET:field_id: '15'""" assert repr(actual) == expected @@ -890,22 +890,22 @@ def test_projection_add_column(file_int: str) -> None: list: list child 0, element: int32 -- field metadata -- - field_id: '21' + PARQUET:field_id: '21' map: map child 0, entries: struct not null child 0, key: int32 not null -- field metadata -- - field_id: '31' + PARQUET:field_id: '31' child 1, value: string -- field metadata -- - field_id: '32' + PARQUET:field_id: '32' location: struct child 0, lat: double -- field metadata -- - field_id: '41' + PARQUET:field_id: '41' child 1, lon: double -- field metadata -- - field_id: '42'""" + PARQUET:field_id: '42'""" ) @@ -955,10 +955,10 @@ def test_projection_add_column_struct(schema_int: Schema, file_int: str) -> None child 0, entries: struct not null child 0, key: int32 not null -- field metadata -- - field_id: '3' + PARQUET:field_id: '3' child 1, value: string -- field metadata -- - field_id: '4'""" + PARQUET:field_id: '4'""" ) @@ -1006,7 +1006,7 @@ def test_projection_filter(schema_int: Schema, file_int: str) -> None: repr(result_table.schema) == """id: int32 -- field metadata -- - field_id: '1'""" + PARQUET:field_id: '1'""" ) @@ -1184,10 +1184,10 @@ def test_projection_nested_struct_different_parent_id(file_struct: str) -> None: == """location: struct child 0, lat: double -- field metadata -- - field_id: '41' + PARQUET:field_id: '41' child 1, long: double -- field metadata -- - field_id: '42'""" + PARQUET:field_id: '42'""" ) diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index 5194d8660e..58a7818b48 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -25,7 +25,7 @@ _ConvertToIceberg, pyarrow_to_schema, schema_to_pyarrow, - visit_pyarrow, + visit_pyarrow, PYARROW_FIELD_ID_KEY, ) from pyiceberg.schema import Schema, visit from pyiceberg.types import ( @@ -209,9 +209,9 @@ def test_pyarrow_variable_binary_to_iceberg() -> None: def test_pyarrow_struct_to_iceberg() -> None: pyarrow_struct = pa.struct( [ - pa.field("foo", pa.string(), nullable=True, metadata={"field_id": "1", "doc": "foo doc"}), - pa.field("bar", pa.int32(), nullable=False, metadata={"field_id": "2"}), - pa.field("baz", pa.bool_(), nullable=True, metadata={"field_id": "3"}), + pa.field("foo", pa.string(), nullable=True, metadata={PYARROW_FIELD_ID_KEY: "1", "doc": "foo doc"}), + pa.field("bar", pa.int32(), nullable=False, metadata={PYARROW_FIELD_ID_KEY: "2"}), + pa.field("baz", pa.bool_(), nullable=True, metadata={PYARROW_FIELD_ID_KEY: "3"}), ] ) expected = StructType( @@ -223,7 +223,7 @@ def test_pyarrow_struct_to_iceberg() -> None: def test_pyarrow_list_to_iceberg() -> None: - pyarrow_list = pa.list_(pa.field("element", pa.int32(), nullable=False, metadata={"field_id": "1"})) + pyarrow_list = pa.list_(pa.field("element", pa.int32(), nullable=False, metadata={PYARROW_FIELD_ID_KEY: "1"})) expected = ListType( element_id=1, element_type=IntegerType(), @@ -234,8 +234,8 @@ def test_pyarrow_list_to_iceberg() -> None: def test_pyarrow_map_to_iceberg() -> None: pyarrow_map = pa.map_( - pa.field("key", pa.int32(), nullable=False, metadata={"field_id": "1"}), - pa.field("value", pa.string(), nullable=False, metadata={"field_id": "2"}), + pa.field("key", pa.int32(), nullable=False, metadata={PYARROW_FIELD_ID_KEY: "1"}), + pa.field("value", pa.string(), nullable=False, metadata={PYARROW_FIELD_ID_KEY: "2"}), ) expected = MapType( key_id=1, From 7c2bf72f45dcdf63c7e3228e33002976bc3103de Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 20 Dec 2023 20:38:56 +0100 Subject: [PATCH 2/5] Comments, thanks HonahX --- pyiceberg/io/pyarrow.py | 16 ++++++++++------ tests/io/test_pyarrow_visitor.py | 14 +++++++------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 550a6f4712..097ca145dd 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -155,7 +155,7 @@ BUFFER_SIZE = "buffer-size" ICEBERG_SCHEMA = b"iceberg.schema" # The PARQUET: in front means that it is Parquet specific, in this case the field_id -PYARROW_FIELD_ID_KEY = b"PARQUET:field_id" +PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id" PYARROW_FIELD_DOC_KEY = b"doc" T = TypeVar("T") @@ -455,7 +455,9 @@ def field(self, field: NestedField, field_result: pa.DataType) -> pa.Field: name=field.name, type=field_result, nullable=field.optional, - metadata={PYARROW_FIELD_DOC_KEY: field.doc, PYARROW_FIELD_ID_KEY: str(field.field_id)} if field.doc else {PYARROW_FIELD_ID_KEY: str(field.field_id)}, + metadata={PYARROW_FIELD_DOC_KEY: field.doc, PYARROW_PARQUET_FIELD_ID_KEY: str(field.field_id)} + if field.doc + else {PYARROW_PARQUET_FIELD_ID_KEY: str(field.field_id)}, ) def list(self, list_type: ListType, element_result: pa.DataType) -> pa.DataType: @@ -719,9 +721,11 @@ def primitive(self, primitive: pa.DataType) -> Optional[T]: def _get_field_id(field: pa.Field) -> Optional[int]: - return int(field_id_str.decode()) if (field_id_str := field.metadata.get(PYARROW_FIELD_ID_KEY)) else None - - + return ( + int(field_id_str.decode()) + if (field.metadata and (field_id_str := field.metadata.get(PYARROW_PARQUET_FIELD_ID_KEY))) + else None + ) class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): @@ -729,7 +733,7 @@ def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: List[ fields = [] for i, field in enumerate(arrow_fields): field_id = _get_field_id(field) - field_doc = doc_str.decode() if (doc_str := field.metadata.get(PYARROW_FIELD_DOC_KEY)) else None + field_doc = doc_str.decode() if (field.metadata and (doc_str := field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None field_type = field_results[i] if field_type is not None and field_id is not None: fields.append(NestedField(field_id, field.name, field_type, required=not field.nullable, doc=field_doc)) diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index 58a7818b48..f32f33efd2 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -25,7 +25,7 @@ _ConvertToIceberg, pyarrow_to_schema, schema_to_pyarrow, - visit_pyarrow, PYARROW_FIELD_ID_KEY, + visit_pyarrow, ) from pyiceberg.schema import Schema, visit from pyiceberg.types import ( @@ -209,9 +209,9 @@ def test_pyarrow_variable_binary_to_iceberg() -> None: def test_pyarrow_struct_to_iceberg() -> None: pyarrow_struct = pa.struct( [ - pa.field("foo", pa.string(), nullable=True, metadata={PYARROW_FIELD_ID_KEY: "1", "doc": "foo doc"}), - pa.field("bar", pa.int32(), nullable=False, metadata={PYARROW_FIELD_ID_KEY: "2"}), - pa.field("baz", pa.bool_(), nullable=True, metadata={PYARROW_FIELD_ID_KEY: "3"}), + pa.field("foo", pa.string(), nullable=True, metadata={b"PARQUET:field_id": "1", "doc": "foo doc"}), + pa.field("bar", pa.int32(), nullable=False, metadata={b"PARQUET:field_id": "2"}), + pa.field("baz", pa.bool_(), nullable=True, metadata={b"PARQUET:field_id": "3"}), ] ) expected = StructType( @@ -223,7 +223,7 @@ def test_pyarrow_struct_to_iceberg() -> None: def test_pyarrow_list_to_iceberg() -> None: - pyarrow_list = pa.list_(pa.field("element", pa.int32(), nullable=False, metadata={PYARROW_FIELD_ID_KEY: "1"})) + pyarrow_list = pa.list_(pa.field("element", pa.int32(), nullable=False, metadata={b"PARQUET:field_id": "1"})) expected = ListType( element_id=1, element_type=IntegerType(), @@ -234,8 +234,8 @@ def test_pyarrow_list_to_iceberg() -> None: def test_pyarrow_map_to_iceberg() -> None: pyarrow_map = pa.map_( - pa.field("key", pa.int32(), nullable=False, metadata={PYARROW_FIELD_ID_KEY: "1"}), - pa.field("value", pa.string(), nullable=False, metadata={PYARROW_FIELD_ID_KEY: "2"}), + pa.field("key", pa.int32(), nullable=False, metadata={b"PARQUET:field_id": "1"}), + pa.field("value", pa.string(), nullable=False, metadata={b"PARQUET:field_id": "2"}), ) expected = MapType( key_id=1, From 092cf080e68523065f1bd3030bcf9c2469f63f85 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Thu, 21 Dec 2023 13:00:33 +0100 Subject: [PATCH 3/5] Move from bytes to string --- tests/io/test_pyarrow_visitor.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index f32f33efd2..54285f08b7 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -209,9 +209,9 @@ def test_pyarrow_variable_binary_to_iceberg() -> None: def test_pyarrow_struct_to_iceberg() -> None: pyarrow_struct = pa.struct( [ - pa.field("foo", pa.string(), nullable=True, metadata={b"PARQUET:field_id": "1", "doc": "foo doc"}), - pa.field("bar", pa.int32(), nullable=False, metadata={b"PARQUET:field_id": "2"}), - pa.field("baz", pa.bool_(), nullable=True, metadata={b"PARQUET:field_id": "3"}), + pa.field("foo", pa.string(), nullable=True, metadata={"PARQUET:fieldid": "1", "doc": "foo doc"}), + pa.field("bar", pa.int32(), nullable=False, metadata={"PARQUET:fieldid": "2"}), + pa.field("baz", pa.bool_(), nullable=True, metadata={"PARQUET:fieldid": "3"}), ] ) expected = StructType( @@ -223,7 +223,7 @@ def test_pyarrow_struct_to_iceberg() -> None: def test_pyarrow_list_to_iceberg() -> None: - pyarrow_list = pa.list_(pa.field("element", pa.int32(), nullable=False, metadata={b"PARQUET:field_id": "1"})) + pyarrow_list = pa.list_(pa.field("element", pa.int32(), nullable=False, metadata={"PARQUET:fieldid": "1"})) expected = ListType( element_id=1, element_type=IntegerType(), @@ -234,8 +234,8 @@ def test_pyarrow_list_to_iceberg() -> None: def test_pyarrow_map_to_iceberg() -> None: pyarrow_map = pa.map_( - pa.field("key", pa.int32(), nullable=False, metadata={b"PARQUET:field_id": "1"}), - pa.field("value", pa.string(), nullable=False, metadata={b"PARQUET:field_id": "2"}), + pa.field("key", pa.int32(), nullable=False, metadata={"PARQUET:fieldid": "1"}), + pa.field("value", pa.string(), nullable=False, metadata={"PARQUET:fieldid": "2"}), ) expected = MapType( key_id=1, From bdf8d6482b90e6c0b5a186cf51057f4cb3b05428 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Thu, 21 Dec 2023 13:33:26 +0100 Subject: [PATCH 4/5] Cleanup --- tests/io/test_pyarrow_visitor.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index 54285f08b7..d32db812e8 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -209,9 +209,9 @@ def test_pyarrow_variable_binary_to_iceberg() -> None: def test_pyarrow_struct_to_iceberg() -> None: pyarrow_struct = pa.struct( [ - pa.field("foo", pa.string(), nullable=True, metadata={"PARQUET:fieldid": "1", "doc": "foo doc"}), - pa.field("bar", pa.int32(), nullable=False, metadata={"PARQUET:fieldid": "2"}), - pa.field("baz", pa.bool_(), nullable=True, metadata={"PARQUET:fieldid": "3"}), + pa.field("foo", pa.string(), nullable=True, metadata={"PARQUET:field_id": "1", "doc": "foo doc"}), + pa.field("bar", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "2"}), + pa.field("baz", pa.bool_(), nullable=True, metadata={"PARQUET:field_id": "3"}), ] ) expected = StructType( @@ -223,7 +223,7 @@ def test_pyarrow_struct_to_iceberg() -> None: def test_pyarrow_list_to_iceberg() -> None: - pyarrow_list = pa.list_(pa.field("element", pa.int32(), nullable=False, metadata={"PARQUET:fieldid": "1"})) + pyarrow_list = pa.list_(pa.field("element", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "1"})) expected = ListType( element_id=1, element_type=IntegerType(), @@ -234,8 +234,8 @@ def test_pyarrow_list_to_iceberg() -> None: def test_pyarrow_map_to_iceberg() -> None: pyarrow_map = pa.map_( - pa.field("key", pa.int32(), nullable=False, metadata={"PARQUET:fieldid": "1"}), - pa.field("value", pa.string(), nullable=False, metadata={"PARQUET:fieldid": "2"}), + pa.field("key", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "1"}), + pa.field("value", pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}), ) expected = MapType( key_id=1, From 40245ce9d01fed646bc2f4f2a5d400d55b4267b6 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 9 Jan 2024 07:17:30 -0800 Subject: [PATCH 5/5] Formatting --- tests/io/test_pyarrow_visitor.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index d32db812e8..c307440352 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -207,13 +207,11 @@ def test_pyarrow_variable_binary_to_iceberg() -> None: def test_pyarrow_struct_to_iceberg() -> None: - pyarrow_struct = pa.struct( - [ - pa.field("foo", pa.string(), nullable=True, metadata={"PARQUET:field_id": "1", "doc": "foo doc"}), - pa.field("bar", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "2"}), - pa.field("baz", pa.bool_(), nullable=True, metadata={"PARQUET:field_id": "3"}), - ] - ) + pyarrow_struct = pa.struct([ + pa.field("foo", pa.string(), nullable=True, metadata={"PARQUET:field_id": "1", "doc": "foo doc"}), + pa.field("bar", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "2"}), + pa.field("baz", pa.bool_(), nullable=True, metadata={"PARQUET:field_id": "3"}), + ]) expected = StructType( NestedField(field_id=1, name="foo", field_type=StringType(), required=False, doc="foo doc"), NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),