@@ -120,6 +120,7 @@ def __init__(
120120 ca = None ,
121121 use_ssl = False ,
122122 verify_certs = True ,
123+ concurrent_request_limit = 0 ,
123124 ** kwargs ,
124125 ):
125126 """Create a :class:`KazooClient` instance. All time arguments
@@ -241,6 +242,18 @@ def __init__(
241242 self .keyfile = keyfile
242243 self .keyfile_password = keyfile_password
243244 self .ca = ca
245+ if concurrent_request_limit > 0 :
246+ self .logger .info (
247+ "Zookeeper client rate-limited to %d concurrent requests" ,
248+ concurrent_request_limit ,
249+ )
250+ self .rate_limiting_sem = self .handler .semaphore_impl (
251+ concurrent_request_limit
252+ )
253+
254+ else :
255+ self .rate_limiting_sem = None
256+
244257 # Curator like simplified state tracking, and listeners for
245258 # state transitions
246259 self ._state = KeeperState .CLOSED
@@ -635,6 +648,16 @@ def _call(self, request, async_object):
635648 async_object .set_exception (SessionExpiredError ())
636649 return False
637650
651+ if self .rate_limiting_sem :
652+ if not self .rate_limiting_sem .acquire (blocking = False ):
653+ self .logger .info (
654+ "Limiting concurrent requests. Waiting for completion." ,
655+ concurrent_request_limit ,
656+ )
657+ # Actually block on the sempahore here
658+ self .rate_limiting_sem .acquire (blocking = True )
659+ async_object .rawlink (lambda _res : self .rate_limiting_sem .release ())
660+
638661 self ._queue .append ((request , async_object ))
639662
640663 # wake the connection, guarding against a race with close()
0 commit comments