From 5432e73cac72683ac1c4b5b9c91c8b2c5534d0de Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 26 Oct 2022 13:48:39 -0400 Subject: [PATCH 1/4] Fix double start for `AbstractEndpoint` When we use POJO methods in the `IntegrationFlowAdapter`, the `IntegrationFlowAdapter` is set as a `target` for the `MessagingMethodInvokerHelper`. When endpoint is started by the application context, such a `start()` is propagated down to the `MessagingMethodInvokerHelper`. And in our case back into an `IntegrationFlowAdapter` instance. This one, in turn, starts its `IntegrationFlow` internally which leads to the start of the mentioned endpoint in the beginning. Therefore, we cause a recursive `start()` call on this endpoint from itself. The `running` flag is set when we are already done with the `doStart()` logic. Therefore a recursive `start()` call leads to two concurrent polling tasks in the `AbstractPollingEndpoint`. * Check also for the `active` flag in the `AbstractEndpoint.start()` and reset it in case of exception in the `doStart()` **Cherry-pick to `5.5.x`** --- .../endpoint/AbstractEndpoint.java | 10 +++- .../dsl/flowservices/FlowServiceTests.java | 48 +++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java index 55b051ecb7e..3a868024380 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java @@ -149,9 +149,15 @@ public final boolean isRunning() { public final void start() { this.lifecycleLock.lock(); try { - if (!this.running) { + if (!this.running && !this.active) { this.active = true; - doStart(); + try { + doStart(); + } + catch (Exception ex) { + this.active = false; + throw new IllegalStateException("Cannot start endpoint", ex); + } this.running = true; logger.info(() -> "started " + this); } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java index cd0b5dcfe8e..d525f3fa705 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java @@ -55,6 +55,7 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.PollableChannel; import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.support.GenericMessage; import org.springframework.scheduling.TriggerContext; import org.springframework.stereotype.Component; import org.springframework.test.annotation.DirtiesContext; @@ -118,6 +119,34 @@ public void testGatewayExplicitReplyChannel() { assertThat(message.getPayload()).isEqualTo("FOO"); } + @Autowired + @Qualifier("delaysBetweenPollsInput") + private MessageChannel delaysBetweenPollsInput; + + @Autowired + @Qualifier("delaysBetweenPollsOutput") + private PollableChannel delaysBetweenPollsOutput; + + @Test + public void noDoubleStartForEndpoints() { + this.delaysBetweenPollsInput.send(new GenericMessage<>("A,B")); + + Message receive1 = this.delaysBetweenPollsOutput.receive(10_000); + + assertThat(receive1).isNotNull() + .extracting(Message::getPayload) + .isEqualTo("A"); + + Message receive2 = this.delaysBetweenPollsOutput.receive(10_000); + + assertThat(receive2).isNotNull() + .extracting(Message::getPayload) + .isEqualTo("B"); + + assertThat(receive2.getHeaders().getTimestamp() - receive1.getHeaders().getTimestamp()) + .isGreaterThanOrEqualTo(500); + } + @Configuration @EnableIntegration @ComponentScan @@ -226,4 +255,23 @@ public String handle(String payload, @Header String foo) { } + @Component + public static class DelaysBetweenPollsAdapter extends IntegrationFlowAdapter { + + @ServiceActivator + public String handle(String payload) { + return payload; + } + + @Override + protected IntegrationFlowDefinition buildFlow() { + return from("delaysBetweenPollsInput") + .split(splitter -> splitter.delimiters(",")) + .channel(MessageChannels.queue()) + .handle(this, "handle", e -> e.poller(poller -> poller.fixedDelay(500).maxMessagesPerPoll(1))) + .channel(MessageChannels.queue("delaysBetweenPollsOutput")); + } + + } + } From b5ac71e8a6a0b81c08570908c170657da4277896 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 26 Oct 2022 14:05:54 -0400 Subject: [PATCH 2/4] * Change assert for message timestamps to `isCloseTo()` with percentage --- .../integration/dsl/flowservices/FlowServiceTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java index d525f3fa705..981d786dc13 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java @@ -25,6 +25,7 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; +import org.assertj.core.data.Percentage; import org.junit.jupiter.api.Test; import org.springframework.aop.framework.Advised; @@ -144,7 +145,7 @@ public void noDoubleStartForEndpoints() { .isEqualTo("B"); assertThat(receive2.getHeaders().getTimestamp() - receive1.getHeaders().getTimestamp()) - .isGreaterThanOrEqualTo(500); + .isCloseTo(500, Percentage.withPercentage(10)); } @Configuration From 7bd3e448b9fae991d0e0548d9c1d143499896809 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 26 Oct 2022 15:08:53 -0400 Subject: [PATCH 3/4] * Change `catch` in the `AbstractEndpoint.start()` to `RuntimeException`: the `doStart()` cannot throw unchecked exceptions by definition --- .../integration/endpoint/AbstractEndpoint.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java index 3a868024380..3fcc01e4a87 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java @@ -16,6 +16,7 @@ package org.springframework.integration.endpoint; +import java.lang.reflect.UndeclaredThrowableException; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -154,9 +155,9 @@ public final void start() { try { doStart(); } - catch (Exception ex) { + catch (RuntimeException ex) { this.active = false; - throw new IllegalStateException("Cannot start endpoint", ex); + throw ex; } this.running = true; logger.info(() -> "started " + this); From 1476c8147bea02330d66592cf666b4f263c92e9c Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 26 Oct 2022 15:15:15 -0400 Subject: [PATCH 4/4] * Fix imports in `AbstractEndpoint` --- .../springframework/integration/endpoint/AbstractEndpoint.java | 1 - 1 file changed, 1 deletion(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java index 3fcc01e4a87..8142d00ee04 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java @@ -16,7 +16,6 @@ package org.springframework.integration.endpoint; -import java.lang.reflect.UndeclaredThrowableException; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;