diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 8ce9a24e3..8426b9542 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -474,6 +474,12 @@ def _send_join_group_request(self): def _failed_request(self, node_id, request, future, error): log.error('Error sending %s to node %s [%s]', request.__class__.__name__, node_id, error) + + # If sync group request timeout, we need try to rejoin group + version = 0 if self.config['api_version'] < (0, 11, 0) else 1 + if isinstance(request, SyncGroupRequest[version]): + self.request_rejoin() + # Marking coordinator dead # unless the error is caused by internal client pipelining if not isinstance(error, (Errors.NodeNotReadyError, @@ -716,7 +722,8 @@ def reset_generation(self): self.state = MemberState.UNJOINED def request_rejoin(self): - self.rejoin_needed = True + with self._lock: + self.rejoin_needed = True def _start_heartbeat_thread(self): if self._heartbeat_thread is None: