Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 59 additions & 2 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import getpass
import logging
import socket
import time
from types import TracebackType
Expand All @@ -33,6 +34,7 @@
from hive_metastore.ThriftHiveMetastore import Client
from hive_metastore.ttypes import (
AlreadyExistsException,
CheckLockRequest,
FieldSchema,
InvalidOperationException,
LockComponent,
Expand All @@ -49,6 +51,7 @@
)
from hive_metastore.ttypes import Database as HiveDatabase
from hive_metastore.ttypes import Table as HiveTable
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport

Expand All @@ -69,12 +72,20 @@
NoSuchNamespaceError,
NoSuchTableError,
TableAlreadyExistsError,
WaitingForLockException,
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, PropertyUtil, Table, TableProperties, update_table_metadata
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
PropertyUtil,
Table,
TableProperties,
update_table_metadata,
)
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -111,6 +122,15 @@
HIVE2_COMPATIBLE = "hive.hive2-compatible"
HIVE2_COMPATIBLE_DEFAULT = False

# Catalog property keys that tune how the commit path polls the Hive
# metastore while waiting for a table lock to be granted (see _wait_for_lock).
LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time"
LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time"
LOCK_CHECK_RETRIES = "lock-check-retries"
DEFAULT_LOCK_CHECK_MIN_WAIT_TIME = 0.1  # 100 milliseconds between the first polls
DEFAULT_LOCK_CHECK_MAX_WAIT_TIME = 60  # cap the exponential backoff at 1 min
DEFAULT_LOCK_CHECK_RETRIES = 4

logger = logging.getLogger(__name__)


class _HiveClient:
"""Helper class to nicely open and close the transport."""
Expand Down Expand Up @@ -240,6 +260,18 @@ def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)
self._client = _HiveClient(properties["uri"], properties.get("ugi"))

self._lock_check_min_wait_time = PropertyUtil.property_as_float(
properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME
)
self._lock_check_max_wait_time = PropertyUtil.property_as_float(
properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME
)
self._lock_check_retries = PropertyUtil.property_as_float(
properties,
LOCK_CHECK_RETRIES,
DEFAULT_LOCK_CHECK_RETRIES,
)

def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
properties: Dict[str, str] = table.parameters
if TABLE_TYPE not in properties:
Expand Down Expand Up @@ -356,6 +388,26 @@ def _create_lock_request(self, database_name: str, table_name: str) -> LockReque

return lock_request

def _wait_for_lock(self, database_name: str, table_name: str, lockid: int, open_client: Client) -> LockResponse:
    """Poll the Hive metastore until the lock identified by ``lockid`` is acquired.

    Uses tenacity to re-check the lock with exponential backoff, bounded by
    the ``lock-check-min-wait-time`` / ``lock-check-max-wait-time`` catalog
    properties, for at most ``lock-check-retries`` attempts.

    Args:
        database_name: Database of the locked table (used in messages only).
        table_name: Name of the locked table (used in messages only).
        lockid: Lock id returned by the initial ``lock`` request.
        open_client: An already-open Thrift metastore client.

    Returns:
        The LockResponse whose state is ``ACQUIRED``.

    Raises:
        WaitingForLockException: If the lock is still ``WAITING`` after all
            attempts; ``reraise=True`` makes tenacity re-raise the last
            exception instead of wrapping it in RetryError.
        CommitFailedException: If the lock reaches any state other than
            ``ACQUIRED`` or ``WAITING`` (e.g. aborted/not acquired).
    """
    @retry(
        # Only WAITING is retryable; any other failure propagates immediately.
        retry=retry_if_exception_type(WaitingForLockException),
        wait=wait_exponential(multiplier=2, min=self._lock_check_min_wait_time, max=self._lock_check_max_wait_time),
        stop=stop_after_attempt(self._lock_check_retries),
        reraise=True,
    )
    def _do_wait_for_lock() -> LockResponse:
        response: LockResponse = open_client.check_lock(CheckLockRequest(lockid=lockid))
        if response.state == LockState.ACQUIRED:
            return response
        elif response.state == LockState.WAITING:
            msg = f"Wait on lock for {database_name}.{table_name}"
            logger.warning(msg)
            raise WaitingForLockException(msg)
        else:
            raise CommitFailedException(f"Failed to check lock for {database_name}.{table_name}, state: {response.state}")

    return _do_wait_for_lock()

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.

Expand All @@ -380,7 +432,10 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons

try:
if lock.state != LockState.ACQUIRED:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
if lock.state == LockState.WAITING:
self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
else:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")

hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location)
Expand All @@ -406,6 +461,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
except NoSuchObjectException as e:
raise NoSuchTableError(f"Table does not exist: {table_name}") from e
except WaitingForLockException as e:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") from e
finally:
open_client.unlock(UnlockRequest(lockid=lock.lockid))

Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,7 @@ class CommitFailedException(Exception):

class CommitStateUnknownException(RESTError):
"""Commit failed due to unknown reason."""


class WaitingForLockException(Exception):
    """Raised while a metastore lock is still pending; callers should retry."""
