We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
if (message != null) {
IntegrationReactiveUtils
1 parent 6d77ce0 commit 443eb93Copy full SHA for 443eb93
spring-integration-core/src/main/java/org/springframework/integration/util/IntegrationReactiveUtils.java
@@ -133,7 +133,9 @@ public static <T> Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageS
133
<Message<T>>create(monoSink ->
134
monoSink.onRequest(value -> monoSink.success(messageSource.receive())))
135
.doOnSuccess((message) -> {
136
- AckUtils.autoAck(StaticMessageHeaderAccessor.getAcknowledgmentCallback(message));
+ if (message != null) {
137
+ AckUtils.autoAck(StaticMessageHeaderAccessor.getAcknowledgmentCallback(message));
138
+ }
139
})
140
.doOnError(MessagingException.class,
141
(ex) -> {
0 commit comments