Skip to content

Commit 9e09f0f

Browse files
committed
Fix describe config for multi-broker clusters
Currently all describe config requests are sent to "least loaded node". Requests for broker configs must, however, be sent to the specific broker, otherwise an error is returned. Only topic requests can be handled by any node. This changes the logic to send all describe config requests to the specific broker.
1 parent eed25fc commit 9e09f0f

File tree

2 files changed

+125
-12
lines changed

2 files changed

+125
-12
lines changed

kafka/admin/client.py

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import socket
77

8+
from . import ConfigResourceType
89
from kafka.vendor import six
910

1011
from kafka.client_async import KafkaClient, selectors
@@ -484,28 +485,74 @@ def describe_configs(self, config_resources, include_synonyms=False):
484485
supported by all versions. Default: False.
485486
:return: Appropriate version of DescribeConfigsResponse class.
486487
"""
488+
489+
# Break up requests by type - a broker config request must be sent to the specific broker.
490+
# All other (currently just topic resources) can be sent to any broker.
491+
broker_resources = []
492+
topic_resources = []
493+
494+
for config_resource in config_resources:
495+
if config_resource.resource_type == ConfigResourceType.BROKER:
496+
broker_resources.append(self._convert_describe_config_resource_request(config_resource))
497+
else:
498+
topic_resources.append(self._convert_describe_config_resource_request(config_resource))
499+
500+
futures = []
487501
version = self._matching_api_version(DescribeConfigsRequest)
488502
if version == 0:
489503
if include_synonyms:
490504
raise IncompatibleBrokerVersion(
491505
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
492-
.format(self.config['api_version']))
493-
request = DescribeConfigsRequest[version](
494-
resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
495-
)
506+
.format(self.config['api_version']))
507+
508+
if len(broker_resources) > 0:
509+
for broker_resource in broker_resources:
510+
try:
511+
futures.append(self._send_request_to_node(
512+
int(broker_resource[1]),
513+
DescribeConfigsRequest[version](resources=[broker_resource])
514+
))
515+
except ValueError:
516+
raise ValueError("Broker resource names must be an integer or a string represented integer")
517+
518+
if len(topic_resources) > 0:
519+
futures.append(self._send_request_to_node(
520+
self._client.least_loaded_node(),
521+
DescribeConfigsRequest[version](resources=topic_resources)
522+
))
523+
496524
elif version == 1:
497-
request = DescribeConfigsRequest[version](
498-
resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources],
499-
include_synonyms=include_synonyms
500-
)
525+
if len(broker_resources) > 0:
526+
for broker_resource in broker_resources:
527+
try:
528+
futures.append(self._send_request_to_node(
529+
int(broker_resource[1]),
530+
DescribeConfigsRequest[version](
531+
resources=[broker_resource],
532+
include_synonyms=include_synonyms)
533+
))
534+
except ValueError:
535+
raise ValueError("Broker resource names must be an integer or a string represented integer")
536+
537+
if len(topic_resources) > 0:
538+
futures.append(self._send_request_to_node(
539+
self._client.least_loaded_node(),
540+
DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms)
541+
))
501542
else:
502543
raise NotImplementedError(
503544
"Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient."
504-
.format(version))
505-
future = self._send_request_to_node(self._client.least_loaded_node(), request)
545+
.format(version))
546+
547+
self._wait_for_futures(futures)
548+
549+
# Use one of the results as the general response and add all other resources to it
550+
response = copy.copy(futures[0])
551+
response.resources = []
552+
553+
for future in futures:
554+
response.resources.extend(future.value.resources)
506555

507-
self._wait_for_futures([future])
508-
response = future.value
509556
return response
510557

511558
@staticmethod

test/test_admin_integration.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import os
2+
3+
from kafka import KafkaAdminClient
4+
from kafka.admin import ConfigResource, ConfigResourceType
5+
from test.fixtures import ZookeeperFixture, KafkaFixture
6+
from test.testutil import KafkaIntegrationTestCase, kafka_versions
7+
8+
9+
class TestAdminClientIntegration(KafkaIntegrationTestCase):
10+
@classmethod
11+
def setUpClass(cls): # noqa
12+
if not os.environ.get('KAFKA_VERSION'):
13+
return
14+
15+
cls.zk = ZookeeperFixture.instance()
16+
cls.server = KafkaFixture.instance(0, cls.zk)
17+
18+
@classmethod
19+
def tearDownClass(cls): # noqa
20+
if not os.environ.get('KAFKA_VERSION'):
21+
return
22+
23+
cls.server.close()
24+
cls.zk.close()
25+
26+
def admin_client(self, **configs):
27+
brokers = '%s:%d' % (self.server.host, self.server.port)
28+
admin_client = KafkaAdminClient(
29+
bootstrap_servers=brokers, **configs)
30+
return admin_client
31+
32+
@kafka_versions('>=0.10.0')
33+
def test_describe_configs_broker_resource_returns_configs(self):
34+
broker_id = self.client.brokers[0].nodeId
35+
configs = self.admin_client().describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
36+
37+
self.assertEqual(len(configs.resources), 1)
38+
self.assertEqual(configs.resources[0][2], ConfigResourceType.BROKER)
39+
self.assertEqual(configs.resources[0][3], str(broker_id))
40+
self.assertGreater(len(configs.resources[0][4]), 1)
41+
42+
@kafka_versions('>=0.10.0')
43+
def test_describe_configs_topic_resource_returns_configs(self):
44+
topic = self.client.topics[0]
45+
configs = self.admin_client().describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)])
46+
47+
self.assertEqual(len(configs.resources), 1)
48+
self.assertEqual(configs.resources[0][2], ConfigResourceType.TOPIC)
49+
self.assertEqual(configs.resources[0][3], topic)
50+
self.assertGreater(len(configs.resources[0][4]), 1)
51+
52+
@kafka_versions('>=0.10.0')
53+
def test_describe_configs_mixed_resources_returns_configs(self):
54+
topic = self.client.topics[0]
55+
broker_id = self.client.brokers[0].nodeId
56+
configs = self.admin_client().describe_configs([
57+
ConfigResource(ConfigResourceType.TOPIC, topic),
58+
ConfigResource(ConfigResourceType.BROKER, broker_id)])
59+
60+
self.assertEqual(len(configs.resources), 2)
61+
62+
for resource in configs.resources:
63+
self.assertTrue(
64+
(configs.resources[0][2] == ConfigResourceType.TOPIC and configs.resources[0][3] == topic) or
65+
(configs.resources[0][2] == ConfigResourceType.BROKER and configs.resources[0][3] == str(broker_id)))
66+
self.assertGreater(len(resource[4]), 1)

0 commit comments

Comments
 (0)