From 20fcc9eacaa9d1cdc7f111e58588c9f2f2f8c449 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 4 Apr 2017 01:20:27 -0400 Subject: [PATCH] Await termination after shutting down executors When terminating an executor service or a thread pool, we first shutdown. Then, we do a timed await termination. If the await termination fails because there are still tasks running, we then shutdown now. However, this method does not wait for actively executing tasks to terminate, so we should again wait for termination of these tasks before returning. This commit does that. --- .../elasticsearch/threadpool/ThreadPool.java | 46 +++++++++++++------ 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index b68037b8dc6f2..f72956c4202e3 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -679,14 +679,23 @@ static final class Fields { public static boolean terminate(ExecutorService service, long timeout, TimeUnit timeUnit) { if (service != null) { service.shutdown(); - try { - if (service.awaitTermination(timeout, timeUnit)) { - return true; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + if (awaitTermination(service, timeout, timeUnit)) return true; service.shutdownNow(); + return awaitTermination(service, timeout, timeUnit); + } + return false; + } + + private static boolean awaitTermination( + final ExecutorService service, + final long timeout, + final TimeUnit timeUnit) { + try { + if (service.awaitTermination(timeout, timeUnit)) { + return true; + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); } return false; } @@ -699,15 +708,10 @@ public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit if (pool != null) { try { pool.shutdown(); - try { - if (pool.awaitTermination(timeout, timeUnit)) { - return true; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + if (awaitTermination(pool, timeout, timeUnit)) return true; // last resort pool.shutdownNow(); + return awaitTermination(pool, timeout, timeUnit); } finally { IOUtils.closeWhileHandlingException(pool); } @@ -715,6 +719,20 @@ public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit return false; } + private static boolean awaitTermination( + final ThreadPool pool, + final long timeout, + final TimeUnit timeUnit) { + try { + if (pool.awaitTermination(timeout, timeUnit)) { + return true; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return false; + } + @Override public void close() throws IOException { threadContext.close();