-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Description
Expected Behavior
Opening an issue as discussed on stackoverflow Spring Integration PostgresSubscribableChannel is not transactional.
The expectation is that if there is an exception in the downstream the message returns to the underlying message store.
Current Behavior
When replacing current QueueChannel + Transactional Polling flow with a PostgresSubscribableChannel the behavior of the flow changes, since messages are now lost if there is an exception in the downstream
Context
I have made the following modifications as a POC to get the desired behavior. This does change the semantics somewhat, as before each message (for a subscriber) was dispatched on a separate thread and now all the messages for (a subscriber) are emitted sequentially on the same thread
PostgresChannelMessageTableSubscriber
- Add another executor service that will be used for notification dispatch, initialized in the start method with
notificationThreadPoolSize
(defaults to 1)
if (subscriptionExecutorService == null) {
CustomizableThreadFactory threadFactory =
new CustomizableThreadFactory("postgres-channel-dispatch-");
threadFactory.setDaemon(true);
subscriptionExecutorService = Executors.newFixedThreadPool(notificationThreadPoolSize);
}
- Modify the subscriber notifications to happen asynchronously
subscriptions.forEach( it-> subscriptionExecutorService.submit(it::notifyUpdate));
- Add a method shutDownAndAwaitTermination as per ExecutorService javadoc with a 5 second timeout for graceful termination. Use this method to shutdown the
subscriptionExecutorService
in thestop
method
PostgresSubscribableChannel
- Inject a PlatformTransactionManager in the constructor and create a TransactionTemplate
this.transactionTemplate = new TransactionTemplate(platformTransactionManager);
- Remove the Executor from the UnicastDispatcher (and remove the method that allows setting the Executor).
- Add an AtomicBoolean field
errorOnNotify
- Modify the
notifyUpdate
method
@Override
public void notifyUpdate() {
try {
transactionTemplate.executeWithoutResult(it -> {
Message<?> message;
while ((message = this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId)) != null) {
this.dispatcher.dispatch(message);
}
});
} catch (Exception e) {
logger.error(e, "Exception during message dispatch");
errorOnNotify.set(true);
}
}
- Add a
@Scheduled
method to retry whenever there are errors ( to be replaced by aTaskScheduler
with a customizablePeriodicTrigger
)
@Scheduled(fixedDelay = 10000)
public void retryOnErrors() {
if (errorOnNotify.compareAndSet(true, false)) {
notifyUpdate();
}
}