Skip to content

Commit 6d3800c

Browse files
jlandersenjeffwidman
authored andcommitted
Fix describe config for multi-broker clusters (#1869)
* Fix describe config for multi-broker clusters Currently all describe config requests are sent to "least loaded node". Requests for broker configs must, however, be sent to the specific broker, otherwise an error is returned. Only topic requests can be handled by any node. This changes the logic to send all describe config requests to the specific broker.
1 parent 84e37e0 commit 6d3800c

File tree

2 files changed

+112
-15
lines changed

2 files changed

+112
-15
lines changed

kafka/admin/client.py

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import socket
77

8+
from . import ConfigResourceType
89
from kafka.vendor import six
910

1011
from kafka.client_async import KafkaClient, selectors
@@ -763,29 +764,70 @@ def describe_configs(self, config_resources, include_synonyms=False):
763764
supported by all versions. Default: False.
764765
:return: Appropriate version of DescribeConfigsResponse class.
765766
"""
767+
768+
# Break up requests by type - a broker config request must be sent to the specific broker.
769+
# All other (currently just topic resources) can be sent to any broker.
770+
broker_resources = []
771+
topic_resources = []
772+
773+
for config_resource in config_resources:
774+
if config_resource.resource_type == ConfigResourceType.BROKER:
775+
broker_resources.append(self._convert_describe_config_resource_request(config_resource))
776+
else:
777+
topic_resources.append(self._convert_describe_config_resource_request(config_resource))
778+
779+
futures = []
766780
version = self._matching_api_version(DescribeConfigsRequest)
767781
if version == 0:
768782
if include_synonyms:
769783
raise IncompatibleBrokerVersion(
770784
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
771-
.format(self.config['api_version']))
772-
request = DescribeConfigsRequest[version](
773-
resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
774-
)
785+
.format(self.config['api_version']))
786+
787+
if len(broker_resources) > 0:
788+
for broker_resource in broker_resources:
789+
try:
790+
broker_id = int(broker_resource[1])
791+
except ValueError:
792+
raise ValueError("Broker resource names must be an integer or a string represented integer")
793+
794+
futures.append(self._send_request_to_node(
795+
broker_id,
796+
DescribeConfigsRequest[version](resources=[broker_resource])
797+
))
798+
799+
if len(topic_resources) > 0:
800+
futures.append(self._send_request_to_node(
801+
self._client.least_loaded_node(),
802+
DescribeConfigsRequest[version](resources=topic_resources)
803+
))
804+
775805
elif version == 1:
776-
request = DescribeConfigsRequest[version](
777-
resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources],
778-
include_synonyms=include_synonyms
779-
)
806+
if len(broker_resources) > 0:
807+
for broker_resource in broker_resources:
808+
try:
809+
broker_id = int(broker_resource[1])
810+
except ValueError:
811+
raise ValueError("Broker resource names must be an integer or a string represented integer")
812+
813+
futures.append(self._send_request_to_node(
814+
broker_id,
815+
DescribeConfigsRequest[version](
816+
resources=[broker_resource],
817+
include_synonyms=include_synonyms)
818+
))
819+
820+
if len(topic_resources) > 0:
821+
futures.append(self._send_request_to_node(
822+
self._client.least_loaded_node(),
823+
DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms)
824+
))
780825
else:
781826
raise NotImplementedError(
782-
"Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient."
783-
.format(version))
784-
future = self._send_request_to_node(self._client.least_loaded_node(), request)
827+
"Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version))
785828

786-
self._wait_for_futures([future])
787-
response = future.value
788-
return response
829+
self._wait_for_futures(futures)
830+
return [f.value for f in futures]
789831

790832
@staticmethod
791833
def _convert_alter_config_resource_request(config_resource):

test/test_admin_integration.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
from test.testutil import env_kafka_version
44

55
from kafka.errors import NoError
6-
from kafka.admin import ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL
6+
from kafka.admin import (
7+
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
78

89

910
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -80,3 +81,57 @@ def test_create_describe_delete_acls(kafka_admin_client):
8081

8182
assert error is NoError
8283
assert len(acls) == 0
84+
85+
86+
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
87+
def test_describe_configs_broker_resource_returns_configs(kafka_admin_client):
88+
"""Tests that describe config returns configs for broker
89+
"""
90+
broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId
91+
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
92+
93+
assert len(configs) == 1
94+
assert configs[0].resources[0][2] == ConfigResourceType.BROKER
95+
assert configs[0].resources[0][3] == str(broker_id)
96+
assert len(configs[0].resources[0][4]) > 1
97+
98+
99+
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
100+
def test_describe_configs_topic_resource_returns_configs(topic, kafka_admin_client):
101+
"""Tests that describe config returns configs for topic
102+
"""
103+
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)])
104+
105+
assert len(configs) == 1
106+
assert configs[0].resources[0][2] == ConfigResourceType.TOPIC
107+
assert configs[0].resources[0][3] == topic
108+
assert len(configs[0].resources[0][4]) > 1
109+
110+
111+
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
112+
def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_client):
113+
"""Tests that describe config returns configs for mixed resource types (topic + broker)
114+
"""
115+
broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId
116+
configs = kafka_admin_client.describe_configs([
117+
ConfigResource(ConfigResourceType.TOPIC, topic),
118+
ConfigResource(ConfigResourceType.BROKER, broker_id)])
119+
120+
assert len(configs) == 2
121+
122+
for config in configs:
123+
assert (config.resources[0][2] == ConfigResourceType.TOPIC
124+
and config.resources[0][3] == topic) or \
125+
(config.resources[0][2] == ConfigResourceType.BROKER
126+
and config.resources[0][3] == str(broker_id))
127+
assert len(config.resources[0][4]) > 1
128+
129+
130+
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
131+
def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
132+
"""Tests that describe config raises exception on non-integer broker id
133+
"""
134+
broker_id = "str"
135+
136+
with pytest.raises(ValueError):
137+
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])

0 commit comments

Comments
 (0)