Skip to content

Commit 7066745

Browse files
authored
GH-2853: Handle null Cluster ID
Resolves #2853 Apparently, some third party Kafka replacements can return a `null` `clusterId`. This causes an attempt to retrieve it on every operation. **cherry-pick to 3.0.x**
1 parent 61bc219 commit 7066745

File tree

2 files changed

+26
-1
lines changed

2 files changed

+26
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,9 @@ public String clusterId() {
321321
if (this.clusterId == null) {
322322
try (AdminClient client = createAdmin()) {
323323
this.clusterId = client.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.SECONDS);
324+
if (this.clusterId == null) {
325+
this.clusterId = "null";
326+
}
324327
}
325328
catch (InterruptedException ex) {
326329
Thread.currentThread().interrupt();
@@ -358,7 +361,7 @@ public Map<String, TopicDescription> describeTopics(String... topicNames) {
358361
}
359362
}
360363

361-
private AdminClient createAdmin() {
364+
AdminClient createAdmin() {
362365
Map<String, Object> configs2 = new HashMap<>(this.configs);
363366
checkBootstrap(configs2);
364367
return AdminClient.create(configs2);

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
2121
import static org.awaitility.Awaitility.await;
22+
import static org.mockito.BDDMockito.given;
23+
import static org.mockito.Mockito.mock;
2224

2325
import java.lang.reflect.Method;
2426
import java.util.Arrays;
@@ -37,11 +39,13 @@
3739
import org.apache.kafka.clients.admin.AlterConfigOp;
3840
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
3941
import org.apache.kafka.clients.admin.ConfigEntry;
42+
import org.apache.kafka.clients.admin.DescribeClusterResult;
4043
import org.apache.kafka.clients.admin.DescribeConfigsResult;
4144
import org.apache.kafka.clients.admin.DescribeTopicsResult;
4245
import org.apache.kafka.clients.admin.NewPartitions;
4346
import org.apache.kafka.clients.admin.NewTopic;
4447
import org.apache.kafka.clients.admin.TopicDescription;
48+
import org.apache.kafka.common.KafkaFuture;
4549
import org.apache.kafka.common.config.ConfigResource;
4650
import org.apache.kafka.common.config.ConfigResource.Type;
4751
import org.apache.kafka.common.config.TopicConfig;
@@ -268,6 +272,24 @@ void toggleBootstraps() {
268272
.isEqualTo("a,b,c");
269273
}
270274

275+
@Test
276+
void nullClusterId() {
277+
AdminClient mock = mock(AdminClient.class);
278+
DescribeClusterResult result = mock(DescribeClusterResult.class);
279+
KafkaFuture<String> fut = KafkaFuture.completedFuture(null);
280+
given(result.clusterId()).willReturn(fut);
281+
given(mock.describeCluster()).willReturn(result);
282+
KafkaAdmin admin = new KafkaAdmin(Map.of()) {
283+
284+
@Override
285+
AdminClient createAdmin() {
286+
return mock;
287+
}
288+
289+
};
290+
assertThat(admin.clusterId()).isEqualTo("null");
291+
}
292+
271293
@Configuration
272294
public static class Config {
273295

0 commit comments

Comments
 (0)