From 945d61d3a9f5d6ace47d32731bef6182520f3e08 Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Wed, 23 Oct 2024 02:11:00 -0700 Subject: [PATCH 1/4] abort the whole transaction if any update on the chain has failed --- pyiceberg/table/__init__.py | 11 +++++++--- tests/integration/test_writes/test_writes.py | 23 ++++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 66b22a7a79..1b6cd4141d 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -23,6 +23,7 @@ from dataclasses import dataclass from functools import cached_property from itertools import chain +from types import TracebackType from typing import ( TYPE_CHECKING, Any, @@ -33,6 +34,7 @@ Optional, Set, Tuple, + Type, TypeVar, Union, ) @@ -231,9 +233,12 @@ def __enter__(self) -> Transaction: """Start a transaction to update the table.""" return self - def __exit__(self, _: Any, value: Any, traceback: Any) -> None: - """Close and commit the transaction.""" - self.commit_transaction() + def __exit__( + self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType] + ) -> None: + """Close and commit the transaction if no exceptions have been raised.""" + if exctype is None and excinst is None and exctb is None: + self.commit_transaction() def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...] = ()) -> Transaction: """Check if the requirements are met, and applies the updates to the metadata.""" diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index fc2746c614..c1024193d2 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -1448,3 +1448,26 @@ def test_rewrite_manifest_after_partition_evolution(session_catalog: Catalog) -> EqualTo("category", "A"), ), ) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_abort_table_transaction_on_exception( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.table_test_abort_table_transaction_on_exception" + tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) + + # Pre-populate some data + tbl.append(arrow_table_with_null) + assert len(tbl.scan().to_pandas()) == 3 + + # try to commit a transaction that raises exception at the middle + with pytest.raises(ValueError): + with tbl.transaction() as txn: + txn.append(arrow_table_with_null) + raise ValueError + txn.append(arrow_table_with_null) # type: ignore + + # Validate the transaction is aborted + assert len(tbl.scan().to_pandas()) == 3 # type: ignore From ab8a0a0e8348498c15c2ddb4d7bc9605277d10f6 Mon Sep 17 00:00:00 2001 From: stevie9868 <151791653+stevie9868@users.noreply.github.com> Date: Mon, 28 Oct 2024 16:38:59 -0700 Subject: [PATCH 2/4] Update tests/integration/test_writes/test_writes.py Co-authored-by: Kevin Liu --- tests/integration/test_writes/test_writes.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index c1024193d2..980185dcda 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -1460,7 +1460,8 @@ def test_abort_table_transaction_on_exception( # Pre-populate some data tbl.append(arrow_table_with_null) - assert len(tbl.scan().to_pandas()) == 3 + table_size = len(arrow_table_with_null) + assert len(tbl.scan().to_pandas()) == table_size # try to commit a transaction that raises exception at the middle with pytest.raises(ValueError): From af5d165a392d256a761a239bef2859f56241c2da Mon Sep 17 00:00:00 2001 From: stevie9868 <151791653+stevie9868@users.noreply.github.com> Date: Mon, 28 Oct 2024 16:39:08 -0700 Subject: [PATCH 3/4] Update tests/integration/test_writes/test_writes.py Co-authored-by: Kevin Liu --- tests/integration/test_writes/test_writes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 980185dcda..5a4b37583d 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -1470,5 +1470,5 @@ def test_abort_table_transaction_on_exception( raise ValueError txn.append(arrow_table_with_null) # type: ignore - # Validate the transaction is aborted - assert len(tbl.scan().to_pandas()) == 3 # type: ignore + # Validate the transaction is aborted and no partial update is applied + assert len(tbl.scan().to_pandas()) == table_size From fbb76042f38e0ab0c14b161f45e6151332d3f00e Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Mon, 28 Oct 2024 17:31:53 -0700 Subject: [PATCH 4/4] add type:ignore to prevent lint error --- tests/integration/test_writes/test_writes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 5a4b37583d..9cccb542d6 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -1471,4 +1471,4 @@ def test_abort_table_transaction_on_exception( txn.append(arrow_table_with_null) # type: ignore # Validate the transaction is aborted and no partial update is applied - assert len(tbl.scan().to_pandas()) == table_size + assert len(tbl.scan().to_pandas()) == table_size # type: ignore