diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java index 67428a30a2f..3a975e8bbbf 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java @@ -67,6 +67,8 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel private Executor executor; + private volatile boolean hasHandlers; + /** * Create a subscribable channel for a Postgres database. * @param jdbcChannelMessageStore The message store to use for the relevant region. @@ -128,6 +130,7 @@ public boolean subscribe(MessageHandler handler) { boolean subscribed = super.subscribe(handler); if (this.dispatcher.getHandlerCount() == 1) { this.messageTableSubscriber.subscribe(this); + this.hasHandlers = true; notifyUpdate(); } return subscribed; @@ -138,6 +141,7 @@ public boolean unsubscribe(MessageHandler handle) { boolean unsubscribed = super.unsubscribe(handle); if (this.dispatcher.getHandlerCount() == 0) { this.messageTableSubscriber.unsubscribe(this); + this.hasHandlers = false; } return unsubscribed; } @@ -159,18 +163,7 @@ public void notifyUpdate() { try { Optional> dispatchedMessage; do { - if (this.transactionTemplate != null) { - dispatchedMessage = - this.retryTemplate.execute(context -> - this.transactionTemplate.execute(status -> - pollMessage() - .map(this::dispatch))); - } - else { - dispatchedMessage = - pollMessage() - .map(message -> this.retryTemplate.execute(context -> dispatch(message))); - } + dispatchedMessage = askForMessage(); } while (dispatchedMessage.isPresent()); } catch (Exception ex) { @@ -179,6 +172,20 @@ public void notifyUpdate() { }); } + private Optional> askForMessage() { + if (this.hasHandlers) { + if (this.transactionTemplate != null) { + return this.retryTemplate.execute(context -> + this.transactionTemplate.execute(status -> pollMessage().map(this::dispatch))); + } + else { + return pollMessage() + .map(message -> this.retryTemplate.execute(context -> dispatch(message))); + } + } + return Optional.empty(); + } + private Optional> pollMessage() { return Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId)); } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java index bd7571afcc3..d443946fa02 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java @@ -45,6 +45,7 @@ import org.springframework.jdbc.datasource.init.DataSourceInitializer; import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator; import org.springframework.jdbc.datasource.init.ScriptUtils; +import org.springframework.messaging.MessageHandler; import org.springframework.messaging.support.GenericMessage; import org.springframework.retry.support.RetryTemplate; import org.springframework.test.annotation.DirtiesContext; @@ -172,14 +173,16 @@ void testMessagesDispatchedInTransaction() throws InterruptedException { postgresSubscribableChannel.setTransactionManager(transactionManager); postgresChannelMessageTableSubscriber.start(); - postgresSubscribableChannel.subscribe(message -> { + MessageHandler messageHandler = + message -> { try { throw new RuntimeException("An error has occurred"); } finally { latch.countDown(); } - }); + }; + postgresSubscribableChannel.subscribe(messageHandler); messageStore.addMessageToGroup(groupId, new GenericMessage<>("1")); messageStore.addMessageToGroup(groupId, new GenericMessage<>("2")); @@ -188,6 +191,7 @@ void testMessagesDispatchedInTransaction() throws InterruptedException { // Stop subscriber to unlock records from TX for the next verification postgresChannelMessageTableSubscriber.stop(); + postgresSubscribableChannel.unsubscribe(messageHandler); assertThat(messageStore.messageGroupSize(groupId)).isEqualTo(2); assertThat(messageStore.pollMessageFromGroup(groupId).getPayload()).isEqualTo("1");