diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 279cce033..957cffdda 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -46,7 +46,10 @@ class KafkaConsumer(six.Iterator): It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. If no servers are specified, will default to localhost:9092. - client_id (str): A name for this client. This string is passed in + client (kafka.client_async.KafkaClient): a kafka client to + use, or if unprovided, one is constructed from the provided + configuration. + client_id (str): a name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to @@ -249,6 +252,7 @@ class KafkaConsumer(six.Iterator): """ DEFAULT_CONFIG = { 'bootstrap_servers': 'localhost', + 'client': None, 'client_id': 'kafka-python-' + __version__, 'group_id': None, 'key_deserializer': None, @@ -345,7 +349,11 @@ def __init__(self, *topics, **configs): log.warning('use api_version=%s [tuple] -- "%s" as str is deprecated', str(self.config['api_version']), str_version) - self._client = KafkaClient(metrics=self._metrics, **self.config) + client = self.config.pop('client', None) or KafkaClient( + metrics=self._metrics, + **self.config + ) + self._client = client # Get auto-discovered version from client if necessary if self.config['api_version'] is None: diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 4fc7bc687..9926081de 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -86,6 +86,9 @@ class KafkaProducer(object): It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. If no servers are specified, will default to localhost:9092. + client (kafka.client_async.KafkaClient): a kafka client to + use, or if unprovided, one is constructed from the provided + configuration. client_id (str): a name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. @@ -279,6 +282,7 @@ class KafkaProducer(object): """ DEFAULT_CONFIG = { 'bootstrap_servers': 'localhost', + 'client': None, 'client_id': None, 'key_serializer': None, 'value_serializer': None, @@ -367,8 +371,11 @@ def __init__(self, **configs): reporters = [reporter() for reporter in self.config['metric_reporters']] self._metrics = Metrics(metric_config, reporters) - client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer', - **self.config) + client = self.config['client'] or KafkaClient( + metrics=self._metrics, + metric_group_prefix='producer', + **self.config + ) # Get auto-discovered version from client if necessary if self.config['api_version'] is None: