diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/ConsumerEndpointSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/ConsumerEndpointSpec.java index 227fd75a015..0d276e6e9e0 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/ConsumerEndpointSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/ConsumerEndpointSpec.java @@ -223,12 +223,16 @@ public S transactional(boolean handleMessageAdvice) { /** * Specify a {@link BiFunction} for customizing {@link Mono} replies via {@link ReactiveRequestHandlerAdvice}. * @param replyCustomizer the {@link BiFunction} to propagate into {@link ReactiveRequestHandlerAdvice}. + * @param inbound reply payload. + * @param outbound reply payload. * @return the spec. * @since 5.3 * @see ReactiveRequestHandlerAdvice */ - public S customizeMonoReply(BiFunction, Mono, Publisher> replyCustomizer) { - return advice(new ReactiveRequestHandlerAdvice(replyCustomizer)); + @SuppressWarnings({ "unchecked", "rawtypes"}) + public S customizeMonoReply(BiFunction, Mono, Publisher> replyCustomizer) { + return advice(new ReactiveRequestHandlerAdvice( + (BiFunction, Mono, Publisher>) (BiFunction) replyCustomizer)); } /** diff --git a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java index 202b39f0aa5..9305fa4634d 100644 --- a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java +++ b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java @@ -91,6 +91,7 @@ import org.springframework.web.reactive.config.EnableWebFlux; import org.springframework.web.reactive.config.WebFluxConfigurer; import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; import org.springframework.web.util.UriComponentsBuilder; import reactor.core.publisher.Flux; @@ -422,8 +423,12 @@ public IntegrationFlow webFluxFlowWithReplyPayloadToFlux() { .id("webFluxWithReplyPayloadToFlux") .customizeMonoReply( (message, mono) -> - mono.timeout(Duration.ofMillis(100)) - .retry())); + mono. + timeout(Duration.ofMillis(100)) + .retry() + .onErrorResume( + WebClientResponseException.NotFound.class, + ex -> Mono.just("Not Found")))); } @Bean