Skip to content

Commit c84d0f7

Browse files
authored
GH-2859: Option to Manually Set the Cluster ID
Resolves #2859 * Suppress fetching cluster id in `initialize()` if set manually.
1 parent a455eed commit c84d0f7

File tree

2 files changed

+17
-2
lines changed

2 files changed

+17
-2
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,6 @@ Starting with version 3.0.6, you can add dynamic tags to the timers and traces,
109109
To do so, add a custom `KafkaListenerObservationConvention` and/or `KafkaTemplateObservationConvention` to the listener container properties or `KafkaTemplate` respectively.
110110
The `record` property in both observation contexts contains the `ConsumerRecord` or `ProducerRecord` respectively.
111111

112+
The sender and receiver contexts' `remoteServiceName` properties are set to the Kafka `clusterId` property; this is retrieved by a `KafkaAdmin`.
113+
If, for some reason - perhaps lack of admin permissions, you cannot retrieve the cluster id, starting with version 3.1, you can set a manual `clusterId` on the `KafkaAdmin` and inject it into `KafkaTemplate` s and listener containers.
114+
When it is `null` (default), the admin will invoke the `describeCluster` admin operation to retrieve it from the broker.

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,16 @@ protected Predicate<NewTopic> getCreateOrModifyTopic() {
198198
return this.createOrModifyTopic;
199199
}
200200

201+
/**
202+
* Set the cluster id. Use this to prevent attempting to fetch the cluster id
203+
* from the broker, perhaps if the user does not have admin permissions.
204+
* @param clusterId the clusterId to set
205+
* @since 3.1
206+
*/
207+
public void setClusterId(String clusterId) {
208+
this.clusterId = clusterId;
209+
}
210+
201211
@Override
202212
public Map<String, Object> getConfigurationProperties() {
203213
Map<String, Object> configs2 = new HashMap<>(this.configs);
@@ -240,8 +250,10 @@ public final boolean initialize() {
240250
if (adminClient != null) {
241251
try {
242252
synchronized (this) {
243-
this.clusterId = adminClient.describeCluster().clusterId().get(this.operationTimeout,
244-
TimeUnit.SECONDS);
253+
if (this.clusterId != null) {
254+
this.clusterId = adminClient.describeCluster().clusterId().get(this.operationTimeout,
255+
TimeUnit.SECONDS);
256+
}
245257
}
246258
addOrModifyTopicsIfNeeded(adminClient, newTopics);
247259
return true;

0 commit comments

Comments
 (0)