Skip to content

Commit 5160117

Browse files
committed
GH-10547: DefaultHeaderChannelRegistry: Fix race condition
Fixes: #10547 When stopping an application, the following error log is generated: ``` ERROR Unexpected error occurred in scheduled task org.springframework.core.task.TaskRejectedException: ExecutorService in shutdown state did not accept task: bean 'integrationHeaderChannelRegistry' at org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.schedule(ThreadPoolTaskScheduler.java:430) Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@7bc9f638[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@3ecb6f85[Wrapped task = DelegatingErrorHandlingRunnable for bean 'integrationHeaderChannelRegistry']] rejected from org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler$1@57715118[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 1225] ``` * Check for `isRunning()` in the `DefaultHeaderChannelRegistry.run()` before scheduling a new reaper task * Increase latch wait timeout to 20 seconds in the `SftpSessionFactoryTests.sharedSessionConcurrentAccess()` **Auto-cherry-pick to `6.5.x` & `6.4.x`**
1 parent 0e08a6f commit 5160117

File tree

2 files changed

+14
-20
lines changed

2 files changed

+14
-20
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/DefaultHeaderChannelRegistry.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@ public class DefaultHeaderChannelRegistry extends IntegrationObjectSupport
5757

5858
private static final int DEFAULT_REAPER_DELAY = 60000;
5959

60-
protected static final AtomicLong id = new AtomicLong(); // NOSONAR
60+
protected static final AtomicLong id = new AtomicLong();
6161

62-
protected final Map<String, MessageChannelWrapper> channels = new ConcurrentHashMap<>(); // NOSONAR
62+
protected final Map<String, MessageChannelWrapper> channels = new ConcurrentHashMap<>();
6363

64-
protected final String uuid = UUID.randomUUID() + ":"; // NOSONAR
64+
protected final String uuid = UUID.randomUUID() + ":";
6565

6666
private boolean removeOnGet;
6767

@@ -119,20 +119,13 @@ public final int size() {
119119
return this.channels.size();
120120
}
121121

122-
@Override
123-
protected void onInit() {
124-
super.onInit();
125-
Assert.notNull(getTaskScheduler(), "a task scheduler is required");
126-
}
127-
128122
@Override
129123
public void start() {
130124
this.lock.lock();
131125
try {
132126
if (!this.running) {
133-
Assert.notNull(getTaskScheduler(), "a task scheduler is required");
134-
this.reaperScheduledFuture = getTaskScheduler()
135-
.schedule(this, Instant.now().plusMillis(this.reaperDelay));
127+
this.reaperScheduledFuture =
128+
getTaskScheduler().schedule(this, Instant.now().plusMillis(this.reaperDelay));
136129

137130
this.running = true;
138131
}
@@ -179,13 +172,12 @@ public Object channelToChannelName(@Nullable Object channel) {
179172
@Override
180173
@Nullable
181174
public Object channelToChannelName(@Nullable Object channel, long timeToLive) {
182-
if (!this.running && !this.explicitlyStopped && this.getTaskScheduler() != null) {
175+
if (!this.running && !this.explicitlyStopped) {
183176
start();
184177
}
185-
if (channel instanceof MessageChannel) {
178+
if (channel instanceof MessageChannel messageChannel) {
186179
String name = this.uuid + id.incrementAndGet();
187-
this.channels.put(name, new MessageChannelWrapper((MessageChannel) channel,
188-
System.currentTimeMillis() + timeToLive));
180+
this.channels.put(name, new MessageChannelWrapper(messageChannel, System.currentTimeMillis() + timeToLive));
189181
logger.debug(() -> "Registered " + channel + " as " + name);
190182
return name;
191183
}
@@ -249,8 +241,10 @@ public void run() {
249241
iterator.remove();
250242
}
251243
}
252-
this.reaperScheduledFuture = getTaskScheduler()
253-
.schedule(this, Instant.now().plusMillis(this.reaperDelay));
244+
if (isRunning()) {
245+
this.reaperScheduledFuture =
246+
getTaskScheduler().schedule(this, Instant.now().plusMillis(this.reaperDelay));
247+
}
254248

255249
logger.trace(() -> "Reaper completed; channels size=" + this.channels.size());
256250
}

spring-integration-sftp/src/test/java/org/springframework/integration/sftp/session/SftpSessionFactoryTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,15 +331,15 @@ protected SftpClient createSftpClient(ClientSession clientSession,
331331
});
332332
}
333333

334-
assertThat(executionLatch.await(10, TimeUnit.SECONDS)).isTrue();
334+
assertThat(executionLatch.await(20, TimeUnit.SECONDS)).isTrue();
335335
synchronized (errors) {
336336
assertThat(errors).isEmpty();
337337
}
338338

339339
assertThat(clientInstances).hasValue(1);
340340

341341
executorService.shutdown();
342-
assertThat(executorService.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
342+
assertThat(executorService.awaitTermination(20, TimeUnit.SECONDS)).isTrue();
343343

344344
sftpSessionFactory.destroy();
345345
}

0 commit comments

Comments
 (0)