10 changes: 10 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,16 @@ def property_as_int(properties: Dict[str, str], property_name: str, default: Opt
else:
return default

@staticmethod
def property_as_float(properties: Dict[str, str], property_name: str, default: Optional[float] = None) -> Optional[float]:
if value := properties.get(property_name):
try:
return float(value)
except ValueError as e:
raise ValueError(f"Could not parse table property {property_name} to a float: {value}") from e
else:
return default

@staticmethod
def property_as_bool(properties: Dict[str, str], property_name: str, default: bool) -> bool:
if value := properties.get(property_name):
Expand Down
39 changes: 38 additions & 1 deletion tests/catalog/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
AlreadyExistsException,
FieldSchema,
InvalidOperationException,
LockResponse,
LockState,
MetaException,
NoSuchObjectException,
SerDeInfo,
Expand All @@ -34,12 +36,19 @@
from hive_metastore.ttypes import Table as HiveTable

from pyiceberg.catalog import PropertiesUpdateSummary
from pyiceberg.catalog.hive import HiveCatalog, _construct_hive_storage_descriptor
from pyiceberg.catalog.hive import (
LOCK_CHECK_MAX_WAIT_TIME,
LOCK_CHECK_MIN_WAIT_TIME,
LOCK_CHECK_RETRIES,
HiveCatalog,
_construct_hive_storage_descriptor,
)
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
NoSuchTableError,
WaitingForLockException,
)
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
Expand Down Expand Up @@ -1158,3 +1167,31 @@ def test_resolve_table_location_warehouse(hive_database: HiveDatabase) -> None:

location = catalog._resolve_table_location(None, "database", "table")
assert location == "/tmp/warehouse/database.db/table"


def test_hive_wait_for_lock() -> None:
    """_wait_for_lock polls check_lock until ACQUIRED, and raises
    WaitingForLockException once the configured retries are exhausted.
    """
    lockid = 12345
    acquired = LockResponse(lockid=lockid, state=LockState.ACQUIRED)
    waiting = LockResponse(lockid=lockid, state=LockState.WAITING)
    # Catalog properties are strings in real usage; using string values here
    # also exercises PropertyUtil's float parsing (and drops the type: ignore).
    prop = {
        "uri": HIVE_METASTORE_FAKE_URL,
        LOCK_CHECK_MIN_WAIT_TIME: "0.1",
        LOCK_CHECK_MAX_WAIT_TIME: "0.5",
        LOCK_CHECK_RETRIES: "5",
    }
    catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
    catalog._client = MagicMock()
    catalog._client.lock.return_value = LockResponse(lockid=lockid, state=LockState.WAITING)

    # lock will be acquired on the third poll
    catalog._client.check_lock.side_effect = [waiting, waiting, acquired]
    response: LockResponse = catalog._wait_for_lock("db", "tbl", lockid, catalog._client)
    assert response.state == LockState.ACQUIRED
    assert catalog._client.check_lock.call_count == 3

    # lock never leaves WAITING: after all retries the last
    # WaitingForLockException is re-raised (reraise=True on the retry policy)
    catalog._client.check_lock.reset_mock()  # clears call_count; side_effect is replaced below
    catalog._client.check_lock.side_effect = [waiting] * 10
    with pytest.raises(WaitingForLockException):
        catalog._wait_for_lock("db", "tbl", lockid, catalog._client)
    assert catalog._client.check_lock.call_count == 5
31 changes: 31 additions & 0 deletions tests/integration/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# pylint:disable=redefined-outer-name

import math
import time
import uuid
from urllib.parse import urlparse

Expand Down Expand Up @@ -48,6 +49,7 @@
StringType,
TimestampType,
)
from pyiceberg.utils.concurrent import ExecutorFactory

DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'}

Expand Down Expand Up @@ -506,3 +508,32 @@ def test_hive_locking(session_catalog_hive: HiveCatalog) -> None:
table.transaction().set_properties(lock="fail").commit_transaction()
finally:
open_client.unlock(UnlockRequest(lock.lockid))


@pytest.mark.integration
def test_hive_locking_with_retry(session_catalog_hive: HiveCatalog) -> None:
    """Commit succeeds when another client briefly holds the metastore lock.

    A background task grabs the table lock first, so the commit's own lock
    request comes back WAITING; the commit must then succeed through
    _wait_for_lock's retry loop once the background task releases the lock.
    """
    table = create_table(session_catalog_hive)
    database_name: str
    table_name: str
    _, database_name, table_name = table.identifier
    # Tighten the retry policy so the test completes quickly.
    session_catalog_hive._lock_check_min_wait_time = 0.1
    session_catalog_hive._lock_check_max_wait_time = 0.5
    session_catalog_hive._lock_check_retries = 5

    hive_client: _HiveClient = _HiveClient(session_catalog_hive.properties["uri"])

    executor = ExecutorFactory.get_or_create()

    with hive_client as open_client:

        def another_task() -> None:
            # Hold the table lock for ~1s, then release it so the commit
            # (polling in _wait_for_lock) can acquire it.
            lock: LockResponse = open_client.lock(session_catalog_hive._create_lock_request(database_name, table_name))
            time.sleep(1)
            open_client.unlock(UnlockRequest(lock.lockid))

        # test transaction commit with concurrent locking
        executor.submit(another_task)
        # Give the background task time to take the lock before committing.
        time.sleep(0.5)

        table.transaction().set_properties(lock="xxx").commit_transaction()
        assert table.properties.get("lock") == "xxx"