From 5d3c5ec9f720c5661e4e0616187b7fefa805c510 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Thu, 11 Jan 2024 21:58:27 +0000 Subject: [PATCH 1/7] sql commit --- pyiceberg/catalog/sql.py | 39 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 77ece56163..10b9a83614 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -48,6 +48,7 @@ PropertiesUpdateSummary, ) from pyiceberg.exceptions import ( + CommitFailedException, NamespaceAlreadyExistsError, NamespaceNotEmptyError, NoSuchNamespaceError, @@ -59,7 +60,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table +from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT @@ -329,8 +330,42 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons Raises: NoSuchTableError: If a table with the given identifier does not exist. + CommitFailedException: If the commit failed. """ - raise NotImplementedError + from_identifier_tuple = tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) + current_table = self.load_table(from_identifier_tuple) + from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError) + base_metadata = current_table.metadata + for requirement in table_request.requirements: + requirement.validate(base_metadata) + + updated_metadata = update_table_metadata(base_metadata, table_request.updates) + if updated_metadata == base_metadata: + # no changes, do nothing + return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) + + # write new metadata + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 + new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) + self._write_metadata(updated_metadata, current_table.io, new_metadata_location) + + with Session(self.engine) as session: + stmt = ( + update(IcebergTables) + .where( + IcebergTables.catalog_name == self.name, + IcebergTables.table_namespace == from_database_name, + IcebergTables.table_name == from_table_name, + IcebergTables.metadata_location == current_table.metadata_location, + ) + .values(metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location) + ) + result = session.execute(stmt) + if result.rowcount < 1: + raise CommitFailedException("A concurrent commit has been made.") + session.commit() + + return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) def _namespace_exists(self, identifier: Union[str, Identifier]) -> bool: namespace = self.identifier_to_database(identifier) From 079a3dc741e54339e5b4c4619103fe22b6e2a899 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Fri, 12 Jan 2024 02:24:55 +0000 Subject: [PATCH 2/7] SqlCatalog _commit_table --- pyiceberg/catalog/sql.py | 12 ++++++++---- tests/catalog/test_sql.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 10b9a83614..d3526ec26a 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -332,9 +332,11 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons NoSuchTableError: If a table with the given identifier does not exist. CommitFailedException: If the commit failed. """ - from_identifier_tuple = tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) - current_table = self.load_table(from_identifier_tuple) - from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError) + identifier_tuple = self.identifier_to_tuple_without_catalog( + tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) + ) + current_table = self.load_table(identifier_tuple) + from_database_name, from_table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) base_metadata = current_table.metadata for requirement in table_request.requirements: requirement.validate(base_metadata) @@ -362,7 +364,9 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons ) result = session.execute(stmt) if result.rowcount < 1: - raise CommitFailedException("A concurrent commit has been made.") + raise CommitFailedException( + "Commit was unsuccessful as a conflicting concurrent commit was made to the database." + ) session.commit() return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index 95dc24ad15..c58e755a8c 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -42,6 +42,7 @@ SortOrder, ) from pyiceberg.transforms import IdentityTransform +from pyiceberg.types import IntegerType @pytest.fixture(name="warehouse", scope="session") @@ -664,3 +665,35 @@ def test_update_namespace_properties(catalog: SqlCatalog, database_name: str) -> else: assert k in update_report.removed assert "updated test description" == catalog.load_namespace_properties(database_name)["comment"] + + +@pytest.mark.parametrize( + 'catalog', + [ + lazy_fixture('catalog_memory'), + lazy_fixture('catalog_sqlite'), + ], +) +def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None: + database_name, _table_name = random_identifier + catalog.create_namespace(database_name) + table = catalog.create_table(random_identifier, table_schema_nested) + + assert catalog._parse_metadata_version(table.metadata_location) == 0 + assert table.metadata.current_schema_id == 0 + + transaction = table.transaction() + update = transaction.update_schema() + update.add_column(path="b", field_type=IntegerType()) + update.commit() + transaction.commit_transaction() + + updated_table_metadata = table.metadata + + assert catalog._parse_metadata_version(table.metadata_location) == 1 + assert updated_table_metadata.current_schema_id == 1 + assert len(updated_table_metadata.schemas) == 2 + new_schema = next(schema for schema in updated_table_metadata.schemas if schema.schema_id == 1) + assert new_schema + assert new_schema == update._apply() + assert new_schema.find_field("b").field_type == IntegerType() From e09ba7cc248f857ff4cb23cde71f5f0f393c68bb Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Fri, 12 Jan 2024 04:45:34 +0000 Subject: [PATCH 3/7] better variable names --- pyiceberg/catalog/sql.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index d3526ec26a..14ad97ed9f 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -336,7 +336,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) ) current_table = self.load_table(identifier_tuple) - from_database_name, from_table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) + database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) base_metadata = current_table.metadata for requirement in table_request.requirements: requirement.validate(base_metadata) @@ -356,8 +356,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons update(IcebergTables) .where( IcebergTables.catalog_name == self.name, - IcebergTables.table_namespace == from_database_name, - IcebergTables.table_name == from_table_name, + IcebergTables.table_namespace == database_name, + IcebergTables.table_name == table_name, IcebergTables.metadata_location == current_table.metadata_location, ) .values(metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location) From 6bf3a316ddaf7f9912f13c69d48875c62bc8a224 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Sat, 13 Jan 2024 07:05:18 +0000 Subject: [PATCH 4/7] fallback to FOR UPDATE commit when engine.dialect.supports_sane_rowcount is False --- pyiceberg/catalog/sql.py | 121 +++++++++++++++++++++++++++----------- tests/catalog/test_sql.py | 23 ++++++++ 2 files changed, 110 insertions(+), 34 deletions(-) diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 14ad97ed9f..7c19de83e0 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -31,7 +31,7 @@ union, update, ) -from sqlalchemy.exc import IntegrityError, OperationalError +from sqlalchemy.exc import IntegrityError, NoResultFound, OperationalError from sqlalchemy.orm import ( DeclarativeBase, Mapped, @@ -269,16 +269,32 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None: identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) with Session(self.engine) as session: - res = session.execute( - delete(IcebergTables).where( - IcebergTables.catalog_name == self.name, - IcebergTables.table_namespace == database_name, - IcebergTables.table_name == table_name, + if self.engine.dialect.supports_sane_rowcount: + res = session.execute( + delete(IcebergTables).where( + IcebergTables.catalog_name == self.name, + IcebergTables.table_namespace == database_name, + IcebergTables.table_name == table_name, + ) ) - ) + if res.rowcount < 1: + raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") + else: + try: + tbl = ( + session.query(IcebergTables) + .with_for_update(of=IcebergTables, nowait=True) + .filter( + IcebergTables.catalog_name == self.name, + IcebergTables.table_namespace == database_name, + IcebergTables.table_name == table_name, + ) + .one() + ) + session.delete(tbl) + except NoResultFound as e: + raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e session.commit() - if res.rowcount < 1: - raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: """Rename a fully classified table name. @@ -302,18 +318,35 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U raise NoSuchNamespaceError(f"Namespace does not exist: {to_database_name}") with Session(self.engine) as session: try: - stmt = ( - update(IcebergTables) - .where( - IcebergTables.catalog_name == self.name, - IcebergTables.table_namespace == from_database_name, - IcebergTables.table_name == from_table_name, + if self.engine.dialect.supports_sane_rowcount: + stmt = ( + update(IcebergTables) + .where( + IcebergTables.catalog_name == self.name, + IcebergTables.table_namespace == from_database_name, + IcebergTables.table_name == from_table_name, + ) + .values(table_namespace=to_database_name, table_name=to_table_name) ) - .values(table_namespace=to_database_name, table_name=to_table_name) - ) - result = session.execute(stmt) - if result.rowcount < 1: - raise NoSuchTableError(f"Table does not exist: {from_table_name}") + result = session.execute(stmt) + if result.rowcount < 1: + raise NoSuchTableError(f"Table does not exist: {from_table_name}") + else: + try: + tbl = ( + session.query(IcebergTables) + .with_for_update(of=IcebergTables, nowait=True) + .filter( + IcebergTables.catalog_name == self.name, + IcebergTables.table_namespace == from_database_name, + IcebergTables.table_name == from_table_name, + ) + .one() + ) + tbl.table_namespace = to_database_name + tbl.table_name = to_table_name + except NoResultFound as e: + raise NoSuchTableError(f"Table does not exist: {from_table_name}") from e session.commit() except IntegrityError as e: raise TableAlreadyExistsError(f"Table {to_database_name}.{to_table_name} already exists") from e @@ -352,21 +385,41 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons self._write_metadata(updated_metadata, current_table.io, new_metadata_location) with Session(self.engine) as session: - stmt = ( - update(IcebergTables) - .where( - IcebergTables.catalog_name == self.name, - IcebergTables.table_namespace == database_name, - IcebergTables.table_name == table_name, - IcebergTables.metadata_location == current_table.metadata_location, - ) - .values(metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location) - ) - result = session.execute(stmt) - if result.rowcount < 1: - raise CommitFailedException( - "Commit was unsuccessful as a conflicting concurrent commit was made to the database." + if self.engine.dialect.supports_sane_rowcount: + stmt = ( + update(IcebergTables) + .where( + IcebergTables.catalog_name == self.name, + IcebergTables.table_namespace == database_name, + IcebergTables.table_name == table_name, + IcebergTables.metadata_location == current_table.metadata_location, + ) + .values(metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location) ) + result = session.execute(stmt) + if result.rowcount < 1: + raise CommitFailedException( + "Commit was unsuccessful as a conflicting concurrent commit was made to the database." + ) + else: + try: + tbl = ( + session.query(IcebergTables) + .with_for_update(of=IcebergTables, nowait=True) + .filter( + IcebergTables.catalog_name == self.name, + IcebergTables.table_namespace == database_name, + IcebergTables.table_name == table_name, + IcebergTables.metadata_location == current_table.metadata_location, + ) + .one() + ) + tbl.metadata_location = new_metadata_location + tbl.previous_metadata_location = current_table.metadata_location + except NoResultFound as e: + raise CommitFailedException( + "Commit was unsuccessful as a conflicting concurrent commit was made to the database." + ) from e session.commit() return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index c58e755a8c..d10a719376 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -88,6 +88,20 @@ def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]: catalog.destroy_tables() +@pytest.fixture(scope="module") +def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, None, None]: + props = { + "uri": "sqlite:////tmp/sql-catalog.db", + "warehouse": f"file://{warehouse}", + } + catalog = SqlCatalog("test_sql_catalog", **props) + catalog.engine.dialect.supports_sane_rowcount = False + print(f"DEBUG: {catalog.engine.dialect.supports_sane_rowcount=}") + catalog.create_tables() + yield catalog + catalog.destroy_tables() + + def test_creation_with_no_uri() -> None: with pytest.raises(NoSuchPropertyException): SqlCatalog("test_ddb_catalog", not_uri="unused") @@ -306,6 +320,7 @@ def test_load_table_from_self_identifier(catalog: SqlCatalog, table_schema_neste [ lazy_fixture('catalog_memory'), lazy_fixture('catalog_sqlite'), + lazy_fixture('catalog_sqlite_without_rowcount'), ], ) def test_drop_table(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None: @@ -323,6 +338,7 @@ def test_drop_table(catalog: SqlCatalog, table_schema_nested: Schema, random_ide [ lazy_fixture('catalog_memory'), lazy_fixture('catalog_sqlite'), + lazy_fixture('catalog_sqlite_without_rowcount'), ], ) def test_drop_table_from_self_identifier(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None: @@ -342,6 +358,7 @@ def test_drop_table_from_self_identifier(catalog: SqlCatalog, table_schema_neste [ lazy_fixture('catalog_memory'), lazy_fixture('catalog_sqlite'), + lazy_fixture('catalog_sqlite_without_rowcount'), ], ) def test_drop_table_that_does_not_exist(catalog: SqlCatalog, random_identifier: Identifier) -> None: @@ -354,6 +371,7 @@ def test_drop_table_that_does_not_exist(catalog: SqlCatalog, random_identifier: [ lazy_fixture('catalog_memory'), lazy_fixture('catalog_sqlite'), + lazy_fixture('catalog_sqlite_without_rowcount'), ], ) def test_rename_table( @@ -378,6 +396,7 @@ def test_rename_table( [ lazy_fixture('catalog_memory'), lazy_fixture('catalog_sqlite'), + lazy_fixture('catalog_sqlite_without_rowcount'), ], ) def test_rename_table_from_self_identifier( @@ -404,6 +423,7 @@ def test_rename_table_from_self_identifier( [ lazy_fixture('catalog_memory'), lazy_fixture('catalog_sqlite'), + lazy_fixture('catalog_sqlite_without_rowcount'), ], ) def test_rename_table_to_existing_one( @@ -426,6 +446,7 @@ def test_rename_table_to_existing_one( [ lazy_fixture('catalog_memory'), lazy_fixture('catalog_sqlite'), + lazy_fixture('catalog_sqlite_without_rowcount'), ], ) def test_rename_missing_table(catalog: SqlCatalog, random_identifier: Identifier, another_random_identifier: Identifier) -> None: @@ -440,6 +461,7 @@ def test_rename_missing_table(catalog: SqlCatalog, random_identifier: Identifier [ lazy_fixture('catalog_memory'), lazy_fixture('catalog_sqlite'), + lazy_fixture('catalog_sqlite_without_rowcount'), ], ) def test_rename_table_to_missing_namespace( @@ -672,6 +694,7 @@ def test_update_namespace_properties(catalog: SqlCatalog, database_name: str) -> [ lazy_fixture('catalog_memory'), lazy_fixture('catalog_sqlite'), + lazy_fixture('catalog_sqlite_without_rowcount'), ], ) def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None: From d79884f12669221454e3caa746d6f7a14c1929af Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Sat, 13 Jan 2024 23:09:23 -0500 Subject: [PATCH 5/7] remove stray print --- tests/catalog/test_sql.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index d10a719376..8bf921aa4d 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -96,7 +96,6 @@ def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, No } catalog = SqlCatalog("test_sql_catalog", **props) catalog.engine.dialect.supports_sane_rowcount = False - print(f"DEBUG: {catalog.engine.dialect.supports_sane_rowcount=}") catalog.create_tables() yield catalog catalog.destroy_tables() From 6516b57f21f7392aa3222f95b3060ea54328e13d Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Sun, 14 Jan 2024 15:38:10 +0000 Subject: [PATCH 6/7] wait --- pyiceberg/catalog/sql.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 7c19de83e0..9f0241e6a0 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -283,7 +283,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None: try: tbl = ( session.query(IcebergTables) - .with_for_update(of=IcebergTables, nowait=True) + .with_for_update(of=IcebergTables) .filter( IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == database_name, @@ -335,7 +335,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U try: tbl = ( session.query(IcebergTables) - .with_for_update(of=IcebergTables, nowait=True) + .with_for_update(of=IcebergTables) .filter( IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == from_database_name, @@ -405,7 +405,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons try: tbl = ( session.query(IcebergTables) - .with_for_update(of=IcebergTables, nowait=True) + .with_for_update(of=IcebergTables) .filter( IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == database_name, From 90516cffdfcaafefb02b1ba34fc5736935d19046 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Tue, 16 Jan 2024 20:18:39 +0000 Subject: [PATCH 7/7] better logging --- pyiceberg/catalog/sql.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 9f0241e6a0..593c6b54a1 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -398,9 +398,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons ) result = session.execute(stmt) if result.rowcount < 1: - raise CommitFailedException( - "Commit was unsuccessful as a conflicting concurrent commit was made to the database." - ) + raise CommitFailedException(f"Table has been updated by another process: {database_name}.{table_name}") else: try: tbl = ( @@ -417,9 +415,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons tbl.metadata_location = new_metadata_location tbl.previous_metadata_location = current_table.metadata_location except NoResultFound as e: - raise CommitFailedException( - "Commit was unsuccessful as a conflicting concurrent commit was made to the database." - ) from e + raise CommitFailedException(f"Table has been updated by another process: {database_name}.{table_name}") from e session.commit() return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)