Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,21 @@ def drop_view(self, identifier: Union[str, Identifier]) -> None:
NoSuchViewError: If a view with the given name does not exist.
"""

@abstractmethod
def rename_view(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> None:
"""Rename a fully classified view name.

Args:
from_identifier (str | Identifier): Existing view identifier.
to_identifier (str | Identifier): New view identifier.

Returns:
Table: the updated table instance with its metadata.

Raises:
NoSuchViewError: If a table with the name does not exist.
"""

@staticmethod
def identifier_to_tuple(identifier: Union[str, Identifier]) -> Identifier:
"""Parse an identifier to a tuple.
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,9 @@ def drop_view(self, identifier: Union[str, Identifier]) -> None:
def view_exists(self, identifier: Union[str, Identifier]) -> bool:
raise NotImplementedError

def rename_view(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> None:
raise NotImplementedError

def _get_iceberg_table_item(self, database_name: str, table_name: str) -> Dict[str, Any]:
try:
return self._get_dynamo_item(identifier=f"{database_name}.{table_name}", namespace=database_name)
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,9 @@ def drop_view(self, identifier: Union[str, Identifier]) -> None:
def view_exists(self, identifier: Union[str, Identifier]) -> bool:
raise NotImplementedError

def rename_view(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> None:
raise NotImplementedError

@staticmethod
def __is_iceberg_table(table: "TableTypeDef") -> bool:
return table.get("Parameters", {}).get(TABLE_TYPE, "").lower() == ICEBERG
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,9 @@ def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
def view_exists(self, identifier: Union[str, Identifier]) -> bool:
raise NotImplementedError

def rename_view(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> None:
raise NotImplementedError

def _create_lock_request(self, database_name: str, table_name: str) -> LockRequest:
lock_component: LockComponent = LockComponent(
level=LockLevel.TABLE, type=LockType.EXCLUSIVE, dbname=database_name, tablename=table_name, isTransactional=True
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,6 @@ def view_exists(self, identifier: Union[str, Identifier]) -> bool:

def drop_view(self, identifier: Union[str, Identifier]) -> None:
raise NotImplementedError

def rename_view(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> None:
raise NotImplementedError
24 changes: 24 additions & 0 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
NoSuchViewError,
TableAlreadyExistsError,
UnauthorizedError,
ViewAlreadyExistsError,
)
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids
Expand Down Expand Up @@ -101,6 +102,7 @@ class Endpoints:
list_views: str = "namespaces/{namespace}/views"
drop_view: str = "namespaces/{namespace}/views/{view}"
view_exists: str = "namespaces/{namespace}/views/{view}"
rename_view: str = "views/rename"


class IdentifierKind(Enum):
Expand Down Expand Up @@ -877,6 +879,28 @@ def drop_view(self, identifier: Union[str]) -> None:
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchViewError})

@retry(**_RETRY_ARGS)
def rename_view(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> None:
payload = {
"source": self._split_identifier_for_json(from_identifier),
"destination": self._split_identifier_for_json(to_identifier),
}

# Ensure source and destination namespaces exist before rename.
source_namespace = self._split_identifier_for_json(from_identifier)["namespace"]
dest_namespace = self._split_identifier_for_path(to_identifier)["namespace"]

if not self.namespace_exists(source_namespace):
raise NoSuchNamespaceError(f"Source namespace does not exist: {source_namespace}")
if not self.namespace_exists(dest_namespace):
raise NoSuchNamespaceError(f"Destination namespace does not exist: {dest_namespace}")

response = self._session.post(self.url(Endpoints.rename_view), json=payload)
try:
response.raise_for_status()
except HTTPError as exc:
_handle_non_200_response(exc, {404: NoSuchViewError, 409: ViewAlreadyExistsError})

def close(self) -> None:
"""Close the catalog and release Session connection adapters.

Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,3 +744,6 @@ def close(self) -> None:
"""
if hasattr(self, "engine"):
self.engine.dispose()

def rename_view(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> None:
raise NotImplementedError
4 changes: 4 additions & 0 deletions pyiceberg/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ class NoSuchViewError(Exception):
"""Raises when the view can't be found in the REST catalog."""


class ViewAlreadyExistsError(Exception):
"""Raises when the view being created already exists in the REST catalog."""


class NoSuchIdentifierError(Exception):
"""Raises when the identifier can't be found in the REST catalog."""

Expand Down
120 changes: 120 additions & 0 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
OAuthError,
ServerError,
TableAlreadyExistsError,
ViewAlreadyExistsError,
)
from pyiceberg.io import load_file_io
from pyiceberg.partitioning import PartitionField, PartitionSpec
Expand Down Expand Up @@ -1918,3 +1919,122 @@ def test_rest_catalog_context_manager_with_exception_sigv4(self, rest_mock: Mock

assert catalog is not None and hasattr(catalog, "_session")
assert len(catalog._session.adapters) == self.EXPECTED_ADAPTERS_SIGV4


def test_rename_view_204(rest_mock: Mocker) -> None:
from_identifier = ("some_namespace", "old_view")
to_identifier = ("some_namespace", "new_view")
rest_mock.head(
f"{TEST_URI}v1/namespaces/some_namespace",
status_code=200,
request_headers=TEST_HEADERS,
)
rest_mock.post(
f"{TEST_URI}v1/views/rename",
json={
"source": {"namespace": ["some_namespace"], "name": "old_view"},
"destination": {"namespace": ["some_namespace"], "name": "new_view"},
},
status_code=204,
request_headers=TEST_HEADERS,
)
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
catalog.rename_view(from_identifier, to_identifier)
assert (
rest_mock.last_request.text
== '''{"source": {"namespace": ["some_namespace"], "name": "old_view"}, "destination": {"namespace": ["some_namespace"], "name": "new_view"}}'''
)


def test_rename_view_404(rest_mock: Mocker) -> None:
from_identifier = ("some_namespace", "non_existent_view")
to_identifier = ("some_namespace", "new_view")
rest_mock.head(
f"{TEST_URI}v1/namespaces/some_namespace",
status_code=200,
request_headers=TEST_HEADERS,
)
rest_mock.post(
f"{TEST_URI}v1/views/rename",
json={
"error": {
"message": "View does not exist: some_namespace.non_existent_view",
"type": "NoSuchViewException",
"code": 404,
}
},
status_code=404,
request_headers=TEST_HEADERS,
)
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
with pytest.raises(NoSuchViewError) as exc_info:
catalog.rename_view(from_identifier, to_identifier)
assert "View does not exist: some_namespace.non_existent_view" in str(exc_info.value)


def test_rename_view_409(rest_mock: Mocker) -> None:
from_identifier = ("some_namespace", "old_view")
to_identifier = ("some_namespace", "existing_view")
rest_mock.head(
f"{TEST_URI}v1/namespaces/some_namespace",
status_code=200,
request_headers=TEST_HEADERS,
)
rest_mock.post(
f"{TEST_URI}v1/views/rename",
json={
"error": {
"message": "View already exists: some_namespace.existing_view",
"type": "ViewAlreadyExistsException",
"code": 409,
}
},
status_code=409,
request_headers=TEST_HEADERS,
)
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
with pytest.raises(ViewAlreadyExistsError) as exc_info:
catalog.rename_view(from_identifier, to_identifier)
assert "View already exists: some_namespace.existing_view" in str(exc_info.value)


def test_rename_view_source_namespace_does_not_exist(rest_mock: Mocker) -> None:
from_identifier = ("non_existent_namespace", "old_view")
to_identifier = ("some_namespace", "new_view")

rest_mock.head(
f"{TEST_URI}v1/namespaces/non_existent_namespace",
status_code=404,
request_headers=TEST_HEADERS,
)
rest_mock.head(
f"{TEST_URI}v1/namespaces/some_namespace",
status_code=200,
request_headers=TEST_HEADERS,
)

catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
with pytest.raises(NoSuchNamespaceError) as exc_info:
catalog.rename_view(from_identifier, to_identifier)
assert "Source namespace does not exist: ('non_existent_namespace',)" in str(exc_info.value)


def test_rename_view_destination_namespace_does_not_exist(rest_mock: Mocker) -> None:
from_identifier = ("some_namespace", "old_view")
to_identifier = ("non_existent_namespace", "new_view")

rest_mock.head(
f"{TEST_URI}v1/namespaces/some_namespace",
status_code=200,
request_headers=TEST_HEADERS,
)
rest_mock.head(
f"{TEST_URI}v1/namespaces/non_existent_namespace",
status_code=404,
request_headers=TEST_HEADERS,
)

catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
with pytest.raises(NoSuchNamespaceError) as exc_info:
catalog.rename_view(from_identifier, to_identifier)
assert "Destination namespace does not exist: non_existent_namespace" in str(exc_info.value)
Loading