Skip to content

Commit 642278d

Browse files
authored
GH-9061: renew connection in PostgresChannelMessageTableSubscriber
Fixes: #9061 `PostgresChannelMessageTableSubscriber` never renews the connection. This causes problems on DB failover. With this change the connection is renewed when notifications are not received for a certain time. **Auto-cherry-pick to `6.2.x` & `6.1.x`**
1 parent 627cde2 commit 642278d

File tree

2 files changed

+77
-17
lines changed

2 files changed

+77
-17
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriber.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.sql.SQLException;
2020
import java.sql.Statement;
21+
import java.time.Duration;
2122
import java.util.Map;
2223
import java.util.Set;
2324
import java.util.concurrent.CompletableFuture;
@@ -62,6 +63,7 @@
6263
* @author Artem Bilan
6364
* @author Igor Lovich
6465
* @author Christian Tzolov
66+
* @author Johannes Edmeier
6567
*
6668
* @since 6.0
6769
*/
@@ -86,6 +88,8 @@ public final class PostgresChannelMessageTableSubscriber implements SmartLifecyc
8688
@Nullable
8789
private volatile PgConnection connection;
8890

91+
private Duration notificationTimeout = Duration.ofSeconds(60);
92+
8993
/**
9094
* Create a new subscriber using the {@link JdbcChannelMessageStore#DEFAULT_TABLE_PREFIX}.
9195
* @param connectionSupplier The connection supplier for the targeted Postgres database.
@@ -116,6 +120,19 @@ public void setTaskExecutor(AsyncTaskExecutor taskExecutor) {
116120
this.taskExecutor = taskExecutor;
117121
}
118122

123+
/**
124+
* Set the timeout for the notification polling.
125+
* If for the specified duration no notificiation are received the underlying connection is closed and re-established.
126+
* Setting a value of {@code Duration.ZERO} will disable the timeout and wait forever.
127+
* This might cause problems in DB failover scenarios.
128+
* @param notificationTimeout the timeout for the notification polling.
129+
* @since 6.1.8
130+
*/
131+
public void setNotificationTimeout(Duration notificationTimeout) {
132+
Assert.notNull(notificationTimeout, "'notificationTimeout' must not be null.");
133+
this.notificationTimeout = notificationTimeout;
134+
}
135+
119136
/**
120137
* Add a new subscription to this subscriber.
121138
* @param subscription The subscription to register.
@@ -193,24 +210,28 @@ private void doStart(CountDownLatch startingLatch) {
193210
while (isActive()) {
194211
startingLatch.countDown();
195212

196-
PGNotification[] notifications = conn.getNotifications(0);
213+
PGNotification[] notifications = conn.getNotifications((int) this.notificationTimeout.toMillis());
197214
// Unfortunately, there is no good way of interrupting a notification
198215
// poll but by closing its connection.
199216
if (!isActive()) {
200217
return;
201218
}
202-
if (notifications != null) {
203-
for (PGNotification notification : notifications) {
204-
String parameter = notification.getParameter();
205-
Set<Subscription> subscriptions = this.subscriptionsMap.get(parameter);
206-
if (subscriptions == null) {
207-
continue;
208-
}
209-
for (Subscription subscription : subscriptions) {
210-
subscription.notifyUpdate();
211-
}
219+
if (notifications == null || notifications.length == 0) {
220+
//We did not receive any notifications within the timeout period.
221+
//We will close the connection and re-establish it.
222+
break;
223+
}
224+
for (PGNotification notification : notifications) {
225+
String parameter = notification.getParameter();
226+
Set<Subscription> subscriptions = this.subscriptionsMap.get(parameter);
227+
if (subscriptions == null) {
228+
continue;
229+
}
230+
for (Subscription subscription : subscriptions) {
231+
subscription.notifyUpdate();
212232
}
213233
}
234+
214235
}
215236
}
216237
finally {

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.springframework.integration.jdbc.channel;
1818

1919
import java.sql.DriverManager;
20+
import java.sql.SQLException;
21+
import java.time.Duration;
2022
import java.util.ArrayList;
2123
import java.util.List;
2224
import java.util.concurrent.CountDownLatch;
@@ -62,6 +64,7 @@
6264
* @author Artem Bilan
6365
* @author Igor Lovich
6466
* @author Adama Sorho
67+
* @author Johannes Edmeier
6568
*
6669
* @since 6.0
6770
*/
@@ -102,15 +105,14 @@ CREATE FUNCTION INT_CHANNEL_MESSAGE_NOTIFY_FCT()
102105

103106
private String groupId;
104107

108+
private ConnectionSupplier connectionSupplier;
109+
105110
@BeforeEach
106111
void setUp(TestInfo testInfo) {
107112
// Not initiated as a bean to allow for registrations prior and post the life cycle
108-
this.postgresChannelMessageTableSubscriber =
109-
new PostgresChannelMessageTableSubscriber(() ->
110-
DriverManager.getConnection(POSTGRES_CONTAINER.getJdbcUrl(),
111-
POSTGRES_CONTAINER.getUsername(),
112-
POSTGRES_CONTAINER.getPassword())
113-
.unwrap(PgConnection.class));
113+
this.connectionSupplier = new ConnectionSupplier();
114+
this.postgresChannelMessageTableSubscriber = new PostgresChannelMessageTableSubscriber(connectionSupplier);
115+
this.postgresChannelMessageTableSubscriber.setNotificationTimeout(Duration.ofSeconds(5));
114116

115117
this.taskExecutor = new ThreadPoolTaskExecutor();
116118
this.taskExecutor.setCorePoolSize(10);
@@ -261,6 +263,26 @@ void testRetryOnErrorDuringDispatch(boolean transactionsEnabled) throws Interrup
261263
assertThat(payloads).containsExactly("1");
262264
}
263265

266+
@Test
267+
public void testRenewConnection() throws Exception {
268+
CountDownLatch latch = new CountDownLatch(2);
269+
List<Object> payloads = new ArrayList<>();
270+
CountDownLatch connectionLatch = new CountDownLatch(2);
271+
connectionSupplier.onGetConnection = connectionLatch::countDown;
272+
postgresChannelMessageTableSubscriber.start();
273+
postgresSubscribableChannel.subscribe(message -> {
274+
payloads.add(message.getPayload());
275+
latch.countDown();
276+
});
277+
278+
assertThat(connectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
279+
280+
messageStore.addMessageToGroup(groupId, new GenericMessage<>("1"));
281+
messageStore.addMessageToGroup(groupId, new GenericMessage<>("2"));
282+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
283+
assertThat(payloads).containsExactlyInAnyOrder("1", "2");
284+
}
285+
264286
@Configuration
265287
@EnableIntegration
266288
public static class Config {
@@ -300,4 +322,21 @@ public JdbcChannelMessageStore jdbcChannelMessageStore(DataSource dataSource) {
300322

301323
}
302324

325+
private static class ConnectionSupplier implements PgConnectionSupplier {
326+
327+
Runnable onGetConnection;
328+
329+
@Override
330+
public PgConnection get() throws SQLException {
331+
var conn = DriverManager.getConnection(POSTGRES_CONTAINER.getJdbcUrl(),
332+
POSTGRES_CONTAINER.getUsername(),
333+
POSTGRES_CONTAINER.getPassword())
334+
.unwrap(PgConnection.class);
335+
if (this.onGetConnection != null) {
336+
this.onGetConnection.run();
337+
}
338+
return conn;
339+
}
340+
341+
}
303342
}

0 commit comments

Comments
 (0)