Skip to content

Commit 3bd2efa

Browse files
authored
Await termination after shutting down executors
When terminating an executor service or a thread pool, we first shutdown. Then, we do a timed await termination. If the await termination fails because there are still tasks running, we then shutdown now. However, this method does not wait for actively executing tasks to terminate, so we should again wait for termination of these tasks before returning. This commit does that. Relates #23889
1 parent 6234a49 commit 3bd2efa

File tree

1 file changed

+32
-14
lines changed

1 file changed

+32
-14
lines changed

core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -679,14 +679,23 @@ static final class Fields {
679679
public static boolean terminate(ExecutorService service, long timeout, TimeUnit timeUnit) {
680680
if (service != null) {
681681
service.shutdown();
682-
try {
683-
if (service.awaitTermination(timeout, timeUnit)) {
684-
return true;
685-
}
686-
} catch (InterruptedException e) {
687-
Thread.currentThread().interrupt();
688-
}
682+
if (awaitTermination(service, timeout, timeUnit)) return true;
689683
service.shutdownNow();
684+
return awaitTermination(service, timeout, timeUnit);
685+
}
686+
return false;
687+
}
688+
689+
private static boolean awaitTermination(
690+
final ExecutorService service,
691+
final long timeout,
692+
final TimeUnit timeUnit) {
693+
try {
694+
if (service.awaitTermination(timeout, timeUnit)) {
695+
return true;
696+
}
697+
} catch (final InterruptedException e) {
698+
Thread.currentThread().interrupt();
690699
}
691700
return false;
692701
}
@@ -699,22 +708,31 @@ public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit
699708
if (pool != null) {
700709
try {
701710
pool.shutdown();
702-
try {
703-
if (pool.awaitTermination(timeout, timeUnit)) {
704-
return true;
705-
}
706-
} catch (InterruptedException e) {
707-
Thread.currentThread().interrupt();
708-
}
711+
if (awaitTermination(pool, timeout, timeUnit)) return true;
709712
// last resort
710713
pool.shutdownNow();
714+
return awaitTermination(pool, timeout, timeUnit);
711715
} finally {
712716
IOUtils.closeWhileHandlingException(pool);
713717
}
714718
}
715719
return false;
716720
}
717721

722+
private static boolean awaitTermination(
723+
final ThreadPool pool,
724+
final long timeout,
725+
final TimeUnit timeUnit) {
726+
try {
727+
if (pool.awaitTermination(timeout, timeUnit)) {
728+
return true;
729+
}
730+
} catch (InterruptedException e) {
731+
Thread.currentThread().interrupt();
732+
}
733+
return false;
734+
}
735+
718736
@Override
719737
public void close() throws IOException {
720738
threadContext.close();

0 commit comments

Comments
 (0)