Skip to content

Commit d6ffa38

Browse files
scheduleAtFixedRate would hang (#42993)
Though not in use in elasticsearch currently, it seems surprising that ThreadPool.scheduler().scheduleAtFixedRate would hang. A recurring scheduled task is never completed (except on failure) and we test for exceptions using RunnableFuture.get(), which hangs for periodic tasks. Fixed by checking that task is done before calling .get().
1 parent 997fa05 commit d6ffa38

File tree

3 files changed

+19
-1
lines changed

3 files changed

+19
-1
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int
124124
*/
125125
public static Throwable rethrowErrors(Runnable runnable) {
126126
if (runnable instanceof RunnableFuture) {
127+
assert ((RunnableFuture) runnable).isDone();
127128
try {
128129
((RunnableFuture) runnable).get();
129130
} catch (final Exception e) {

server/src/main/java/org/elasticsearch/threadpool/Scheduler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.Delayed;
3434
import java.util.concurrent.Future;
3535
import java.util.concurrent.RejectedExecutionHandler;
36+
import java.util.concurrent.RunnableFuture;
3637
import java.util.concurrent.ScheduledFuture;
3738
import java.util.concurrent.ScheduledThreadPoolExecutor;
3839
import java.util.concurrent.ThreadFactory;
@@ -276,7 +277,11 @@ protected void afterExecute(Runnable r, Throwable t) {
276277
if (t != null) return;
277278
// Scheduler only allows Runnable's so we expect no checked exceptions here. If anyone uses submit directly on `this`, we
278279
// accept the wrapped exception in the output.
279-
ExceptionsHelper.reThrowIfNotNull(EsExecutors.rethrowErrors(r));
280+
if (r instanceof RunnableFuture && ((RunnableFuture<?>) r).isDone()) {
281+
// only check this if task is done, which it always is except for periodic tasks. Periodic tasks will hang on
282+
// RunnableFuture.get()
283+
ExceptionsHelper.reThrowIfNotNull(EsExecutors.rethrowErrors(r));
284+
}
280285
}
281286
}
282287
}

server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,4 +153,16 @@ public void testScheduledOnScheduler() throws InterruptedException {
153153
Scheduler.terminate(executor, 10, TimeUnit.SECONDS);
154154
}
155155
}
156+
157+
public void testScheduleAtFixedRate() throws InterruptedException {
158+
ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY);
159+
try {
160+
CountDownLatch missingExecutions = new CountDownLatch(randomIntBetween(1, 10));
161+
executor.scheduleAtFixedRate(missingExecutions::countDown,
162+
randomIntBetween(1, 10), randomIntBetween(1, 10), TimeUnit.MILLISECONDS);
163+
assertTrue(missingExecutions.await(30, TimeUnit.SECONDS));
164+
} finally {
165+
Scheduler.terminate(executor, 10, TimeUnit.SECONDS);
166+
}
167+
}
156168
}

0 commit comments

Comments
 (0)