From e1e76fb751f1ea250b9e30b07ba64f28b6bf3ba4 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 1 Sep 2023 10:29:19 -0400 Subject: [PATCH] Fix PostgresSubscribableChannel.notifyUpdate() When transaction is configured for the `PostgresSubscribableChannel.notifyUpdate()` and it is rolled back, the next poll in that loop will return the same message. Again and again if transaction is always rolled back. This leads to the condition when we never leave this loop even if we fully unsubscribed from this channel. The issue has need spotted after introducing `SKIP LOCKED` for `PostgresChannelMessageStoreQueryProvider` which leads to the locked record in DB in the mentioned above transaction. * Introduce `PostgresSubscribableChannel.hasHandlers` flag to check in the `notifyUpdate()` before performing poll query in DB. **Cherry-pick to `6.1.x` & `6.0.x`** --- .../channel/PostgresSubscribableChannel.java | 31 ++++++++++++------- ...resChannelMessageTableSubscriberTests.java | 8 +++-- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java index 67428a30a2f..3a975e8bbbf 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java @@ -67,6 +67,8 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel private Executor executor; + private volatile boolean hasHandlers; + /** * Create a subscribable channel for a Postgres database. * @param jdbcChannelMessageStore The message store to use for the relevant region. @@ -128,6 +130,7 @@ public boolean subscribe(MessageHandler handler) { boolean subscribed = super.subscribe(handler); if (this.dispatcher.getHandlerCount() == 1) { this.messageTableSubscriber.subscribe(this); + this.hasHandlers = true; notifyUpdate(); } return subscribed; @@ -138,6 +141,7 @@ public boolean unsubscribe(MessageHandler handle) { boolean unsubscribed = super.unsubscribe(handle); if (this.dispatcher.getHandlerCount() == 0) { this.messageTableSubscriber.unsubscribe(this); + this.hasHandlers = false; } return unsubscribed; } @@ -159,18 +163,7 @@ public void notifyUpdate() { try { Optional> dispatchedMessage; do { - if (this.transactionTemplate != null) { - dispatchedMessage = - this.retryTemplate.execute(context -> - this.transactionTemplate.execute(status -> - pollMessage() - .map(this::dispatch))); - } - else { - dispatchedMessage = - pollMessage() - .map(message -> this.retryTemplate.execute(context -> dispatch(message))); - } + dispatchedMessage = askForMessage(); } while (dispatchedMessage.isPresent()); } catch (Exception ex) { @@ -179,6 +172,20 @@ public void notifyUpdate() { }); } + private Optional> askForMessage() { + if (this.hasHandlers) { + if (this.transactionTemplate != null) { + return this.retryTemplate.execute(context -> + this.transactionTemplate.execute(status -> pollMessage().map(this::dispatch))); + } + else { + return pollMessage() + .map(message -> this.retryTemplate.execute(context -> dispatch(message))); + } + } + return Optional.empty(); + } + private Optional> pollMessage() { return Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId)); } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java index bd7571afcc3..d443946fa02 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java @@ -45,6 +45,7 @@ import org.springframework.jdbc.datasource.init.DataSourceInitializer; import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator; import org.springframework.jdbc.datasource.init.ScriptUtils; +import org.springframework.messaging.MessageHandler; import org.springframework.messaging.support.GenericMessage; import org.springframework.retry.support.RetryTemplate; import org.springframework.test.annotation.DirtiesContext; @@ -172,14 +173,16 @@ void testMessagesDispatchedInTransaction() throws InterruptedException { postgresSubscribableChannel.setTransactionManager(transactionManager); postgresChannelMessageTableSubscriber.start(); - postgresSubscribableChannel.subscribe(message -> { + MessageHandler messageHandler = + message -> { try { throw new RuntimeException("An error has occurred"); } finally { latch.countDown(); } - }); + }; + postgresSubscribableChannel.subscribe(messageHandler); messageStore.addMessageToGroup(groupId, new GenericMessage<>("1")); messageStore.addMessageToGroup(groupId, new GenericMessage<>("2")); @@ -188,6 +191,7 @@ void testMessagesDispatchedInTransaction() throws InterruptedException { // Stop subscriber to unlock records from TX for the next verification postgresChannelMessageTableSubscriber.stop(); + postgresSubscribableChannel.unsubscribe(messageHandler); assertThat(messageStore.messageGroupSize(groupId)).isEqualTo(2); assertThat(messageStore.pollMessageFromGroup(groupId).getPayload()).isEqualTo("1");