Skip to content

Commit b146e4e

Browse files
authored
Apply Nullability to spring-integration-kafka module
Related to: #10083 Signed-off-by: Jiandong Ma <[email protected]>
1 parent a41c868 commit b146e4e

27 files changed

+192
-152
lines changed

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/AbstractKafkaChannel.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,8 @@
2020
import java.util.concurrent.TimeUnit;
2121
import java.util.concurrent.TimeoutException;
2222

23+
import org.jspecify.annotations.Nullable;
24+
2325
import org.springframework.integration.channel.AbstractMessageChannel;
2426
import org.springframework.kafka.core.KafkaOperations;
2527
import org.springframework.kafka.support.KafkaHeaders;
@@ -42,7 +44,7 @@ public abstract class AbstractKafkaChannel extends AbstractMessageChannel {
4244

4345
protected final String topic; // NOSONAR final
4446

45-
private String groupId;
47+
private @Nullable String groupId;
4648

4749
/**
4850
* Construct an instance with the provided {@link KafkaOperations} and topic.
@@ -64,7 +66,7 @@ public void setGroupId(String groupId) {
6466
this.groupId = groupId;
6567
}
6668

67-
protected String getGroupId() {
69+
protected @Nullable String getGroupId() {
6870
return this.groupId;
6971
}
7072

@@ -82,7 +84,8 @@ protected boolean doSend(Message<?> message, long timeout) {
8284
return false;
8385
}
8486
catch (ExecutionException e) {
85-
this.logger.error(e.getCause(), () -> "Interrupted while waiting for send result for: " + message);
87+
Throwable cause = e.getCause() != null ? e.getCause() : e;
88+
this.logger.error(cause, () -> "Interrupted while waiting for send result for: " + message);
8689
return false;
8790
}
8891
catch (TimeoutException e) {

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/PollableKafkaChannel.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2024 the original author or authors.
2+
* Copyright 2020-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,12 +20,13 @@
2020
import java.util.Deque;
2121
import java.util.List;
2222

23+
import org.jspecify.annotations.Nullable;
24+
2325
import org.springframework.integration.channel.ExecutorChannelInterceptorAware;
2426
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
2527
import org.springframework.integration.support.management.metrics.CounterFacade;
2628
import org.springframework.integration.support.management.metrics.MetricsCaptor;
2729
import org.springframework.kafka.core.KafkaOperations;
28-
import org.springframework.lang.Nullable;
2930
import org.springframework.messaging.Message;
3031
import org.springframework.messaging.PollableChannel;
3132
import org.springframework.messaging.support.ChannelInterceptor;
@@ -47,7 +48,7 @@ public class PollableKafkaChannel extends AbstractKafkaChannel
4748

4849
private final KafkaMessageSource<?, ?> source;
4950

50-
private CounterFacade receiveCounter;
51+
private @Nullable CounterFacade receiveCounter;
5152

5253
private volatile int executorInterceptorsSize;
5354

@@ -197,8 +198,8 @@ public boolean hasExecutorInterceptors() {
197198

198199
private static String topic(KafkaMessageSource<?, ?> source) {
199200
Assert.notNull(source, "'source' cannot be null");
200-
String[] topics = source.getConsumerProperties().getTopics();
201-
Assert.isTrue(topics != null && topics.length == 1, "Only one topic is allowed");
201+
@Nullable String @Nullable [] topics = source.getConsumerProperties().getTopics();
202+
Assert.isTrue(topics != null && topics.length == 1 && topics[0] != null, "Only one topic is allowed");
202203
return topics[0];
203204
}
204205

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/channel/SubscribableKafkaChannel.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2024 the original author or authors.
2+
* Copyright 2020-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.clients.consumer.Consumer;
2020
import org.apache.kafka.clients.consumer.ConsumerRecord;
21+
import org.jspecify.annotations.Nullable;
2122

2223
import org.springframework.integration.dispatcher.MessageDispatcher;
2324
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
@@ -56,8 +57,10 @@ public class SubscribableKafkaChannel extends AbstractKafkaChannel implements Su
5657

5758
private final IntegrationRecordMessageListener recordListener = new IntegrationRecordMessageListener();
5859

60+
@SuppressWarnings("NullAway.Init")
5961
private MessageDispatcher dispatcher;
6062

63+
@SuppressWarnings("NullAway.Init")
6164
private MessageListenerContainer container;
6265

6366
private boolean autoStartup = true;
@@ -183,8 +186,8 @@ private class IntegrationRecordMessageListener extends RecordMessagingMessageLis
183186
}
184187

185188
@Override
186-
public void onMessage(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment,
187-
Consumer<?, ?> consumer) {
189+
public void onMessage(ConsumerRecord<Object, Object> record, @Nullable Acknowledgment acknowledgment,
190+
@Nullable Consumer<?, ?> consumer) {
188191

189192
SubscribableKafkaChannel.this.dispatcher.dispatch(toMessagingMessage(record, acknowledgment, consumer));
190193
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/**
22
* Provides classes related to message channel implementations for Apache Kafka.
33
*/
4+
@org.jspecify.annotations.NullMarked
45
package org.springframework.integration.kafka.channel;

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/config/xml/KafkaChannelParser.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.kafka.config.xml;
1818

19+
import org.jspecify.annotations.Nullable;
1920
import org.w3c.dom.Element;
2021

2122
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
@@ -38,7 +39,7 @@
3839
public class KafkaChannelParser extends AbstractChannelParser {
3940

4041
@Override
41-
protected BeanDefinitionBuilder buildBeanDefinition(Element element, ParserContext parserContext) {
42+
protected @Nullable BeanDefinitionBuilder buildBeanDefinition(Element element, ParserContext parserContext) {
4243
BeanDefinitionBuilder builder;
4344
String factory = element.getAttribute("container-factory");
4445
boolean hasFactory = StringUtils.hasText(factory);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/**
22
* Provides parser classes to provide Xml namespace support for the Apache Kafka components.
33
*/
4+
@org.jspecify.annotations.NullMarked
45
package org.springframework.integration.kafka.config.xml;

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/AbstractKafkaChannelSpec.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2023 the original author or authors.
2+
* Copyright 2020-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,9 +16,10 @@
1616

1717
package org.springframework.integration.kafka.dsl;
1818

19+
import org.jspecify.annotations.Nullable;
20+
1921
import org.springframework.integration.dsl.MessageChannelSpec;
2022
import org.springframework.integration.kafka.channel.AbstractKafkaChannel;
21-
import org.springframework.lang.Nullable;
2223

2324
/**
2425
*

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageListenerContainerSpec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020

2121
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
2222
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
23+
import org.jspecify.annotations.Nullable;
2324

2425
import org.springframework.core.task.AsyncTaskExecutor;
2526
import org.springframework.integration.dsl.IntegrationComponentSpec;
@@ -28,7 +29,6 @@
2829
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
2930
import org.springframework.kafka.listener.ContainerProperties;
3031
import org.springframework.kafka.support.TopicPartitionOffset;
31-
import org.springframework.lang.Nullable;
3232

3333
/**
3434
* A helper class in the Builder pattern style to delegate options to the

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaTemplateSpec.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,12 +16,13 @@
1616

1717
package org.springframework.integration.kafka.dsl;
1818

19+
import org.jspecify.annotations.Nullable;
20+
1921
import org.springframework.integration.dsl.IntegrationComponentSpec;
2022
import org.springframework.kafka.core.KafkaTemplate;
2123
import org.springframework.kafka.core.ProducerFactory;
2224
import org.springframework.kafka.support.ProducerListener;
2325
import org.springframework.kafka.support.converter.RecordMessageConverter;
24-
import org.springframework.lang.Nullable;
2526

2627
/**
2728
* An {@link IntegrationComponentSpec} implementation for the {@link KafkaTemplate}.
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
/**
22
* Provides Spring Integration Java DSL Components support for Apache Kafka.
33
*/
4-
@org.springframework.lang.NonNullApi
5-
@org.springframework.lang.NonNullFields
4+
@org.jspecify.annotations.NullMarked
65
package org.springframework.integration.kafka.dsl;

0 commit comments

Comments
 (0)