10
10
11
11
from kafka .admin .acl_resource import ACLOperation , ACLPermissionType , ACLFilter , ACL , ResourcePattern , ResourceType , \
12
12
ACLResourcePatternType
13
- from kafka .client_async import KafkaClient , selectors
13
+ from kafka .client_async import selectors , KafkaClient
14
14
from kafka .coordinator .protocol import ConsumerProtocolMemberMetadata , ConsumerProtocolMemberAssignment , ConsumerProtocol
15
15
import kafka .errors as Errors
16
16
from kafka .errors import (
26
26
from kafka .protocol .metadata import MetadataRequest
27
27
from kafka .protocol .types import Array
28
28
from kafka .structs import TopicPartition , OffsetAndMetadata , MemberInformation , GroupInformation
29
+ from kafka .util import get_client_factory
29
30
from kafka .version import __version__
30
31
31
32
@@ -146,6 +147,7 @@ class KafkaAdminClient(object):
146
147
sasl mechanism handshake. Default: one of bootstrap servers
147
148
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
148
149
instance. (See kafka.oauth.abstract). Default: None
150
+ client (callable): Custom class / callable for creating KafkaClient instances
149
151
150
152
"""
151
153
DEFAULT_CONFIG = {
@@ -186,6 +188,7 @@ class KafkaAdminClient(object):
186
188
'metric_reporters' : [],
187
189
'metrics_num_samples' : 2 ,
188
190
'metrics_sample_window_ms' : 30000 ,
191
+ 'client' : KafkaClient ,
189
192
}
190
193
191
194
def __init__ (self , ** configs ):
@@ -204,10 +207,12 @@ def __init__(self, **configs):
204
207
tags = metrics_tags )
205
208
reporters = [reporter () for reporter in self .config ['metric_reporters' ]]
206
209
self ._metrics = Metrics (metric_config , reporters )
207
-
208
- self ._client = KafkaClient (metrics = self ._metrics ,
209
- metric_group_prefix = 'admin' ,
210
- ** self .config )
210
+ assert callable (self .config ['client' ]), "Client parameter should be callable"
211
+ self ._client = self .config ['client' ](
212
+ metrics = self ._metrics ,
213
+ metric_group_prefix = 'admin' ,
214
+ ** self .config
215
+ )
211
216
self ._client .check_version (timeout = (self .config ['api_version_auto_timeout_ms' ] / 1000 ))
212
217
213
218
# Get auto-discovered version from client if necessary
0 commit comments