Skip to content

Commit f1dc01e

Browse files
committed
Merge pull request #380 from dpkp/multiprocess_consumer_partitions
Support optional partitions kwarg in MultiProcessConsumer
2 parents 062ddff + e298081 commit f1dc01e

File tree

2 files changed

+18
-5
lines changed

2 files changed

+18
-5
lines changed

kafka/consumer/multiprocess.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ class MultiProcessConsumer(Consumer):
100100
topic: the topic to consume
101101
102102
Keyword Arguments:
103+
partitions: An optional list of partitions to consume the data from
103104
auto_commit: default True. Whether or not to auto commit the offsets
104105
auto_commit_every_n: default 100. How many messages to consume
105106
before a commit
@@ -116,16 +117,19 @@ class MultiProcessConsumer(Consumer):
116117
commit method on this class. A manual call to commit will also reset
117118
these triggers
118119
"""
119-
def __init__(self, client, group, topic, auto_commit=True,
120+
def __init__(self, client, group, topic,
121+
partitions=None,
122+
auto_commit=True,
120123
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
121124
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
122-
num_procs=1, partitions_per_proc=0,
125+
num_procs=1,
126+
partitions_per_proc=0,
123127
**simple_consumer_options):
124128

125129
# Initiate the base consumer class
126130
super(MultiProcessConsumer, self).__init__(
127131
client, group, topic,
128-
partitions=None,
132+
partitions=partitions,
129133
auto_commit=auto_commit,
130134
auto_commit_every_n=auto_commit_every_n,
131135
auto_commit_every_t=auto_commit_every_t)

test/test_consumer.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11

2-
from mock import MagicMock
2+
from mock import MagicMock, patch
33
from . import unittest
44

5-
from kafka import SimpleConsumer, KafkaConsumer
5+
from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
66
from kafka.common import KafkaConfigurationError
77

88
class TestKafkaConsumer(unittest.TestCase):
@@ -13,3 +13,12 @@ def test_non_integer_partitions(self):
1313
def test_broker_list_required(self):
1414
with self.assertRaises(KafkaConfigurationError):
1515
KafkaConsumer()
16+
17+
class TestMultiProcessConsumer(unittest.TestCase):
18+
def test_partition_list(self):
19+
client = MagicMock()
20+
partitions = (0,)
21+
with patch.object(MultiProcessConsumer, 'fetch_last_known_offsets') as fetch_last_known_offsets:
22+
consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
23+
self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
24+
self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member

0 commit comments

Comments
 (0)