Skip to content

Commit 4eae64a

Browse files
author
Yingjian Wu
committed
abort the whole transaction if any update on the chain has failed
1 parent ff3a249 commit 4eae64a

File tree

2 files changed

+74
-5
lines changed

2 files changed

+74
-5
lines changed

pyiceberg/table/__init__.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from dataclasses import dataclass
2424
from functools import cached_property
2525
from itertools import chain
26+
from types import TracebackType
2627
from typing import (
2728
TYPE_CHECKING,
2829
Any,
@@ -33,6 +34,7 @@
3334
Optional,
3435
Set,
3536
Tuple,
37+
Type,
3638
TypeVar,
3739
Union,
3840
)
@@ -231,9 +233,13 @@ def __enter__(self) -> Transaction:
231233
"""Start a transaction to update the table."""
232234
return self
233235

234-
def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
235-
"""Close and commit the transaction."""
236-
self.commit_transaction()
236+
def __exit__(
237+
self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
238+
) -> None:
239+
"""Close and commit the transaction, or handle exceptions."""
240+
# Only commit the full transaction, if there is no exception in all updates on the chain
241+
if exctb is None:
242+
self.commit_transaction()
237243

238244
def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...] = ()) -> Transaction:
239245
"""Check if the requirements are met, and applies the updates to the metadata."""

tests/catalog/test_base.py

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,17 @@
4141
NoSuchTableError,
4242
TableAlreadyExistsError,
4343
)
44-
from pyiceberg.io import WAREHOUSE, load_file_io
44+
from pyiceberg.expressions import BooleanExpression
45+
from pyiceberg.io import WAREHOUSE, FileIO, load_file_io
4546
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
4647
from pyiceberg.schema import Schema
4748
from pyiceberg.table import (
49+
ALWAYS_TRUE,
4850
CommitTableResponse,
4951
Table,
52+
Transaction,
5053
)
51-
from pyiceberg.table.metadata import new_table_metadata
54+
from pyiceberg.table.metadata import TableMetadata, new_table_metadata
5255
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
5356
from pyiceberg.table.update import (
5457
AddSchemaUpdate,
@@ -265,6 +268,42 @@ def drop_view(self, identifier: Union[str, Identifier]) -> None:
265268
raise NotImplementedError
266269

267270

271+
class TransactionThrowExceptionInOverwrite(Transaction):
272+
def __init__(self, table: Table):
273+
super().__init__(table)
274+
275+
# Override the default overwrite to simulate exception during append
276+
def overwrite(
277+
self,
278+
df: pa.Table,
279+
overwrite_filter: Union[BooleanExpression, str] = ALWAYS_TRUE,
280+
snapshot_properties: Dict[str, str] = EMPTY_DICT,
281+
) -> None:
282+
self.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)
283+
raise Exception("Fail Append Commit Exception")
284+
285+
286+
class TableThrowExceptionInOverwrite(Table):
287+
def __init__(self, identifier: Identifier, metadata: TableMetadata, metadata_location: str, io: FileIO, catalog: Catalog):
288+
# Call the constructor of the parent class
289+
super().__init__(identifier, metadata, metadata_location, io, catalog)
290+
291+
def transaction(self) -> Transaction:
292+
return TransactionThrowExceptionInOverwrite(self)
293+
294+
295+
def given_catalog_has_a_table_throw_exception_in_overwrite(
296+
catalog: InMemoryCatalog, properties: Properties = EMPTY_DICT
297+
) -> TableThrowExceptionInOverwrite:
298+
table = catalog.create_table(
299+
identifier=TEST_TABLE_IDENTIFIER,
300+
schema=TEST_TABLE_SCHEMA,
301+
partition_spec=TEST_TABLE_PARTITION_SPEC,
302+
properties=properties or TEST_TABLE_PROPERTIES,
303+
)
304+
return TableThrowExceptionInOverwrite(table.identifier, table.metadata, table.metadata_location, table.io, table.catalog)
305+
306+
268307
@pytest.fixture
269308
def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
270309
return InMemoryCatalog("test.in_memory.catalog", **{WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"})
@@ -766,3 +805,27 @@ def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None
766805
with pytest.raises(ValidationError) as exc_info:
767806
_ = given_catalog_has_a_table(catalog, properties=property_with_none)
768807
assert "None type is not a supported value in properties: property_name" in str(exc_info.value)
808+
809+
810+
def test_table_overwrite_with_exception(catalog: InMemoryCatalog) -> None:
811+
given_table = given_catalog_has_a_table_throw_exception_in_overwrite(catalog)
812+
# Populate some initial data
813+
data = pa.Table.from_pylist(
814+
[{"x": 1, "y": 2, "z": 3}, {"x": 4, "y": 5, "z": 6}],
815+
schema=TEST_TABLE_SCHEMA.as_arrow(),
816+
)
817+
given_table.append(data)
818+
819+
# Data to overwrite
820+
data = pa.Table.from_pylist(
821+
[{"x": 7, "y": 8, "z": 9}],
822+
schema=TEST_TABLE_SCHEMA.as_arrow(),
823+
)
824+
825+
# Since overwrite has an exception, we should fail the whole overwrite transaction
826+
try:
827+
given_table.overwrite(data)
828+
except Exception as e:
829+
assert str(e) == "Fail Append Commit Exception", f"Expected 'Fail Append Commit Exception', but got '{str(e)}'"
830+
831+
assert len(given_table.scan().to_arrow()) == 2

0 commit comments

Comments
 (0)