From 818ff59897fcc80bc84cb2a9abd6de655bda4523 Mon Sep 17 00:00:00 2001 From: licy121 Date: Tue, 8 Jan 2019 14:22:25 +0800 Subject: [PATCH] Bugfix: sync group request timeout error will lead to consumer come in an abnormal status --- kafka/coordinator/base.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 8ce9a24e3..8426b9542 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -474,6 +474,12 @@ def _send_join_group_request(self): def _failed_request(self, node_id, request, future, error): log.error('Error sending %s to node %s [%s]', request.__class__.__name__, node_id, error) + + # If sync group request timeout, we need try to rejoin group + version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + if isinstance(request, SyncGroupRequest[version]): + self.request_rejoin() + # Marking coordinator dead # unless the error is caused by internal client pipelining if not isinstance(error, (Errors.NodeNotReadyError, @@ -716,7 +722,8 @@ def reset_generation(self): self.state = MemberState.UNJOINED def request_rejoin(self): - self.rejoin_needed = True + with self._lock: + self.rejoin_needed = True def _start_heartbeat_thread(self): if self._heartbeat_thread is None: