Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel

private Executor executor;

private volatile boolean hasHandlers;

/**
* Create a subscribable channel for a Postgres database.
* @param jdbcChannelMessageStore The message store to use for the relevant region.
Expand Down Expand Up @@ -128,6 +130,7 @@ public boolean subscribe(MessageHandler handler) {
boolean subscribed = super.subscribe(handler);
if (this.dispatcher.getHandlerCount() == 1) {
this.messageTableSubscriber.subscribe(this);
this.hasHandlers = true;
notifyUpdate();
}
return subscribed;
Expand All @@ -138,6 +141,7 @@ public boolean unsubscribe(MessageHandler handle) {
boolean unsubscribed = super.unsubscribe(handle);
if (this.dispatcher.getHandlerCount() == 0) {
this.messageTableSubscriber.unsubscribe(this);
this.hasHandlers = false;
}
return unsubscribed;
}
Expand All @@ -159,18 +163,7 @@ public void notifyUpdate() {
try {
Optional<Message<?>> dispatchedMessage;
do {
if (this.transactionTemplate != null) {
dispatchedMessage =
this.retryTemplate.execute(context ->
this.transactionTemplate.execute(status ->
pollMessage()
.map(this::dispatch)));
}
else {
dispatchedMessage =
pollMessage()
.map(message -> this.retryTemplate.execute(context -> dispatch(message)));
}
dispatchedMessage = askForMessage();
} while (dispatchedMessage.isPresent());
}
catch (Exception ex) {
Expand All @@ -179,6 +172,20 @@ public void notifyUpdate() {
});
}

private Optional<Message<?>> askForMessage() {
if (this.hasHandlers) {
if (this.transactionTemplate != null) {
return this.retryTemplate.execute(context ->
this.transactionTemplate.execute(status -> pollMessage().map(this::dispatch)));
}
else {
return pollMessage()
.map(message -> this.retryTemplate.execute(context -> dispatch(message)));
}
}
return Optional.empty();
}

private Optional<Message<?>> pollMessage() {
return Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.jdbc.datasource.init.ScriptUtils;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.test.annotation.DirtiesContext;
Expand Down Expand Up @@ -172,14 +173,16 @@ void testMessagesDispatchedInTransaction() throws InterruptedException {
postgresSubscribableChannel.setTransactionManager(transactionManager);

postgresChannelMessageTableSubscriber.start();
postgresSubscribableChannel.subscribe(message -> {
MessageHandler messageHandler =
message -> {
try {
throw new RuntimeException("An error has occurred");
}
finally {
latch.countDown();
}
});
};
postgresSubscribableChannel.subscribe(messageHandler);

messageStore.addMessageToGroup(groupId, new GenericMessage<>("1"));
messageStore.addMessageToGroup(groupId, new GenericMessage<>("2"));
Expand All @@ -188,6 +191,7 @@ void testMessagesDispatchedInTransaction() throws InterruptedException {

// Stop subscriber to unlock records from TX for the next verification
postgresChannelMessageTableSubscriber.stop();
postgresSubscribableChannel.unsubscribe(messageHandler);

assertThat(messageStore.messageGroupSize(groupId)).isEqualTo(2);
assertThat(messageStore.pollMessageFromGroup(groupId).getPayload()).isEqualTo("1");
Expand Down