Skip to content

Commit 20c66c5

Browse files
Bubble-up exceptions from scheduler (#38317)
Instead of logging warnings we now rethrow exceptions thrown inside scheduled/submitted tasks. This will still log them as warnings in production but has the added benefit that if they are thrown during unit/integration test runs, the test will be flagged as an error. This is a continuation of #38014 Fixed NPE that caused CCR tests (IndexFollowingIT and likely others) to fail. schedule could bubble rejected exception to uncaught exception handler when not using SAME executor if thread pool is terminated. Now ignore rejected exception silently if executor is shutdown.
1 parent a6ce671 commit 20c66c5

File tree

4 files changed

+33
-67
lines changed

4 files changed

+33
-67
lines changed

qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java

Lines changed: 13 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,13 @@
1919

2020
package org.elasticsearch.threadpool;
2121

22-
import org.apache.logging.log4j.Level;
23-
import org.apache.logging.log4j.LogManager;
24-
import org.apache.logging.log4j.Logger;
25-
import org.apache.logging.log4j.core.LogEvent;
26-
import org.elasticsearch.common.logging.Loggers;
2722
import org.elasticsearch.common.settings.Settings;
2823
import org.elasticsearch.common.unit.TimeValue;
2924
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3025
import org.elasticsearch.common.util.concurrent.EsExecutors;
3126
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
3227
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
3328
import org.elasticsearch.test.ESTestCase;
34-
import org.elasticsearch.test.MockLogAppender;
3529
import org.junit.After;
3630
import org.junit.Before;
3731

@@ -44,7 +38,6 @@
4438
import java.util.function.Consumer;
4539

4640
import static org.hamcrest.Matchers.containsString;
47-
import static org.hamcrest.Matchers.equalTo;
4841
import static org.hamcrest.Matchers.hasToString;
4942
import static org.hamcrest.Matchers.instanceOf;
5043

@@ -367,63 +360,28 @@ private void runExecutionTest(
367360
uncaughtExceptionHandlerLatch.countDown();
368361
});
369362

370-
371363
final CountDownLatch supplierLatch = new CountDownLatch(1);
372364

373-
Runnable job = () -> {
374-
try {
375-
runnable.run();
376-
} finally {
377-
supplierLatch.countDown();
378-
}
379-
};
380-
381-
// snoop on logging to also handle the cases where exceptions are simply logged in Scheduler.
382-
final Logger schedulerLogger = LogManager.getLogger(Scheduler.SafeScheduledThreadPoolExecutor.class);
383-
final MockLogAppender appender = new MockLogAppender();
384-
appender.addExpectation(
385-
new MockLogAppender.LoggingExpectation() {
386-
@Override
387-
public void match(LogEvent event) {
388-
if (event.getLevel() == Level.WARN) {
389-
assertThat("no other warnings than those expected",
390-
event.getMessage().getFormattedMessage(),
391-
equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]"));
392-
assertTrue(expectThrowable);
393-
assertNotNull(event.getThrown());
394-
assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown()));
395-
uncaughtExceptionHandlerLatch.countDown();
396-
}
397-
}
398-
399-
@Override
400-
public void assertMatched() {
365+
try {
366+
runner.accept(() -> {
367+
try {
368+
runnable.run();
369+
} finally {
370+
supplierLatch.countDown();
401371
}
402372
});
373+
} catch (Throwable t) {
374+
consumer.accept(Optional.of(t));
375+
return;
376+
}
403377

404-
appender.start();
405-
Loggers.addAppender(schedulerLogger, appender);
406-
try {
407-
try {
408-
runner.accept(job);
409-
} catch (Throwable t) {
410-
consumer.accept(Optional.of(t));
411-
return;
412-
}
413-
414-
supplierLatch.await();
378+
supplierLatch.await();
415379

416-
if (expectThrowable) {
417-
uncaughtExceptionHandlerLatch.await();
418-
}
419-
} finally {
420-
Loggers.removeAppender(schedulerLogger, appender);
421-
appender.stop();
380+
if (expectThrowable) {
381+
uncaughtExceptionHandlerLatch.await();
422382
}
423383

424384
consumer.accept(Optional.ofNullable(throwableReference.get()));
425-
} catch (IllegalAccessException e) {
426-
throw new RuntimeException(e);
427385
} finally {
428386
Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
429387
}

server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ synchronized void add(final long waitingForGlobalCheckpoint, final GlobalCheckpo
135135
* before we could be cancelled by the notification. In this case, our listener here would
136136
* not be in the map and we should not fire the timeout logic.
137137
*/
138-
removed = listeners.remove(listener).v2() != null;
138+
removed = listeners.remove(listener) != null;
139139
}
140140
if (removed) {
141141
final TimeoutException e = new TimeoutException(timeout.getStringRep());

server/src/main/java/org/elasticsearch/threadpool/Scheduler.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
24-
import org.apache.logging.log4j.message.ParameterizedMessage;
24+
import org.elasticsearch.ExceptionsHelper;
2525
import org.elasticsearch.common.SuppressForbidden;
2626
import org.elasticsearch.common.settings.Settings;
2727
import org.elasticsearch.common.unit.TimeValue;
@@ -47,8 +47,8 @@ public interface Scheduler {
4747
/**
4848
* Create a scheduler that can be used client side. Server side, please use <code>ThreadPool.schedule</code> instead.
4949
*
50-
* Notice that if any scheduled jobs fail with an exception, they will be logged as a warning. This includes jobs started
51-
* using execute, submit and schedule.
50+
* Notice that if any scheduled jobs fail with an exception, these will bubble up to the uncaught exception handler where they will
51+
* be logged as a warning. This includes jobs started using execute, submit and schedule.
5252
* @param settings the settings to use
5353
* @return executor
5454
*/
@@ -250,7 +250,8 @@ public void onAfter() {
250250
}
251251

252252
/**
253-
* This subclass ensures to properly bubble up Throwable instances of type Error and logs exceptions thrown in submitted/scheduled tasks
253+
* This subclass ensures to properly bubble up Throwable instances of both type Error and Exception thrown in submitted/scheduled
254+
* tasks to the uncaught exception handler
254255
*/
255256
class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
256257
private static final Logger logger = LogManager.getLogger(SafeScheduledThreadPoolExecutor.class);
@@ -272,12 +273,10 @@ public SafeScheduledThreadPoolExecutor(int corePoolSize) {
272273

273274
@Override
274275
protected void afterExecute(Runnable r, Throwable t) {
275-
Throwable exception = EsExecutors.rethrowErrors(r);
276-
if (exception != null) {
277-
logger.warn(() ->
278-
new ParameterizedMessage("uncaught exception in scheduled thread [{}]", Thread.currentThread().getName()),
279-
exception);
280-
}
276+
if (t != null) return;
277+
// Scheduler only allows Runnable's so we expect no checked exceptions here. If anyone uses submit directly on `this`, we
278+
// accept the wrapped exception in the output.
279+
ExceptionsHelper.reThrowIfNotNull(EsExecutors.rethrowErrors(r));
281280
}
282281
}
283282
}

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,16 @@ class ThreadedRunnable implements Runnable {
501501

502502
@Override
503503
public void run() {
504-
executor.execute(runnable);
504+
try {
505+
executor.execute(runnable);
506+
} catch (EsRejectedExecutionException e) {
507+
if (e.isExecutorShutdown()) {
508+
logger.debug(new ParameterizedMessage("could not schedule execution of [{}] on [{}] as executor is shut down",
509+
runnable, executor), e);
510+
} else {
511+
throw e;
512+
}
513+
}
505514
}
506515

507516
@Override

0 commit comments

Comments
 (0)