@@ -236,6 +236,7 @@ def __init__(self, host, port, afi, **configs):
236
236
self ._sock_afi = afi
237
237
self ._sock_addr = None
238
238
self ._api_versions = None
239
+ self ._throttle_time = None
239
240
240
241
self .config = copy .copy (self .DEFAULT_CONFIG )
241
242
for key in self .config :
@@ -851,6 +852,27 @@ def blacked_out(self):
851
852
return self .connection_delay () > 0
852
853
return False
853
854
855
+ def throttled (self ):
856
+ """
857
+ Return True if we are connected but currently throttled.
858
+ """
859
+ if self .state is not ConnectionStates .CONNECTED :
860
+ return False
861
+ return self .throttle_delay () > 0
862
+
863
+ def throttle_delay (self ):
864
+ """
865
+ Return the number of milliseconds to wait until connection is no longer throttled.
866
+ """
867
+ if self ._throttle_time is not None :
868
+ remaining_ms = (self ._throttle_time - time .time ()) * 1000
869
+ if remaining_ms > 0 :
870
+ return remaining_ms
871
+ else :
872
+ self ._throttle_time = None
873
+ return 0
874
+ return 0
875
+
854
876
def connection_delay (self ):
855
877
"""
856
878
Return the number of milliseconds to wait, based on the connection
@@ -976,6 +998,9 @@ def send(self, request, blocking=True, request_timeout_ms=None):
976
998
elif not self .connected ():
977
999
return future .failure (Errors .KafkaConnectionError (str (self )))
978
1000
elif not self .can_send_more ():
1001
+ # very small race here, but prefer it over breaking abstraction to check self._throttle_time
1002
+ if self .throttled ():
1003
+ return future .failure (Errors .ThrottlingQuotaExceededError (str (self )))
979
1004
return future .failure (Errors .TooManyInFlightRequests (str (self )))
980
1005
return self ._send (request , blocking = blocking , request_timeout_ms = request_timeout_ms )
981
1006
@@ -1063,8 +1088,26 @@ def send_pending_requests_v2(self):
1063
1088
self .close (error = error )
1064
1089
return False
1065
1090
1091
+ def _maybe_throttle (self , response ):
1092
+ throttle_time_ms = getattr (response , 'throttle_time_ms' , 0 )
1093
+ if self ._sensors :
1094
+ self ._sensors .throttle_time .record (throttle_time_ms )
1095
+ if not throttle_time_ms :
1096
+ if self ._throttle_time is not None :
1097
+ self ._throttle_time = None
1098
+ return
1099
+ # Client side throttling enabled in v2.0 brokers
1100
+ # prior to that throttling (if present) was managed broker-side
1101
+ if self .config ['api_version' ] is not None and self .config ['api_version' ] >= (2 , 0 ):
1102
+ throttle_time = time .time () + throttle_time_ms / 1000
1103
+ self ._throttle_time = max (throttle_time , self ._throttle_time or 0 )
1104
+ log .warning ("%s: %s throttled by broker (%d ms)" , self ,
1105
+ response .__class__ .__name__ , throttle_time_ms )
1106
+
1066
1107
def can_send_more (self ):
1067
- """Return True unless there are max_in_flight_requests_per_connection."""
1108
+ """Check for throttling / quota violations and max in-flight-requests"""
1109
+ if self .throttle_delay () > 0 :
1110
+ return False
1068
1111
max_ifrs = self .config ['max_in_flight_requests_per_connection' ]
1069
1112
return len (self .in_flight_requests ) < max_ifrs
1070
1113
@@ -1097,6 +1140,7 @@ def recv(self):
1097
1140
self ._sensors .request_time .record (latency_ms )
1098
1141
1099
1142
log .debug ('%s Response %d (%s ms): %s' , self , correlation_id , latency_ms , response )
1143
+ self ._maybe_throttle (response )
1100
1144
responses [i ] = (response , future )
1101
1145
1102
1146
return responses
@@ -1399,6 +1443,16 @@ def __init__(self, metrics, metric_group_prefix, node_id):
1399
1443
'The maximum request latency in ms.' ),
1400
1444
Max ())
1401
1445
1446
+ throttle_time = metrics .sensor ('throttle-time' )
1447
+ throttle_time .add (metrics .metric_name (
1448
+ 'throttle-time-avg' , metric_group_name ,
1449
+ 'The average throttle time in ms.' ),
1450
+ Avg ())
1451
+ throttle_time .add (metrics .metric_name (
1452
+ 'throttle-time-max' , metric_group_name ,
1453
+ 'The maximum throttle time in ms.' ),
1454
+ Max ())
1455
+
1402
1456
# if one sensor of the metrics has been registered for the connection,
1403
1457
# then all other sensors should have been registered; and vice versa
1404
1458
node_str = 'node-{0}' .format (node_id )
@@ -1450,9 +1504,23 @@ def __init__(self, metrics, metric_group_prefix, node_id):
1450
1504
'The maximum request latency in ms.' ),
1451
1505
Max ())
1452
1506
1507
+ throttle_time = metrics .sensor (
1508
+ node_str + '.throttle' ,
1509
+ parents = [metrics .get_sensor ('throttle-time' )])
1510
+ throttle_time .add (metrics .metric_name (
1511
+ 'throttle-time-avg' , metric_group_name ,
1512
+ 'The average throttle time in ms.' ),
1513
+ Avg ())
1514
+ throttle_time .add (metrics .metric_name (
1515
+ 'throttle-time-max' , metric_group_name ,
1516
+ 'The maximum throttle time in ms.' ),
1517
+ Max ())
1518
+
1519
+
1453
1520
self .bytes_sent = metrics .sensor (node_str + '.bytes-sent' )
1454
1521
self .bytes_received = metrics .sensor (node_str + '.bytes-received' )
1455
1522
self .request_time = metrics .sensor (node_str + '.latency' )
1523
+ self .throttle_time = metrics .sensor (node_str + '.throttle' )
1456
1524
1457
1525
1458
1526
def _address_family (address ):
0 commit comments