diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java index 55b051ecb7e..8142d00ee04 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java @@ -149,9 +149,15 @@ public final boolean isRunning() { public final void start() { this.lifecycleLock.lock(); try { - if (!this.running) { + if (!this.running && !this.active) { this.active = true; - doStart(); + try { + doStart(); + } + catch (RuntimeException ex) { + this.active = false; + throw ex; + } this.running = true; logger.info(() -> "started " + this); } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java index cd0b5dcfe8e..981d786dc13 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java @@ -25,6 +25,7 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; +import org.assertj.core.data.Percentage; import org.junit.jupiter.api.Test; import org.springframework.aop.framework.Advised; @@ -55,6 +56,7 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.PollableChannel; import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.support.GenericMessage; import org.springframework.scheduling.TriggerContext; import org.springframework.stereotype.Component; import org.springframework.test.annotation.DirtiesContext; @@ -118,6 +120,34 @@ public void testGatewayExplicitReplyChannel() { assertThat(message.getPayload()).isEqualTo("FOO"); } + @Autowired + @Qualifier("delaysBetweenPollsInput") + private MessageChannel delaysBetweenPollsInput; + + @Autowired + @Qualifier("delaysBetweenPollsOutput") + private PollableChannel delaysBetweenPollsOutput; + + @Test + public void noDoubleStartForEndpoints() { + this.delaysBetweenPollsInput.send(new GenericMessage<>("A,B")); + + Message receive1 = this.delaysBetweenPollsOutput.receive(10_000); + + assertThat(receive1).isNotNull() + .extracting(Message::getPayload) + .isEqualTo("A"); + + Message receive2 = this.delaysBetweenPollsOutput.receive(10_000); + + assertThat(receive2).isNotNull() + .extracting(Message::getPayload) + .isEqualTo("B"); + + assertThat(receive2.getHeaders().getTimestamp() - receive1.getHeaders().getTimestamp()) + .isCloseTo(500, Percentage.withPercentage(10)); + } + @Configuration @EnableIntegration @ComponentScan @@ -226,4 +256,23 @@ public String handle(String payload, @Header String foo) { } + @Component + public static class DelaysBetweenPollsAdapter extends IntegrationFlowAdapter { + + @ServiceActivator + public String handle(String payload) { + return payload; + } + + @Override + protected IntegrationFlowDefinition buildFlow() { + return from("delaysBetweenPollsInput") + .split(splitter -> splitter.delimiters(",")) + .channel(MessageChannels.queue()) + .handle(this, "handle", e -> e.poller(poller -> poller.fixedDelay(500).maxMessagesPerPoll(1))) + .channel(MessageChannels.queue("delaysBetweenPollsOutput")); + } + + } + }