Skip to content

Commit 19e1393

Browse files
committed
Allow injecting clients into Kafka Consumers and Producers.
1 parent cbcb4a6 commit 19e1393

File tree

2 files changed

+15
-3
lines changed

2 files changed

+15
-3
lines changed

kafka/consumer/group.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ class KafkaConsumer(six.Iterator):
4040
It just needs to have at least one broker that will respond to a
4141
Metadata API Request. Default port is 9092. If no servers are
4242
specified, will default to localhost:9092.
43+
client (kafka.client_async.KafkaClient): a kafka client to
44+
use, or if unprovided, one is constructed from the provided
45+
configuration.
4346
client_id (str): a name for this client. This string is passed in
4447
each request to servers and can be used to identify specific
4548
server-side log entries that correspond to this client. Also
@@ -201,6 +204,8 @@ class KafkaConsumer(six.Iterator):
201204
}
202205

203206
def __init__(self, *topics, **configs):
207+
client = configs.pop("client", None)
208+
204209
self.config = copy.copy(self.DEFAULT_CONFIG)
205210
for key in self.config:
206211
if key in configs:
@@ -226,7 +231,9 @@ def __init__(self, *topics, **configs):
226231
metric_group_prefix = 'consumer'
227232
# TODO _metrics likely needs to be passed to KafkaClient, etc.
228233

229-
self._client = KafkaClient(**self.config)
234+
if client is None:
235+
client = KafkaClient(**self.config)
236+
self._client = client
230237

231238
# Check Broker Version if not set explicitly
232239
if self.config['api_version'] == 'auto':

kafka/producer/kafka.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ class KafkaProducer(object):
7979
It just needs to have at least one broker that will respond to a
8080
Metadata API Request. Default port is 9092. If no servers are
8181
specified, will default to localhost:9092.
82+
client (kafka.client_async.KafkaClient): a kafka client to
83+
use, or if unprovided, one is constructed from the provided
84+
configuration.
8285
client_id (str): a name for this client. This string is passed in
8386
each request to servers and can be used to identify specific
8487
server-side log entries that correspond to this client.
@@ -253,7 +256,7 @@ class KafkaProducer(object):
253256
'api_version': 'auto',
254257
}
255258

256-
def __init__(self, **configs):
259+
def __init__(self, client=None, **configs):
257260
log.debug("Starting the Kafka producer") # trace
258261
self.config = copy.copy(self._DEFAULT_CONFIG)
259262
for key in self.config:
@@ -270,7 +273,9 @@ def __init__(self, **configs):
270273
if self.config['acks'] == 'all':
271274
self.config['acks'] = -1
272275

273-
client = KafkaClient(**self.config)
276+
if client is None:
277+
client = KafkaClient(**self.config)
278+
self.client = client
274279

275280
# Check Broker Version if not set explicitly
276281
if self.config['api_version'] == 'auto':

0 commit comments

Comments
 (0)