diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 3f59a196ea..c7a19e67e4 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -18,7 +18,9 @@ from typing import ( TYPE_CHECKING, Any, + Callable, Dict, + Iterator, List, Optional, Set, @@ -27,7 +29,7 @@ ) from pydantic import Field, field_validator -from requests import HTTPError, Session +from requests import HTTPError, Response, Session from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt from pyiceberg import __version__ @@ -184,6 +186,7 @@ class ConfigResponse(IcebergBaseModel): class ListNamespaceResponse(IcebergBaseModel): namespaces: List[Identifier] = Field() + next_page_token: Optional[str] = Field(alias="next-page-token", default=None) class NamespaceResponse(IcebergBaseModel): @@ -209,10 +212,12 @@ class ListViewResponseEntry(IcebergBaseModel): class ListTablesResponse(IcebergBaseModel): identifiers: List[ListTableResponseEntry] = Field() + next_page_token: Optional[str] = Field(alias="next-page-token", default=None) class ListViewsResponse(IcebergBaseModel): identifiers: List[ListViewResponseEntry] = Field() + next_page_token: Optional[str] = Field(alias="next-page-token", default=None) class RestCatalog(Catalog): @@ -470,6 +475,26 @@ def _config_headers(self, session: Session) -> None: session.headers["User-Agent"] = f"PyIceberg/{__version__}" session.headers.setdefault("X-Iceberg-Access-Delegation", ACCESS_DELEGATION_DEFAULT) + @retry(**_RETRY_ARGS) + def _request_with_retries( + self, + method: Callable[..., Response], + url: str, + headers: Optional[Dict[str, Any]] = None, + json: Any = None, + params: Optional[Dict[str, Any]] = None, + data: Any = None, + non_200_response_mapping: Optional[dict[int, type[Exception]]] = None, + ) -> Response: + response = method(url, headers=headers, json=json, params=params, data=data) + try: + response.raise_for_status() + except HTTPError as exc: + if non_200_response_mapping is not None: + _handle_non_200_response(exc, non_200_response_mapping) + + return response + def _create_table( self, identifier: Union[str, Identifier], @@ -498,17 +523,14 @@ def _create_table( properties=properties, ) serialized_json = request.model_dump_json().encode(UTF8) - response = self._session.post( + response = self._request_with_retries( + self._session.post, self.url(Endpoints.create_table, namespace=namespace_and_table["namespace"]), data=serialized_json, + non_200_response_mapping={409: TableAlreadyExistsError}, ) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {409: TableAlreadyExistsError}) return TableResponse.model_validate_json(response.text) - @retry(**_RETRY_ARGS) def create_table( self, identifier: Union[str, Identifier], @@ -529,7 +551,6 @@ def create_table( ) return self._response_to_table(self.identifier_to_tuple(identifier), table_response) - @retry(**_RETRY_ARGS) def create_table_transaction( self, identifier: Union[str, Identifier], @@ -551,7 +572,6 @@ def create_table_transaction( staged_table = self._response_to_staged_table(self.identifier_to_tuple(identifier), table_response) return CreateTableTransaction(staged_table) - @retry(**_RETRY_ARGS) def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. @@ -571,30 +591,46 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: metadata_location=metadata_location, ) serialized_json = request.model_dump_json().encode(UTF8) - response = self._session.post( + response = self._request_with_retries( + self._session.post, self.url(Endpoints.register_table, namespace=namespace_and_table["namespace"]), data=serialized_json, + non_200_response_mapping={409: TableAlreadyExistsError}, ) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {409: TableAlreadyExistsError}) table_response = TableResponse.model_validate_json(response.text) return self._response_to_table(self.identifier_to_tuple(identifier), table_response) - @retry(**_RETRY_ARGS) def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: + return list(self.list_tables_lazy(namespace)) + + def list_tables_lazy(self, namespace: Union[str, Identifier], page_size: Optional[int] = 100) -> Iterator[Identifier]: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) - response = self._session.get(self.url(Endpoints.list_tables, namespace=namespace_concat)) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) - return [(*table.namespace, table.name) for table in ListTablesResponse.model_validate_json(response.text).identifiers] - @retry(**_RETRY_ARGS) + next_page_token: Optional[str] = None + + while True: + params: Dict[str, Any] = {} + if next_page_token is not None: + params["pageToken"] = next_page_token + if page_size is not None: + params["pageSize"] = page_size + + response = self._request_with_retries( + self._session.get, + self.url(Endpoints.list_tables, namespace=namespace_concat), + params=params, + non_200_response_mapping={404: NoSuchNamespaceError}, + ) + parsed = ListTablesResponse.model_validate_json(response.text) + for table in parsed.identifiers: + yield (*table.namespace, table.name) + + next_page_token = parsed.next_page_token + if next_page_token is None: + break + def load_table(self, identifier: Union[str, Identifier]) -> Table: params = {} if mode := self.properties.get(SNAPSHOT_LOADING_MODE): @@ -603,43 +639,38 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: else: raise ValueError("Invalid snapshot-loading-mode: {}") - response = self._session.get( - self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier)), params=params + response = self._request_with_retries( + self._session.get, + self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier)), + params=params, + non_200_response_mapping={404: NoSuchTableError}, ) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchTableError}) table_response = TableResponse.model_validate_json(response.text) return self._response_to_table(self.identifier_to_tuple(identifier), table_response) - @retry(**_RETRY_ARGS) def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None: - response = self._session.delete( + self._request_with_retries( + self._session.delete, self.url(Endpoints.drop_table, prefixed=True, **self._split_identifier_for_path(identifier)), params={"purgeRequested": purge_requested}, + non_200_response_mapping={404: NoSuchTableError}, ) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchTableError}) - @retry(**_RETRY_ARGS) def purge_table(self, identifier: Union[str, Identifier]) -> None: self.drop_table(identifier=identifier, purge_requested=True) - @retry(**_RETRY_ARGS) def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: payload = { "source": self._split_identifier_for_json(from_identifier), "destination": self._split_identifier_for_json(to_identifier), } - response = self._session.post(self.url(Endpoints.rename_table), json=payload) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchTableError, 409: TableAlreadyExistsError}) + self._request_with_retries( + self._session.post, + self.url(Endpoints.rename_table), + json=payload, + non_200_response_mapping={404: NoSuchTableError, 409: TableAlreadyExistsError}, + ) return self.load_table(to_identifier) @@ -654,18 +685,37 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm ) return table_request - @retry(**_RETRY_ARGS) def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: + return list(self.list_views_lazy(namespace)) + + def list_views_lazy(self, namespace: Union[str, Identifier], page_size: Optional[int] = 100) -> Iterator[Identifier]: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) - response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat)) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) - return [(*view.namespace, view.name) for view in ListViewsResponse.model_validate_json(response.text).identifiers] - @retry(**_RETRY_ARGS) + next_page_token: Optional[str] = None + + while True: + params: Dict[str, Any] = {} + if next_page_token is not None: + params["pageToken"] = next_page_token + if page_size is not None: + params["pageSize"] = page_size + + response = self._request_with_retries( + self._session.get, + self.url(Endpoints.list_views, namespace=namespace_concat), + params=params, + non_200_response_mapping={404: NoSuchNamespaceError}, + ) + + parsed = ListViewsResponse.model_validate_json(response.text) + for view in parsed.identifiers: + yield (*view.namespace, view.name) + + next_page_token = parsed.next_page_token + if next_page_token is None: + break + def commit_table( self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] ) -> CommitTableResponse: @@ -692,86 +742,98 @@ def commit_table( if table_token := table.config.get(TOKEN): headers[AUTHORIZATION_HEADER] = f"{BEARER_PREFIX} {table_token}" - response = self._session.post( + response = self._request_with_retries( + self._session.post, self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)), data=table_request.model_dump_json().encode(UTF8), headers=headers, + non_200_response_mapping={ + 409: CommitFailedException, + 500: CommitStateUnknownException, + 502: CommitStateUnknownException, + 504: CommitStateUnknownException, + }, ) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response( - exc, - { - 409: CommitFailedException, - 500: CommitStateUnknownException, - 502: CommitStateUnknownException, - 504: CommitStateUnknownException, - }, - ) return CommitTableResponse.model_validate_json(response.text) - @retry(**_RETRY_ARGS) def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: namespace_tuple = self._check_valid_namespace_identifier(namespace) payload = {"namespace": namespace_tuple, "properties": properties} - response = self._session.post(self.url(Endpoints.create_namespace), json=payload) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {409: NamespaceAlreadyExistsError}) + self._request_with_retries( + self._session.post, + self.url(Endpoints.create_namespace), + json=payload, + non_200_response_mapping={409: NamespaceAlreadyExistsError}, + ) - @retry(**_RETRY_ARGS) def drop_namespace(self, namespace: Union[str, Identifier]) -> None: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) - response = self._session.delete(self.url(Endpoints.drop_namespace, namespace=namespace)) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError}) + self._request_with_retries( + self._session.delete, + self.url(Endpoints.drop_namespace, namespace=namespace), + non_200_response_mapping={404: NoSuchNamespaceError, 409: NamespaceNotEmptyError}, + ) - @retry(**_RETRY_ARGS) def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: + return list(self.list_namespaces_lazy(namespace)) + + def list_namespaces_lazy( + self, namespace: Union[str, Identifier] = (), page_size: Optional[int] = 100 + ) -> Iterator[Identifier]: namespace_tuple = self.identifier_to_tuple(namespace) - response = self._session.get( - self.url( - f"{Endpoints.list_namespaces}?parent={NAMESPACE_SEPARATOR.join(namespace_tuple)}" - if namespace_tuple - else Endpoints.list_namespaces - ), - ) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) - return ListNamespaceResponse.model_validate_json(response.text).namespaces + next_page_token: Optional[str] = None + + while True: + params: Dict[str, Any] = {} + if next_page_token is not None: + params["pageToken"] = next_page_token + if page_size is not None: + params["pageSize"] = page_size + + response = self._request_with_retries( + self._session.get, + self.url( + f"{Endpoints.list_namespaces}?parent={NAMESPACE_SEPARATOR.join(namespace_tuple)}" + if namespace_tuple + else Endpoints.list_namespaces + ), + params=params, + non_200_response_mapping={404: NoSuchNamespaceError}, + ) + + parsed = ListNamespaceResponse.model_validate_json(response.text) + yield from parsed.namespaces + + next_page_token = parsed.next_page_token + if next_page_token is None: + break - @retry(**_RETRY_ARGS) def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) - response = self._session.get(self.url(Endpoints.load_namespace_metadata, namespace=namespace)) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + response = self._request_with_retries( + self._session.get, + self.url(Endpoints.load_namespace_metadata, namespace=namespace), + non_200_response_mapping={404: NoSuchNamespaceError}, + ) return NamespaceResponse.model_validate_json(response.text).properties - @retry(**_RETRY_ARGS) def update_namespace_properties( self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) payload = {"removals": list(removals or []), "updates": updates} - response = self._session.post(self.url(Endpoints.update_namespace_properties, namespace=namespace), json=payload) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchNamespaceError}) + response = self._request_with_retries( + self._session.post, + self.url(Endpoints.update_namespace_properties, namespace=namespace), + json=payload, + non_200_response_mapping={404: NoSuchNamespaceError}, + ) + parsed_response = UpdateNamespacePropertiesResponse.model_validate_json(response.text) return PropertiesUpdateSummary( removed=parsed_response.removed, @@ -779,11 +841,10 @@ def update_namespace_properties( missing=parsed_response.missing, ) - @retry(**_RETRY_ARGS) def namespace_exists(self, namespace: Union[str, Identifier]) -> bool: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) - response = self._session.head(self.url(Endpoints.namespace_exists, namespace=namespace)) + response = self._request_with_retries(self._session.head, self.url(Endpoints.namespace_exists, namespace=namespace)) if response.status_code == 404: return False @@ -797,7 +858,6 @@ def namespace_exists(self, namespace: Union[str, Identifier]) -> bool: return False - @retry(**_RETRY_ARGS) def table_exists(self, identifier: Union[str, Identifier]) -> bool: """Check if a table exists. @@ -823,7 +883,6 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool: return False - @retry(**_RETRY_ARGS) def view_exists(self, identifier: Union[str, Identifier]) -> bool: """Check if a view exists. @@ -833,7 +892,8 @@ def view_exists(self, identifier: Union[str, Identifier]) -> bool: Returns: bool: True if the view exists, False otherwise. """ - response = self._session.head( + response = self._request_with_retries( + self._session.head, self.url(Endpoints.view_exists, prefixed=True, **self._split_identifier_for_path(identifier, IdentifierKind.VIEW)), ) if response.status_code == 404: @@ -841,19 +901,15 @@ def view_exists(self, identifier: Union[str, Identifier]) -> bool: elif response.status_code in [200, 204]: return True - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {}) - return False - @retry(**_RETRY_ARGS) - def drop_view(self, identifier: Union[str]) -> None: - response = self._session.delete( - self.url(Endpoints.drop_view, prefixed=True, **self._split_identifier_for_path(identifier, IdentifierKind.VIEW)), + def drop_view(self, identifier: Union[str, Identifier]) -> None: + self._request_with_retries( + self._session.delete, + self.url( + Endpoints.drop_view, + prefixed=True, + **self._split_identifier_for_path(identifier, IdentifierKind.VIEW), + ), + non_200_response_mapping={404: NoSuchViewError}, ) - try: - response.raise_for_status() - except HTTPError as exc: - _handle_non_200_response(exc, {404: NoSuchViewError}) diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index ed91dd15a1..492680851b 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -412,6 +412,28 @@ def test_list_tables_200(rest_mock: Mocker) -> None: assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace) == [("examples", "fooshare")] +def test_list_tables_paginated_200(rest_mock: Mocker) -> None: + namespace = "examples" + rest_mock.get( + f"{TEST_URI}v1/namespaces/{namespace}/tables", + json={"identifiers": [{"namespace": ["examples"], "name": "fooshare"}], "next-page-token": "page2"}, + status_code=200, + request_headers=TEST_HEADERS, + ) + rest_mock.get( + f"{TEST_URI}v1/namespaces/{namespace}/tables?pageToken=page2", + json={"identifiers": [{"namespace": ["examples"], "name": "fooshare2"}]}, + status_code=200, + request_headers=TEST_HEADERS, + ) + + assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace) == [ + ("examples", "fooshare"), + ("examples", "fooshare2"), + ] + assert rest_mock.call_count == 3 + + def test_list_tables_200_sigv4(rest_mock: Mocker) -> None: namespace = "examples" rest_mock.get( @@ -458,6 +480,30 @@ def test_list_views_200(rest_mock: Mocker) -> None: assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) == [("examples", "fooshare")] +def test_list_views_paginated_200(rest_mock: Mocker) -> None: + namespace = "examples" + rest_mock.get( + f"{TEST_URI}v1/namespaces/{namespace}/views", + json={"identifiers": [{"namespace": ["examples"], "name": "fooshare"}], "next-page-token": "page2"}, + status_code=200, + request_headers=TEST_HEADERS, + ) + + rest_mock.get( + f"{TEST_URI}v1/namespaces/{namespace}/views?pageToken=page2", + json={"identifiers": [{"namespace": ["examples"], "name": "fooshare2"}]}, + status_code=200, + request_headers=TEST_HEADERS, + ) + + assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) == [ + ("examples", "fooshare"), + ("examples", "fooshare2"), + ] + + assert rest_mock.call_count == 3 + + def test_list_views_200_sigv4(rest_mock: Mocker) -> None: namespace = "examples" rest_mock.get( @@ -543,6 +589,33 @@ def test_list_namespaces_200(rest_mock: Mocker) -> None: ] +def test_list_namespaces_paginated_200(rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/namespaces", + json={"namespaces": [["default"], ["examples"], ["fokko"], ["system"]], "next-page-token": "page2"}, + status_code=200, + request_headers=TEST_HEADERS, + ) + rest_mock.get( + f"{TEST_URI}v1/namespaces?pageToken=page2", + json={"namespaces": [["default2"], ["examples2"], ["fokko2"], ["system2"]]}, + status_code=200, + request_headers=TEST_HEADERS, + ) + assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces() == [ + ("default",), + ("examples",), + ("fokko",), + ("system",), + ("default2",), + ("examples2",), + ("fokko2",), + ("system2",), + ] + + assert rest_mock.call_count == 3 + + def test_list_namespace_with_parent_200(rest_mock: Mocker) -> None: rest_mock.get( f"{TEST_URI}v1/namespaces?parent=accounting",