From 83d1910ac4e800c8aa132fa2bb8d7d889ee688f1 Mon Sep 17 00:00:00 2001 From: abilan Date: Mon, 9 Jan 2023 14:01:49 -0500 Subject: [PATCH] GH-3986: Integration mock: fix ReactiveMH support Fixes https://github.com/spring-projects/spring-integration/issues/3986 The `ReactiveStreamsConsumer` in addition to plain `Subscriber` and `MessageHandler` also supports a `ReactiveMessageHandler`, but `MockIntegrationContext` doesn't handle a scenario when `ReactiveMessageHandler` is provided for consumer * Rework the logic in the `MockIntegrationContext` to substitute a `ReactiveMessageHandler` in the `ReactiveStreamsConsumer` which has a precedence in its logic over plain `Subscriber`. * Wrap a plain `MessageHandler` mock into a `ReactiveMessageHandler` before substitution in the `ReactiveStreamsConsumer` * Reset `ReactiveStreamsConsumer.reactiveMessageHandler` with source `ReactiveMessageHandler` or `null` respectively to an original `ReactiveStreamsConsumer` configuration * Mention `ReactiveMessageHandler` use-case in the `testing.adoc` **Cherry-pick to `5.5.x`** --- .../test/context/MockIntegrationContext.java | 30 +++++++++++++----- .../test/mock/MockMessageHandlerTests.java | 31 ++++++++++++++++++- src/reference/asciidoc/testing.adoc | 2 ++ 3 files changed, 54 insertions(+), 9 deletions(-) diff --git a/spring-integration-test/src/main/java/org/springframework/integration/test/context/MockIntegrationContext.java b/spring-integration-test/src/main/java/org/springframework/integration/test/context/MockIntegrationContext.java index b03f8ec942e..3723274de11 100644 --- a/spring-integration-test/src/main/java/org/springframework/integration/test/context/MockIntegrationContext.java +++ b/spring-integration-test/src/main/java/org/springframework/integration/test/context/MockIntegrationContext.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; +import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; @@ -45,6 +46,7 @@ import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.ReactiveMessageHandler; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -151,9 +153,15 @@ private void resetBean(Object endpoint, Object handler) { directFieldAccessor.setPropertyValue("source", handler); } else if (endpoint instanceof ReactiveStreamsConsumer) { - Tuple2 value = (Tuple2) handler; - directFieldAccessor.setPropertyValue(HANDLER, value.getT1()); - directFieldAccessor.setPropertyValue("subscriber", value.getT2()); + if (handler instanceof Tuple2) { + Tuple2 value = (Tuple2) handler; + directFieldAccessor.setPropertyValue(HANDLER, value.getT1()); + directFieldAccessor.setPropertyValue("reactiveMessageHandler", value.getT2()); + } + else { + directFieldAccessor.setPropertyValue(HANDLER, handler); + directFieldAccessor.setPropertyValue("reactiveMessageHandler", null); + } } else if (endpoint instanceof IntegrationConsumer) { directFieldAccessor.setPropertyValue(HANDLER, handler); @@ -206,9 +214,13 @@ public void substituteMessageHandlerFor(String consumerEndpointId, // NOSONAR - Object targetMessageHandler = directFieldAccessor.getPropertyValue(HANDLER); Assert.notNull(targetMessageHandler, () -> "'handler' must not be null in the: " + endpoint); if (endpoint instanceof ReactiveStreamsConsumer) { - Object targetSubscriber = directFieldAccessor.getPropertyValue("subscriber"); - Assert.notNull(targetSubscriber, () -> "'subscriber' must not be null in the: " + endpoint); - this.beans.put(consumerEndpointId, Tuples.of(targetMessageHandler, targetSubscriber)); + Object targetReactiveMessageHandler = directFieldAccessor.getPropertyValue("reactiveMessageHandler"); + if (targetReactiveMessageHandler != null) { + this.beans.put(consumerEndpointId, Tuples.of(targetMessageHandler, targetReactiveMessageHandler)); + } + else { + this.beans.put(consumerEndpointId, targetMessageHandler); + } } else { this.beans.put(consumerEndpointId, targetMessageHandler); @@ -236,7 +248,9 @@ public void substituteMessageHandlerFor(String consumerEndpointId, // NOSONAR - directFieldAccessor.setPropertyValue(HANDLER, mockMessageHandler); if (endpoint instanceof ReactiveStreamsConsumer) { - directFieldAccessor.setPropertyValue("subscriber", mockMessageHandler); + ReactiveMessageHandler reactiveMessageHandler = + (message) -> Mono.fromRunnable(() -> mockMessageHandler.handleMessage(message)); + directFieldAccessor.setPropertyValue("reactiveMessageHandler", reactiveMessageHandler); } if (autoStartup && endpoint instanceof Lifecycle) { diff --git a/spring-integration-test/src/test/java/org/springframework/integration/test/mock/MockMessageHandlerTests.java b/spring-integration-test/src/test/java/org/springframework/integration/test/mock/MockMessageHandlerTests.java index 91e70dfbcf2..159b83310f7 100644 --- a/spring-integration-test/src/test/java/org/springframework/integration/test/mock/MockMessageHandlerTests.java +++ b/spring-integration-test/src/test/java/org/springframework/integration/test/mock/MockMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.reactivestreams.Subscriber; +import reactor.core.publisher.Mono; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; @@ -30,6 +31,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.EndpointId; import org.springframework.integration.annotation.Poller; +import org.springframework.integration.annotation.Reactive; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; @@ -48,6 +50,7 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.ReactiveMessageHandler; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.support.GenericMessage; import org.springframework.test.annotation.DirtiesContext; @@ -271,6 +274,24 @@ public void testMockIntegrationContextReset() { } + @Autowired + private MessageChannel reactiveInputChannel; + + @Test + void reactiveMessageHandlerSubstitution() { + MockMessageHandler mockMessageHandler = + mockMessageHandler() + .handleNext(message -> { + }); + + this.mockIntegrationContext.substituteMessageHandlerFor("reactiveEndpoint", mockMessageHandler); + + this.reactiveInputChannel.send(new GenericMessage<>("test")); + + verify(mockMessageHandler).handleMessage(any(Message.class)); + } + + @Configuration @EnableIntegration public static class Config { @@ -341,6 +362,14 @@ public MessageHandler logHandler() { return new LoggingHandler(LoggingHandler.Level.FATAL); } + + @Bean + @EndpointId("reactiveEndpoint") + @ServiceActivator(inputChannel = "reactiveInputChannel", reactive = @Reactive) + public ReactiveMessageHandler reactiveMessageHandler() { + return message -> Mono.empty(); + } + } } diff --git a/src/reference/asciidoc/testing.adoc b/src/reference/asciidoc/testing.adoc index 2338b4c565a..29439e36a25 100644 --- a/src/reference/asciidoc/testing.adoc +++ b/src/reference/asciidoc/testing.adoc @@ -347,6 +347,8 @@ assertSame(message, messageArgumentCaptor.getValue()); ---- ==== +NOTE: The regular `MessageHandler` mocking (or `MockMessageHandler`) has to be used even for a `ReactiveStreamsConsumer` with a `ReactiveMessageHandler` configuration. + See the https://docs.spring.io/spring-integration/api/org/springframework/integration/test/mock/MockIntegration.html[`MockIntegration`] and https://docs.spring.io/spring-integration/api/org/springframework/integration/test/mock/MockMessageHandler.html[`MockMessageHandler`] Javadoc for more information. [[testing-other-resources]]