@@ -280,6 +280,7 @@ class KafkaProducer(object):
280
280
sasl mechanism handshake. Default: one of bootstrap servers
281
281
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
282
282
instance. (See kafka.oauth.abstract). Default: None
283
+ kafka_client (callable): Custom class / callable for creating KafkaClient instances
283
284
284
285
Note:
285
286
Configuration parameters are described in more detail at
@@ -332,7 +333,8 @@ class KafkaProducer(object):
332
333
'sasl_plain_password' : None ,
333
334
'sasl_kerberos_service_name' : 'kafka' ,
334
335
'sasl_kerberos_domain_name' : None ,
335
- 'sasl_oauth_token_provider' : None
336
+ 'sasl_oauth_token_provider' : None ,
337
+ 'kafka_client' : KafkaClient ,
336
338
}
337
339
338
340
_COMPRESSORS = {
@@ -378,9 +380,10 @@ def __init__(self, **configs):
378
380
reporters = [reporter () for reporter in self .config ['metric_reporters' ]]
379
381
self ._metrics = Metrics (metric_config , reporters )
380
382
381
- client = KafkaClient (metrics = self ._metrics , metric_group_prefix = 'producer' ,
382
- wakeup_timeout_ms = self .config ['max_block_ms' ],
383
- ** self .config )
383
+ client = self .config ['kafka_client' ](
384
+ metrics = self ._metrics , metric_group_prefix = 'producer' ,
385
+ wakeup_timeout_ms = self .config ['max_block_ms' ],
386
+ ** self .config )
384
387
385
388
# Get auto-discovered version from client if necessary
386
389
if self .config ['api_version' ] is None :
0 commit comments