Skip to content

Commit 6fe3bb7

Browse files
committed
set socket timeout for the wake_w
1 parent 9ac3cb1 commit 6fe3bb7

File tree

1 file changed

+5
-0
lines changed

1 file changed

+5
-0
lines changed

kafka/client_async.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ class KafkaClient(object):
151151
'bootstrap_servers': 'localhost',
152152
'client_id': 'kafka-python-' + __version__,
153153
'request_timeout_ms': 30000,
154+
'max_block_ms': 60000,
154155
'connections_max_idle_ms': 9 * 60 * 1000,
155156
'reconnect_backoff_ms': 50,
156157
'reconnect_backoff_max_ms': 1000,
@@ -198,6 +199,7 @@ def __init__(self, **configs):
198199
self._bootstrap_fails = 0
199200
self._wake_r, self._wake_w = socket.socketpair()
200201
self._wake_r.setblocking(False)
202+
self._wake_w.settimeout(self.config['max_block_ms'] / 1000.0)
201203
self._wake_lock = threading.Lock()
202204

203205
self._lock = threading.RLock()
@@ -847,6 +849,9 @@ def wakeup(self):
847849
with self._wake_lock:
848850
try:
849851
self._wake_w.sendall(b'x')
852+
except socket.timeout:
853+
log.warning('Timeout to send to wakeup socket!')
854+
raise Errors.KafkaTimeoutError()
850855
except socket.error:
851856
log.warning('Unable to send to wakeup socket!')
852857

0 commit comments

Comments
 (0)