Skip to content

Commit 50a1a69

Browse files
authored
Add logic for table format-version updates (#55)
* Add logic for table format-version updates Add a few more tests * Set -> Upgrade
1 parent bd4485d commit 50a1a69

File tree

4 files changed

+67
-3
lines changed

4 files changed

+67
-3
lines changed

dev/provision.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,3 +279,25 @@
279279
(CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS timestamp), 12, 'l');
280280
"""
281281
)
282+
283+
# There is an issue with CREATE OR REPLACE
284+
# https://github.com/apache/iceberg/issues/8756
285+
spark.sql(
286+
"""
287+
DROP TABLE IF EXISTS default.test_table_version
288+
"""
289+
)
290+
291+
spark.sql(
292+
"""
293+
CREATE TABLE default.test_table_version (
294+
dt date,
295+
number integer,
296+
letter string
297+
)
298+
USING iceberg
299+
TBLPROPERTIES (
300+
'format-version'='1'
301+
);
302+
"""
303+
)

dev/spark-defaults.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ spark.sql.catalog.demo org.apache.iceberg.spark.SparkCatalog
2020
spark.sql.catalog.demo.type rest
2121
spark.sql.catalog.demo.uri http://rest:8181
2222
spark.sql.catalog.demo.io-impl org.apache.iceberg.aws.s3.S3FileIO
23-
spark.sql.catalog.demo.warehouse s3a://warehouse/wh/
23+
spark.sql.catalog.demo.warehouse s3://warehouse/wh/
2424
spark.sql.catalog.demo.s3.endpoint http://minio:9000
2525
spark.sql.defaultCatalog demo
2626
spark.eventLog.enabled true

pyiceberg/table/__init__.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ def _append_requirements(self, *new_requirements: TableRequirement) -> Transacti
166166
self._requirements = self._requirements + new_requirements
167167
return self
168168

169-
def set_table_version(self, format_version: Literal[1, 2]) -> Transaction:
169+
def upgrade_table_version(self, format_version: Literal[1, 2]) -> Transaction:
170170
"""Set the table to a certain version.
171171
172172
Args:
@@ -175,7 +175,15 @@ def set_table_version(self, format_version: Literal[1, 2]) -> Transaction:
175175
Returns:
176176
The alter table builder.
177177
"""
178-
raise NotImplementedError("Not yet implemented")
178+
if format_version not in {1, 2}:
179+
raise ValueError(f"Unsupported table format version: {format_version}")
180+
181+
if format_version < self._table.metadata.format_version:
182+
raise ValueError(f"Cannot downgrade v{self._table.metadata.format_version} table to v{format_version}")
183+
if format_version > self._table.metadata.format_version:
184+
return self._append_updates(UpgradeFormatVersionUpdate(format_version=format_version))
185+
else:
186+
return self
179187

180188
def set_properties(self, **updates: str) -> Transaction:
181189
"""Set properties.
@@ -482,6 +490,10 @@ def scan(
482490
limit=limit,
483491
)
484492

493+
@property
494+
def format_version(self) -> Literal[1, 2]:
495+
return self.metadata.format_version
496+
485497
def schema(self) -> Schema:
486498
"""Return the schema for this table."""
487499
return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id)

tests/test_integration.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ def table_test_all_types(catalog: Catalog) -> Table:
8383
return catalog.load_table("default.test_all_types")
8484

8585

86+
@pytest.fixture()
87+
def table_test_table_version(catalog: Catalog) -> Table:
88+
return catalog.load_table("default.test_table_version")
89+
90+
8691
TABLE_NAME = ("default", "t1")
8792

8893

@@ -366,3 +371,28 @@ def test_scan_tag(test_positional_mor_deletes: Table) -> None:
366371
def test_scan_branch(test_positional_mor_deletes: Table) -> None:
367372
arrow_table = test_positional_mor_deletes.scan().use_ref("without_5").to_arrow()
368373
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12]
374+
375+
376+
@pytest.mark.integration
377+
def test_upgrade_table_version(table_test_table_version: Table) -> None:
378+
assert table_test_table_version.format_version == 1
379+
380+
with table_test_table_version.transaction() as transaction:
381+
transaction.upgrade_table_version(format_version=1)
382+
383+
assert table_test_table_version.format_version == 1
384+
385+
with table_test_table_version.transaction() as transaction:
386+
transaction.upgrade_table_version(format_version=2)
387+
388+
assert table_test_table_version.format_version == 2
389+
390+
with pytest.raises(ValueError) as e: # type: ignore
391+
with table_test_table_version.transaction() as transaction:
392+
transaction.upgrade_table_version(format_version=1)
393+
assert "Cannot downgrade v2 table to v1" in str(e.value)
394+
395+
with pytest.raises(ValueError) as e:
396+
with table_test_table_version.transaction() as transaction:
397+
transaction.upgrade_table_version(format_version=3)
398+
assert "Unsupported table format version: 3" in str(e.value)

0 commit comments

Comments
 (0)