1515# specific language governing permissions and limitations
1616# under the License.
1717import getpass
18+ import logging
1819import socket
1920import time
2021from types import TracebackType
3334from hive_metastore .ThriftHiveMetastore import Client
3435from hive_metastore .ttypes import (
3536 AlreadyExistsException ,
37+ CheckLockRequest ,
3638 FieldSchema ,
3739 InvalidOperationException ,
3840 LockComponent ,
4951)
5052from hive_metastore .ttypes import Database as HiveDatabase
5153from hive_metastore .ttypes import Table as HiveTable
54+ from tenacity import retry , retry_if_exception_type , stop_after_attempt , wait_exponential
5255from thrift .protocol import TBinaryProtocol
5356from thrift .transport import TSocket , TTransport
5457
6972 NoSuchNamespaceError ,
7073 NoSuchTableError ,
7174 TableAlreadyExistsError ,
75+ WaitingForLockException ,
7276)
7377from pyiceberg .io import FileIO , load_file_io
7478from pyiceberg .partitioning import UNPARTITIONED_PARTITION_SPEC , PartitionSpec
7579from pyiceberg .schema import Schema , SchemaVisitor , visit
7680from pyiceberg .serializers import FromInputFile
77- from pyiceberg .table import CommitTableRequest , CommitTableResponse , PropertyUtil , Table , TableProperties , update_table_metadata
81+ from pyiceberg .table import (
82+ CommitTableRequest ,
83+ CommitTableResponse ,
84+ PropertyUtil ,
85+ Table ,
86+ TableProperties ,
87+ update_table_metadata ,
88+ )
7889from pyiceberg .table .metadata import new_table_metadata
7990from pyiceberg .table .sorting import UNSORTED_SORT_ORDER , SortOrder
8091from pyiceberg .typedef import EMPTY_DICT , Identifier , Properties
111122HIVE2_COMPATIBLE = "hive.hive2-compatible"
112123HIVE2_COMPATIBLE_DEFAULT = False
113124
125+ LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time"
126+ LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time"
127+ LOCK_CHECK_RETRIES = "lock-check-retries"
128+ DEFAULT_LOCK_CHECK_MIN_WAIT_TIME = 0.1 # 100 milliseconds
129+ DEFAULT_LOCK_CHECK_MAX_WAIT_TIME = 60 # 1 min
130+ DEFAULT_LOCK_CHECK_RETRIES = 4
131+
132+ logger = logging .getLogger (__name__ )
133+
114134
115135class _HiveClient :
116136 """Helper class to nicely open and close the transport."""
@@ -240,6 +260,18 @@ def __init__(self, name: str, **properties: str):
240260 super ().__init__ (name , ** properties )
241261 self ._client = _HiveClient (properties ["uri" ], properties .get ("ugi" ))
242262
263+ self ._lock_check_min_wait_time = PropertyUtil .property_as_float (
264+ properties , LOCK_CHECK_MIN_WAIT_TIME , DEFAULT_LOCK_CHECK_MIN_WAIT_TIME
265+ )
266+ self ._lock_check_max_wait_time = PropertyUtil .property_as_float (
267+ properties , LOCK_CHECK_MAX_WAIT_TIME , DEFAULT_LOCK_CHECK_MAX_WAIT_TIME
268+ )
269+ self ._lock_check_retries = PropertyUtil .property_as_float (
270+ properties ,
271+ LOCK_CHECK_RETRIES ,
272+ DEFAULT_LOCK_CHECK_RETRIES ,
273+ )
274+
243275 def _convert_hive_into_iceberg (self , table : HiveTable , io : FileIO ) -> Table :
244276 properties : Dict [str , str ] = table .parameters
245277 if TABLE_TYPE not in properties :
@@ -356,6 +388,26 @@ def _create_lock_request(self, database_name: str, table_name: str) -> LockReque
356388
357389 return lock_request
358390
391+ def _wait_for_lock (self , database_name : str , table_name : str , lockid : int , open_client : Client ) -> LockResponse :
392+ @retry (
393+ retry = retry_if_exception_type (WaitingForLockException ),
394+ wait = wait_exponential (multiplier = 2 , min = self ._lock_check_min_wait_time , max = self ._lock_check_max_wait_time ),
395+ stop = stop_after_attempt (self ._lock_check_retries ),
396+ reraise = True ,
397+ )
398+ def _do_wait_for_lock () -> LockResponse :
399+ response : LockResponse = open_client .check_lock (CheckLockRequest (lockid = lockid ))
400+ if response .state == LockState .ACQUIRED :
401+ return response
402+ elif response .state == LockState .WAITING :
403+ msg = f"Wait on lock for { database_name } .{ table_name } "
404+ logger .warning (msg )
405+ raise WaitingForLockException (msg )
406+ else :
407+ raise CommitFailedException (f"Failed to check lock for { database_name } .{ table_name } , state: { response .state } " )
408+
409+ return _do_wait_for_lock ()
410+
359411 def _commit_table (self , table_request : CommitTableRequest ) -> CommitTableResponse :
360412 """Update the table.
361413
@@ -380,7 +432,10 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
380432
381433 try :
382434 if lock .state != LockState .ACQUIRED :
383- raise CommitFailedException (f"Failed to acquire lock for { table_request .identifier } , state: { lock .state } " )
435+ if lock .state == LockState .WAITING :
436+ self ._wait_for_lock (database_name , table_name , lock .lockid , open_client )
437+ else :
438+ raise CommitFailedException (f"Failed to acquire lock for { table_request .identifier } , state: { lock .state } " )
384439
385440 hive_table = open_client .get_table (dbname = database_name , tbl_name = table_name )
386441 io = load_file_io ({** self .properties , ** hive_table .parameters }, hive_table .sd .location )
@@ -406,6 +461,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
406461 open_client .alter_table (dbname = database_name , tbl_name = table_name , new_tbl = hive_table )
407462 except NoSuchObjectException as e :
408463 raise NoSuchTableError (f"Table does not exist: { table_name } " ) from e
464+ except WaitingForLockException as e :
465+ raise CommitFailedException (f"Failed to acquire lock for { table_request .identifier } , state: { lock .state } " ) from e
409466 finally :
410467 open_client .unlock (UnlockRequest (lockid = lock .lockid ))
411468
0 commit comments