Skip to content

Commit 9fc37fe

Browse files
Fix setting V1 format version for Non-REST catalogs
1 parent a576fc9 commit 9fc37fe

File tree

6 files changed

+204
-4
lines changed

6 files changed

+204
-4
lines changed

pyiceberg/table/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ class TableProperties:
166166
METRICS_MODE_COLUMN_CONF_PREFIX = "write.metadata.metrics.column"
167167

168168
DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
169+
FORMAT_VERSION = "format-version"
170+
DEFAULT_FORMAT_VERSION = 2
169171

170172

171173
class PropertyUtil:

pyiceberg/table/metadata.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,10 @@ def set_v2_compatible_defaults(cls, data: Dict[str, Any]) -> Dict[str, Any]:
260260
The TableMetadata with the defaults applied.
261261
"""
262262
# When the schema doesn't have an ID
263-
if data.get("schema") and "schema_id" not in data["schema"]:
264-
data["schema"]["schema_id"] = DEFAULT_SCHEMA_ID
263+
schema = data.get("schema")
264+
if isinstance(schema, dict):
265+
if "schema_id" not in schema and "schema-id" not in schema:
266+
schema["schema_id"] = DEFAULT_SCHEMA_ID
265267

266268
return data
267269

@@ -335,7 +337,7 @@ def to_v2(self) -> TableMetadataV2:
335337
metadata["format-version"] = 2
336338
return TableMetadataV2.model_validate(metadata)
337339

338-
format_version: Literal[1] = Field(alias="format-version")
340+
format_version: Literal[1] = Field(alias="format-version", default=1)
339341
"""An integer version number for the format. Currently, this can be 1 or 2
340342
based on the spec. Implementations must throw an exception if a table’s
341343
version is higher than the supported version."""
@@ -404,13 +406,34 @@ def new_table_metadata(
404406
properties: Properties = EMPTY_DICT,
405407
table_uuid: Optional[uuid.UUID] = None,
406408
) -> TableMetadata:
409+
from pyiceberg.table import TableProperties
410+
407411
fresh_schema = assign_fresh_schema_ids(schema)
408412
fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, schema, fresh_schema)
409413
fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)
410414

411415
if table_uuid is None:
412416
table_uuid = uuid.uuid4()
413417

418+
# Remove format-version so it does not get persisted
419+
format_version = int(properties.pop(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION))
420+
if format_version == 1:
421+
return TableMetadataV1(
422+
location=location,
423+
schema_=fresh_schema,
424+
last_column_id=fresh_schema.highest_field_id,
425+
current_schema_id=fresh_schema.schema_id,
426+
schema=fresh_schema,
427+
partition_spec=[fresh_partition_spec.model_dump()],
428+
partition_specs=[fresh_partition_spec],
429+
default_spec_id=fresh_partition_spec.spec_id,
430+
sort_orders=[fresh_sort_order],
431+
default_sort_order_id=fresh_sort_order.order_id,
432+
properties=properties,
433+
last_partition_id=fresh_partition_spec.last_assigned_field_id,
434+
table_uuid=table_uuid,
435+
)
436+
414437
return TableMetadataV2(
415438
location=location,
416439
schemas=[fresh_schema],

tests/catalog/test_glue.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,38 @@ def test_create_table_with_database_location(
7272
assert storage_descriptor["Location"] == f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}"
7373

7474

75+
@mock_aws
76+
def test_create_v1_table(
77+
_bucket_initialize: None,
78+
_glue: boto3.client,
79+
moto_endpoint_url: str,
80+
table_schema_nested: Schema,
81+
database_name: str,
82+
table_name: str,
83+
) -> None:
84+
catalog_name = "glue"
85+
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url})
86+
test_catalog.create_namespace(namespace=database_name, properties={"location": f"s3://{BUCKET_NAME}/{database_name}.db"})
87+
table = test_catalog.create_table((database_name, table_name), table_schema_nested, properties={"format-version": "1"})
88+
assert table.format_version == 1
89+
90+
table_info = _glue.get_table(
91+
DatabaseName=database_name,
92+
Name=table_name,
93+
)
94+
95+
storage_descriptor = table_info["Table"]["StorageDescriptor"]
96+
columns = storage_descriptor["Columns"]
97+
assert len(columns) == len(table_schema_nested.fields)
98+
assert columns[0] == {
99+
"Name": "foo",
100+
"Type": "string",
101+
"Parameters": {"iceberg.field.id": "1", "iceberg.field.optional": "true", "iceberg.field.current": "true"},
102+
}
103+
104+
assert storage_descriptor["Location"] == f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}"
105+
106+
75107
@mock_aws
76108
def test_create_table_with_default_warehouse(
77109
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str

tests/catalog/test_hive.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
)
4444
from pyiceberg.partitioning import PartitionField, PartitionSpec
4545
from pyiceberg.schema import Schema
46-
from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV2
46+
from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV1, TableMetadataV2
4747
from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
4848
from pyiceberg.table.snapshots import (
4949
MetadataLogEntry,
@@ -295,6 +295,61 @@ def test_create_table(table_schema_simple: Schema, hive_database: HiveDatabase,
295295
assert metadata.model_dump() == expected.model_dump()
296296

297297

298+
@patch("time.time", MagicMock(return_value=12345))
299+
def test_create_v1_table(table_schema_simple: Schema, hive_database: HiveDatabase, hive_table: HiveTable) -> None:
300+
catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
301+
302+
catalog._client = MagicMock()
303+
catalog._client.__enter__().create_table.return_value = None
304+
catalog._client.__enter__().get_table.return_value = hive_table
305+
catalog._client.__enter__().get_database.return_value = hive_database
306+
catalog.create_table(
307+
("default", "table"), schema=table_schema_simple, properties={"owner": "javaberg", "format-version": "1"}
308+
)
309+
310+
# Test creating V1 table
311+
called_v1_table: HiveTable = catalog._client.__enter__().create_table.call_args[0][0]
312+
metadata_location = called_v1_table.parameters["metadata_location"]
313+
with open(metadata_location, encoding=UTF8) as f:
314+
payload = f.read()
315+
316+
expected_schema = Schema(
317+
NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
318+
NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
319+
NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
320+
schema_id=0,
321+
identifier_field_ids=[2],
322+
)
323+
actual_v1_metadata = TableMetadataUtil.parse_raw(payload)
324+
expected_spec = PartitionSpec()
325+
expected_spec_dump = expected_spec.model_dump()
326+
expected_spec_dump["fields"] = []
327+
expected_v1_metadata = TableMetadataV1(
328+
location=actual_v1_metadata.location,
329+
table_uuid=actual_v1_metadata.table_uuid,
330+
last_updated_ms=actual_v1_metadata.last_updated_ms,
331+
last_column_id=3,
332+
schema=expected_schema,
333+
schemas=[expected_schema],
334+
current_schema_id=0,
335+
last_partition_id=1000,
336+
properties={"owner": "javaberg", "write.parquet.compression-codec": "zstd"},
337+
partition_spec=[expected_spec_dump],
338+
partition_specs=[expected_spec],
339+
default_spec_id=0,
340+
current_snapshot_id=None,
341+
snapshots=[],
342+
snapshot_log=[],
343+
metadata_log=[],
344+
sort_orders=[SortOrder(order_id=0)],
345+
default_sort_order_id=0,
346+
refs={},
347+
format_version=1,
348+
)
349+
350+
assert actual_v1_metadata.model_dump() == expected_v1_metadata.model_dump()
351+
352+
298353
def test_load_table(hive_table: HiveTable) -> None:
299354
catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL)
300355

tests/catalog/test_sql.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
)
3939
from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL
4040
from pyiceberg.io.pyarrow import schema_to_pyarrow
41+
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
4142
from pyiceberg.schema import Schema
4243
from pyiceberg.table.snapshots import Operation
4344
from pyiceberg.table.sorting import (
@@ -158,6 +159,24 @@ def test_create_table_default_sort_order(catalog: SqlCatalog, table_schema_neste
158159
catalog.drop_table(random_identifier)
159160

160161

162+
@pytest.mark.parametrize(
163+
'catalog',
164+
[
165+
lazy_fixture('catalog_memory'),
166+
lazy_fixture('catalog_sqlite'),
167+
],
168+
)
169+
def test_create_v1_table(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None:
170+
database_name, _table_name = random_identifier
171+
catalog.create_namespace(database_name)
172+
table = catalog.create_table(random_identifier, table_schema_nested, properties={"format-version": "1"})
173+
assert table.sort_order().order_id == 0, "Order ID must match"
174+
assert table.sort_order().is_unsorted is True, "Order must be unsorted"
175+
assert table.format_version == 1
176+
assert table.spec() == UNPARTITIONED_PARTITION_SPEC
177+
catalog.drop_table(random_identifier)
178+
179+
161180
@pytest.mark.parametrize(
162181
'catalog',
163182
[

tests/table/test_metadata.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,75 @@ def test_migrate_v1_partition_specs(example_table_metadata_v1: Dict[str, Any]) -
199199
]
200200

201201

202+
def test_new_table_metadata_with_explicit_v1_format() -> None:
203+
schema = Schema(
204+
NestedField(field_id=10, name="foo", field_type=StringType(), required=False),
205+
NestedField(field_id=22, name="bar", field_type=IntegerType(), required=True),
206+
NestedField(field_id=33, name="baz", field_type=BooleanType(), required=False),
207+
schema_id=10,
208+
identifier_field_ids=[22],
209+
)
210+
211+
partition_spec = PartitionSpec(
212+
PartitionField(source_id=22, field_id=1022, transform=IdentityTransform(), name="bar"), spec_id=10
213+
)
214+
215+
sort_order = SortOrder(
216+
SortField(source_id=10, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_LAST),
217+
order_id=10,
218+
)
219+
220+
actual = new_table_metadata(
221+
schema=schema,
222+
partition_spec=partition_spec,
223+
sort_order=sort_order,
224+
location="s3://some_v1_location/",
225+
properties={'format-version': "1"},
226+
)
227+
228+
expected_schema = Schema(
229+
NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
230+
NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
231+
NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False),
232+
schema_id=0,
233+
identifier_field_ids=[2],
234+
)
235+
236+
expected_spec = PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="bar"))
237+
238+
expected = TableMetadataV1(
239+
location="s3://some_v1_location/",
240+
table_uuid=actual.table_uuid,
241+
last_updated_ms=actual.last_updated_ms,
242+
last_column_id=3,
243+
schemas=[expected_schema],
244+
schema_=expected_schema,
245+
current_schema_id=0,
246+
partition_spec=[expected_spec.model_dump()],
247+
partition_specs=[PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="bar"))],
248+
default_spec_id=0,
249+
last_partition_id=1000,
250+
properties={},
251+
current_snapshot_id=None,
252+
snapshots=[],
253+
snapshot_log=[],
254+
metadata_log=[],
255+
sort_orders=[
256+
SortOrder(
257+
SortField(
258+
source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_LAST
259+
),
260+
order_id=1,
261+
)
262+
],
263+
default_sort_order_id=1,
264+
refs={},
265+
format_version=1,
266+
)
267+
268+
assert actual.model_dump() == expected.model_dump()
269+
270+
202271
def test_invalid_format_version(example_table_metadata_v1: Dict[str, Any]) -> None:
203272
"""Test the exception when trying to load an unknown version"""
204273

0 commit comments

Comments
 (0)