From 73d820a64cbb6bb64d3ef5588063d6ca7aa862e1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 22 Mar 2025 09:49:10 -0700 Subject: [PATCH 1/6] KIP-70: Auto-commit offsets on consumer.unsubscribe() --- kafka/consumer/group.py | 16 +++++++++++++--- kafka/consumer/subscription_state.py | 5 ----- kafka/coordinator/consumer.py | 11 +++++++++-- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 71b295d49..ee3f95be7 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -444,8 +444,15 @@ def assign(self, partitions): no rebalance operation triggered when group membership or cluster and topic metadata change. """ - self._subscription.assign_from_user(partitions) - self._client.set_topics([tp.topic for tp in partitions]) + if not partitions: + self.unsubscribe() + else: + # make sure the offsets of topic partitions the consumer is unsubscribing from + # are committed since there will be no following rebalance + self._coordinator.maybe_auto_commit_offsets_now() + self._subscription.assign_from_user(partitions) + self._client.set_topics([tp.topic for tp in partitions]) + log.debug("Subscribed to partition(s): %s", partitions) def assignment(self): """Get the TopicPartitions currently assigned to this consumer. @@ -959,8 +966,11 @@ def subscription(self): def unsubscribe(self): """Unsubscribe from all topics and clear all assigned partitions.""" + # make sure the offsets of topic partitions the consumer is unsubscribing from + # are committed since there will be no following rebalance + self._coordinator.maybe_auto_commit_offsets_now() self._subscription.unsubscribe() - self._coordinator.close() + self._coordinator.maybe_leave_group() self._client.cluster.need_all_topic_metadata = False self._client.set_topics([]) log.debug("Unsubscribed all topics or patterns and assigned partitions") diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 2b2bcb477..fdccc3b28 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -151,11 +151,6 @@ def change_subscription(self, topics): self.subscription = set(topics) self._group_subscription.update(topics) - # Remove any assigned partitions which are no longer subscribed to - for tp in set(self.assignment.keys()): - if tp.topic not in self.subscription: - del self.assignment[tp] - def group_subscribe(self, topics): """Add topics to the current group subscription. diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 3d180ca0c..f086b0fd7 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -878,8 +878,15 @@ def _maybe_auto_commit_offsets_async(self): self.next_auto_commit_deadline = time.time() + self.config['retry_backoff_ms'] / 1000 elif time.time() > self.next_auto_commit_deadline: self.next_auto_commit_deadline = time.time() + self.auto_commit_interval - self.commit_offsets_async(self._subscription.all_consumed_offsets(), - self._commit_offsets_async_on_complete) + self._do_auto_commit_offsets_async() + + def maybe_auto_commit_offsets_now(self): + if self.config['enable_auto_commit'] and not self.coordinator_unknown(): + self._do_auto_commit_offsets_async() + + def _do_auto_commit_offsets_async(self): + self.commit_offsets_async(self._subscription.all_consumed_offsets(), + self._commit_offsets_async_on_complete) class ConsumerCoordinatorMetrics(object): From 856c08d43875ab1f0ddea7b62d74fc3d45f113f4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 24 Mar 2025 10:22:25 -0700 Subject: [PATCH 2/6] Raise TypeError if subscription topics is not list or non-str sequence --- kafka/consumer/subscription_state.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index fdccc3b28..a1675c724 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -1,6 +1,10 @@ from __future__ import absolute_import import abc +try: + from collections import Sequence +except ImportError: + from collections.abc import Sequence import logging import re @@ -114,6 +118,8 @@ def subscribe(self, topics=(), pattern=None, listener=None): self.subscription = set() self.subscribed_pattern = re.compile(pattern) else: + if isinstance(topics, str) or not isinstance(topics, Sequence): + raise TypeError('Topics must be a list (or non-str sequence)') self.change_subscription(topics) if listener and not isinstance(listener, ConsumerRebalanceListener): From dfca19c8a242a50d62b75ea022bb84231bb6c0a0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 24 Mar 2025 10:23:38 -0700 Subject: [PATCH 3/6] test_assign --- test/test_consumer.py | 64 ++++++++++++++++++++++++++++++------------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/test/test_consumer.py b/test/test_consumer.py index 8186125df..0d9477729 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -1,26 +1,52 @@ +from __future__ import absolute_import + import pytest -from kafka import KafkaConsumer -from kafka.errors import KafkaConfigurationError +from kafka import KafkaConsumer, TopicPartition +from kafka.errors import KafkaConfigurationError, IllegalStateError + + +def test_session_timeout_larger_than_request_timeout_raises(): + with pytest.raises(KafkaConfigurationError): + KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), group_id='foo', session_timeout_ms=50000, request_timeout_ms=40000) + + +def test_fetch_max_wait_larger_than_request_timeout_raises(): + with pytest.raises(KafkaConfigurationError): + KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=50000, request_timeout_ms=40000) + + +def test_request_timeout_larger_than_connections_max_idle_ms_raises(): + with pytest.raises(KafkaConfigurationError): + KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), request_timeout_ms=50000, connections_max_idle_ms=40000) + +def test_subscription_copy(): + consumer = KafkaConsumer('foo', api_version=(0, 10, 0)) + sub = consumer.subscription() + assert sub is not consumer.subscription() + assert sub == set(['foo']) + sub.add('fizz') + assert consumer.subscription() == set(['foo']) -class TestKafkaConsumer: - def test_session_timeout_larger_than_request_timeout_raises(self): - with pytest.raises(KafkaConfigurationError): - KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), group_id='foo', session_timeout_ms=50000, request_timeout_ms=40000) - def test_fetch_max_wait_larger_than_request_timeout_raises(self): - with pytest.raises(KafkaConfigurationError): - KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=50000, request_timeout_ms=40000) +def test_assign(): + # Consumer w/ subscription to topic 'foo' + consumer = KafkaConsumer('foo', api_version=(0, 10, 0)) + assert consumer.assignment() == set() + # Cannot assign manually + with pytest.raises(IllegalStateError): + consumer.assign([TopicPartition('foo', 0)]) - def test_request_timeout_larger_than_connections_max_idle_ms_raises(self): - with pytest.raises(KafkaConfigurationError): - KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), request_timeout_ms=50000, connections_max_idle_ms=40000) + assert 'foo' in consumer._client._topics - def test_subscription_copy(self): - consumer = KafkaConsumer('foo', api_version=(0, 10, 0)) - sub = consumer.subscription() - assert sub is not consumer.subscription() - assert sub == set(['foo']) - sub.add('fizz') - assert consumer.subscription() == set(['foo']) + consumer = KafkaConsumer(api_version=(0, 10, 0)) + assert consumer.assignment() == set() + consumer.assign([TopicPartition('foo', 0)]) + assert consumer.assignment() == set([TopicPartition('foo', 0)]) + assert 'foo' in consumer._client._topics + # Cannot subscribe + with pytest.raises(IllegalStateError): + consumer.subscribe(topics=['foo']) + consumer.assign([]) + assert consumer.assignment() == set() From 6db022c985f3d896c25953723c491392f9c2a5ed Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 24 Mar 2025 10:35:57 -0700 Subject: [PATCH 4/6] test_subscription_state --- test/test_subscription_state.py | 48 +++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 test/test_subscription_state.py diff --git a/test/test_subscription_state.py b/test/test_subscription_state.py new file mode 100644 index 000000000..aaec44c4c --- /dev/null +++ b/test/test_subscription_state.py @@ -0,0 +1,48 @@ +from __future__ import absolute_import + +import pytest + +from kafka import TopicPartition +from kafka.consumer.subscription_state import SubscriptionState, TopicPartitionState +from kafka.vendor import six + + +def test_type_error(): + s = SubscriptionState() + with pytest.raises(TypeError): + s.subscribe(topics='foo') + + s.subscribe(topics=['foo']) + + +def test_change_subscription(): + s = SubscriptionState() + s.subscribe(topics=['foo']) + assert s.subscription == set(['foo']) + s.change_subscription(['bar']) + assert s.subscription == set(['bar']) + + +def test_group_subscribe(): + s = SubscriptionState() + s.subscribe(topics=['foo']) + assert s.subscription == set(['foo']) + s.group_subscribe(['bar']) + assert s.subscription == set(['foo']) + assert s._group_subscription == set(['foo', 'bar']) + + s.reset_group_subscription() + assert s.subscription == set(['foo']) + assert s._group_subscription == set(['foo']) + + +def test_assign_from_subscribed(): + s = SubscriptionState() + s.subscribe(topics=['foo']) + with pytest.raises(ValueError): + s.assign_from_subscribed([TopicPartition('bar', 0)]) + + s.assign_from_subscribed([TopicPartition('foo', 0), TopicPartition('foo', 1)]) + assert set(s.assignment.keys()) == set([TopicPartition('foo', 0), TopicPartition('foo', 1)]) + assert all([isinstance(s, TopicPartitionState) for s in six.itervalues(s.assignment)]) + assert all([not s.has_valid_position for s in six.itervalues(s.assignment)]) From 21fbd6ec720a702865b12f36129d7aa0b19f12d9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 24 Mar 2025 10:48:14 -0700 Subject: [PATCH 5/6] fixup fetcher test --- test/test_fetcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_fetcher.py b/test/test_fetcher.py index a22f78657..f6e1cf5f4 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -148,7 +148,7 @@ def test_update_fetch_positions(fetcher, topic, mocker): def test__reset_offset(fetcher, mocker): tp = TopicPartition("topic", 0) - fetcher._subscriptions.subscribe(topics="topic") + fetcher._subscriptions.subscribe(topics=["topic"]) fetcher._subscriptions.assign_from_subscribed([tp]) fetcher._subscriptions.need_offset_reset(tp) mocked = mocker.patch.object(fetcher, '_retrieve_offsets') From 6ad4c510c4382ce95a8c9340e3e52876b85f8059 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 24 Mar 2025 10:52:39 -0700 Subject: [PATCH 6/6] test changing subscription retains assignment --- test/test_subscription_state.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/test/test_subscription_state.py b/test/test_subscription_state.py index aaec44c4c..bb2c81bff 100644 --- a/test/test_subscription_state.py +++ b/test/test_subscription_state.py @@ -46,3 +46,12 @@ def test_assign_from_subscribed(): assert set(s.assignment.keys()) == set([TopicPartition('foo', 0), TopicPartition('foo', 1)]) assert all([isinstance(s, TopicPartitionState) for s in six.itervalues(s.assignment)]) assert all([not s.has_valid_position for s in six.itervalues(s.assignment)]) + + +def test_change_subscription_after_assignment(): + s = SubscriptionState() + s.subscribe(topics=['foo']) + s.assign_from_subscribed([TopicPartition('foo', 0), TopicPartition('foo', 1)]) + # Changing subscription retains existing assignment until next rebalance + s.change_subscription(['bar']) + assert set(s.assignment.keys()) == set([TopicPartition('foo', 0), TopicPartition('foo', 1)])