From 5cabe3ca0ac760580ac9c635f958f40cea062192 Mon Sep 17 00:00:00 2001 From: Glenn Renfro Date: Tue, 22 Jul 2025 12:14:53 -0400 Subject: [PATCH 1/4] GH-10083: Apply Nullability to core dispatcher package PartititonDispatcher.partition will always return at least 1 partition. So return value is wrapped with Objects.requireNonNull, for nullaway Related to: https://github.com/spring-projects/spring-integration/issues/10083 --- .../integration/dispatcher/AbstractDispatcher.java | 3 ++- .../dispatcher/AggregateMessageDeliveryException.java | 5 ++++- .../integration/dispatcher/BroadcastingDispatcher.java | 8 ++++++-- .../integration/dispatcher/PartitionedDispatcher.java | 9 +++++---- .../integration/dispatcher/UnicastingDispatcher.java | 5 +++-- .../integration/dispatcher/package-info.java | 1 + 6 files changed, 21 insertions(+), 10 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java index 8f2ea397750..fa027359901 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.jspecify.annotations.Nullable; import org.springframework.integration.support.utils.IntegrationUtils; import org.springframework.messaging.Message; @@ -53,7 +54,7 @@ public abstract class AbstractDispatcher implements MessageDispatcher { private volatile int maxSubscribers = Integer.MAX_VALUE; - private volatile MessageHandler theOneHandler; + private volatile @Nullable MessageHandler theOneHandler; private final Lock lock = new ReentrantLock(); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/AggregateMessageDeliveryException.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/AggregateMessageDeliveryException.java index 49c1993920b..2dba0a2a698 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/AggregateMessageDeliveryException.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/AggregateMessageDeliveryException.java @@ -19,6 +19,8 @@ import java.util.ArrayList; import java.util.List; +import org.jspecify.annotations.Nullable; + import org.springframework.messaging.Message; import org.springframework.messaging.MessageDeliveryException; import org.springframework.util.StringUtils; @@ -30,6 +32,7 @@ * @author Mark Fisher * @author Artem Bilan * @author Gary Russell + * @author Glenn Renfro * * @since 1.0.3 */ @@ -69,7 +72,7 @@ public String getMessage() { return message.toString(); } - private String appendPeriodIfNecessary(String baseMessage) { + private String appendPeriodIfNecessary(@Nullable String baseMessage) { if (!StringUtils.hasText(baseMessage)) { return ""; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java index fdc4461c00e..f24b2b6cd1c 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java @@ -20,6 +20,8 @@ import java.util.UUID; import java.util.concurrent.Executor; +import org.jspecify.annotations.Nullable; + import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; @@ -56,12 +58,13 @@ * @author Gary Russell * @author Oleg Zhurakousky * @author Artem Bilan + * @author Glenn Renfro */ public class BroadcastingDispatcher extends AbstractDispatcher implements BeanFactoryAware { private final boolean requireSubscribers; - private final Executor executor; + private final @Nullable Executor executor; private boolean ignoreFailures; @@ -71,6 +74,7 @@ public class BroadcastingDispatcher extends AbstractDispatcher implements BeanFa private MessageHandlingTaskDecorator messageHandlingTaskDecorator = task -> task; + @SuppressWarnings("NullAway.Init") private BeanFactory beanFactory; private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory(); @@ -89,7 +93,7 @@ public BroadcastingDispatcher(boolean requireSubscribers) { this(null, requireSubscribers); } - public BroadcastingDispatcher(Executor executor, boolean requireSubscribers) { + public BroadcastingDispatcher(@Nullable Executor executor, boolean requireSubscribers) { this.requireSubscribers = requireSubscribers; this.executor = executor; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java index 0e7b97b2097..526def5f7ad 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -54,6 +55,7 @@ * * @author Artem Bilan * @author Christian Tzolov + * @author Glenn Renfro * * @since 6.1 */ @@ -71,10 +73,9 @@ public class PartitionedDispatcher extends AbstractDispatcher { private Predicate failoverStrategy = (exception) -> true; - @Nullable - private LoadBalancingStrategy loadBalancingStrategy; + private @Nullable LoadBalancingStrategy loadBalancingStrategy; - private ErrorHandler errorHandler; + private @Nullable ErrorHandler errorHandler; private MessageHandlingTaskDecorator messageHandlingTaskDecorator = task -> task; @@ -168,7 +169,7 @@ public boolean dispatch(Message message) { populatedPartitions(); int partition = Math.abs(this.partitionKeyFunction.apply(message).hashCode()) % this.partitionCount; UnicastingDispatcher partitionDispatcher = this.partitions.get(partition); - return partitionDispatcher.dispatch(message); + return Objects.requireNonNull(partitionDispatcher).dispatch(message); } private void populatedPartitions() { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java index 1966f7414bb..0bd51514e4b 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java @@ -51,6 +51,7 @@ * @author Gary Russell * @author Oleg Zhurakousky * @author Artem Bilan + * @author Glenn Renfro * * @since 1.0.2 */ @@ -58,11 +59,11 @@ public class UnicastingDispatcher extends AbstractDispatcher { private final MessageHandler dispatchHandler = this::doDispatch; - private final Executor executor; + private final @Nullable Executor executor; private Predicate failoverStrategy = (exception) -> true; - private LoadBalancingStrategy loadBalancingStrategy; + private @Nullable LoadBalancingStrategy loadBalancingStrategy; private MessageHandlingTaskDecorator messageHandlingTaskDecorator = task -> task; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/package-info.java index 126e182014a..075712e7f83 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/package-info.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/package-info.java @@ -1,4 +1,5 @@ /** * Provides classes related to dispatching messages. */ +@org.jspecify.annotations.NullMarked package org.springframework.integration.dispatcher; From 3939ef12f227d4daa017828a1b970305cf760b07 Mon Sep 17 00:00:00 2001 From: Glenn Renfro Date: Mon, 28 Jul 2025 15:06:24 -0400 Subject: [PATCH 2/4] Due to the high use of the `dispatch` method it is better to suppress the `NullAway` error, than take the performance hit of using `requiresNonNull` --- .../integration/dispatcher/PartitionedDispatcher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java index 526def5f7ad..3a6b2bae01a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java @@ -20,7 +20,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -165,11 +164,12 @@ public void shutdown() { } @Override + @SuppressWarnings("NullAway") public boolean dispatch(Message message) { populatedPartitions(); int partition = Math.abs(this.partitionKeyFunction.apply(message).hashCode()) % this.partitionCount; UnicastingDispatcher partitionDispatcher = this.partitions.get(partition); - return Objects.requireNonNull(partitionDispatcher).dispatch(message); + return partitionDispatcher.dispatch(message); } private void populatedPartitions() { From b0ecd6e770432e1c19e054fd6328d705fa4bbe0a Mon Sep 17 00:00:00 2001 From: Glenn Renfro Date: Mon, 28 Jul 2025 16:10:56 -0400 Subject: [PATCH 3/4] Add comment to `PartitionedDispatcher.dispatch` stating why a `@SuppressWarning` is needed. --- .../integration/dispatcher/PartitionedDispatcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java index 3a6b2bae01a..56f25ce8397 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java @@ -164,7 +164,7 @@ public void shutdown() { } @Override - @SuppressWarnings("NullAway") + @SuppressWarnings("NullAway") // Dataflow analysis limitation! public boolean dispatch(Message message) { populatedPartitions(); int partition = Math.abs(this.partitionKeyFunction.apply(message).hashCode()) % this.partitionCount; From 2044342d9c229dd0062bf2b10a916489781df333 Mon Sep 17 00:00:00 2001 From: Glenn Renfro Date: Tue, 29 Jul 2025 08:07:42 -0400 Subject: [PATCH 4/4] Correct comment PartitionedDispatcher.dispatch to state that partitionsMap is never null --- .../integration/dispatcher/PartitionedDispatcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java index 56f25ce8397..6037a26a28a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java @@ -164,7 +164,7 @@ public void shutdown() { } @Override - @SuppressWarnings("NullAway") // Dataflow analysis limitation! + @SuppressWarnings("NullAway") // The partitions map never returns null according to partition hash public boolean dispatch(Message message) { populatedPartitions(); int partition = Math.abs(this.partitionKeyFunction.apply(message).hashCode()) % this.partitionCount;