From 939551922244788822d0e778dd0e30851e88cd09 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 9 Jan 2018 16:56:41 -0800 Subject: [PATCH 1/2] Test ensure active group --- test/test_coordinator.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 7dc0e0484..f56736912 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -620,3 +620,16 @@ def test_lookup_coordinator_failure(mocker, coordinator): return_value=Future().failure(Exception('foobar'))) future = coordinator.lookup_coordinator() assert future.failed() + + +def test_ensure_active_group(mocker, coordinator): + coordinator._subscription.subscribe(topics=['foobar']) + mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False) + mocker.patch.object(coordinator, '_send_join_group_request', return_value=Future().success(True)) + mocker.patch.object(coordinator, 'need_rejoin', side_effect=[True, True, False]) + mocker.patch.object(coordinator, '_on_join_complete') + mocker.patch.object(coordinator, '_heartbeat_thread') + + coordinator.ensure_active_group() + + coordinator._send_join_group_request.assert_called_once_with() From 987e3fef748f4d7989c61b72c24ade7cff71b17d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 9 Jan 2018 16:57:29 -0800 Subject: [PATCH 2/2] Fix race condition in coordinator join_future handling --- kafka/coordinator/base.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 30b9c4052..24412c9bf 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -377,19 +377,23 @@ def ensure_active_group(self): # before the pending rebalance has completed. if self.join_future is None: self.state = MemberState.REBALANCING - self.join_future = self._send_join_group_request() + future = self._send_join_group_request() + + self.join_future = future # this should happen before adding callbacks # handle join completion in the callback so that the # callback will be invoked even if the consumer is woken up # before finishing the rebalance - self.join_future.add_callback(self._handle_join_success) + future.add_callback(self._handle_join_success) # we handle failures below after the request finishes. # If the join completes after having been woken up, the # exception is ignored and we will rejoin - self.join_future.add_errback(self._handle_join_failure) + future.add_errback(self._handle_join_failure) + + else: + future = self.join_future - future = self.join_future self._client.poll(future=future) if future.failed():