Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ github:
collaborators: # Note: the number of collaborators is limited to 10
- ajantha-bhat
- syun64
- kevinjqliu
ghp_branch: gh-pages
ghp_path: /

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/python-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
if: startsWith(matrix.os, 'ubuntu')

- name: Build wheels
uses: pypa/cibuildwheel@v2.17.0
uses: pypa/cibuildwheel@v2.18.1
with:
output-dir: wheelhouse
config-file: "pyproject.toml"
Expand Down
6 changes: 3 additions & 3 deletions mkdocs/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
# under the License.

mkdocs==1.6.0
griffe==0.44.0
griffe==0.45.0
jinja2==3.1.4
mkdocstrings==0.25.1
mkdocstrings-python==1.10.0
mkdocstrings-python==1.10.2
mkdocs-literate-nav==0.6.1
mkdocs-autorefs==1.0.1
mkdocs-gen-files==0.5.0
mkdocs-material==9.5.21
mkdocs-material==9.5.24
mkdocs-material-extensions==1.3.1
mkdocs-section-index==0.3.9
144 changes: 72 additions & 72 deletions poetry.lock

Large diffs are not rendered by default.

20 changes: 19 additions & 1 deletion pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@
cast,
)

from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, NotInstalledError, TableAlreadyExistsError
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NoSuchNamespaceError,
NoSuchTableError,
NotInstalledError,
TableAlreadyExistsError,
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.manifest import ManifestFile
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
Expand Down Expand Up @@ -477,6 +483,18 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper
NamespaceAlreadyExistsError: If a namespace with the given name already exists.
"""

def create_namespace_if_not_exists(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
"""Create a namespace if it does not exist.

Args:
namespace (str | Identifier): Namespace identifier.
properties (Properties): A string dictionary of properties for the given namespace.
"""
try:
self.create_namespace(namespace, properties)
except NamespaceAlreadyExistsError:
pass

@abstractmethod
def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
"""Drop a namespace.
Expand Down
61 changes: 59 additions & 2 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import getpass
import logging
import socket
import time
from types import TracebackType
Expand All @@ -33,6 +34,7 @@
from hive_metastore.ThriftHiveMetastore import Client
from hive_metastore.ttypes import (
AlreadyExistsException,
CheckLockRequest,
FieldSchema,
InvalidOperationException,
LockComponent,
Expand All @@ -49,6 +51,7 @@
)
from hive_metastore.ttypes import Database as HiveDatabase
from hive_metastore.ttypes import Table as HiveTable
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport

Expand All @@ -69,12 +72,20 @@
NoSuchNamespaceError,
NoSuchTableError,
TableAlreadyExistsError,
WaitingForLockException,
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, PropertyUtil, Table, TableProperties, update_table_metadata
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
PropertyUtil,
Table,
TableProperties,
update_table_metadata,
)
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -111,6 +122,15 @@
HIVE2_COMPATIBLE = "hive.hive2-compatible"
HIVE2_COMPATIBLE_DEFAULT = False

LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time"
LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time"
LOCK_CHECK_RETRIES = "lock-check-retries"
DEFAULT_LOCK_CHECK_MIN_WAIT_TIME = 0.1 # 100 milliseconds
DEFAULT_LOCK_CHECK_MAX_WAIT_TIME = 60 # 1 min
DEFAULT_LOCK_CHECK_RETRIES = 4

logger = logging.getLogger(__name__)


class _HiveClient:
"""Helper class to nicely open and close the transport."""
Expand Down Expand Up @@ -240,6 +260,18 @@ def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)
self._client = _HiveClient(properties["uri"], properties.get("ugi"))

self._lock_check_min_wait_time = PropertyUtil.property_as_float(
properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME
)
self._lock_check_max_wait_time = PropertyUtil.property_as_float(
properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME
)
self._lock_check_retries = PropertyUtil.property_as_float(
properties,
LOCK_CHECK_RETRIES,
DEFAULT_LOCK_CHECK_RETRIES,
)

def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
properties: Dict[str, str] = table.parameters
if TABLE_TYPE not in properties:
Expand Down Expand Up @@ -356,6 +388,26 @@ def _create_lock_request(self, database_name: str, table_name: str) -> LockReque

return lock_request

def _wait_for_lock(self, database_name: str, table_name: str, lockid: int, open_client: Client) -> LockResponse:
@retry(
retry=retry_if_exception_type(WaitingForLockException),
wait=wait_exponential(multiplier=2, min=self._lock_check_min_wait_time, max=self._lock_check_max_wait_time),
stop=stop_after_attempt(self._lock_check_retries),
reraise=True,
)
def _do_wait_for_lock() -> LockResponse:
response: LockResponse = open_client.check_lock(CheckLockRequest(lockid=lockid))
if response.state == LockState.ACQUIRED:
return response
elif response.state == LockState.WAITING:
msg = f"Wait on lock for {database_name}.{table_name}"
logger.warning(msg)
raise WaitingForLockException(msg)
else:
raise CommitFailedException(f"Failed to check lock for {database_name}.{table_name}, state: {response.state}")

return _do_wait_for_lock()

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.

Expand All @@ -380,7 +432,10 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons

try:
if lock.state != LockState.ACQUIRED:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
if lock.state == LockState.WAITING:
self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
else:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")

hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location)
Expand All @@ -406,6 +461,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
except NoSuchObjectException as e:
raise NoSuchTableError(f"Table does not exist: {table_name}") from e
except WaitingForLockException as e:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") from e
finally:
open_client.unlock(UnlockRequest(lockid=lock.lockid))

Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper
try:
response.raise_for_status()
except HTTPError as exc:
self._handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceAlreadyExistsError})
self._handle_non_200_response(exc, {409: NamespaceAlreadyExistsError})

@retry(**_RETRY_ARGS)
def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,7 @@ class CommitFailedException(Exception):

class CommitStateUnknownException(RESTError):
"""Commit failed due to unknown reason."""


class WaitingForLockException(Exception):
"""Need to wait for a lock, try again."""
10 changes: 10 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,16 @@ def property_as_int(properties: Dict[str, str], property_name: str, default: Opt
else:
return default

@staticmethod
def property_as_float(properties: Dict[str, str], property_name: str, default: Optional[float] = None) -> Optional[float]:
if value := properties.get(property_name):
try:
return float(value)
except ValueError as e:
raise ValueError(f"Could not parse table property {property_name} to a float: {value}") from e
else:
return default

@staticmethod
def property_as_bool(properties: Dict[str, str], property_name: str, default: bool) -> bool:
if value := properties.get(property_name):
Expand Down
Loading