Skip to content

Commit 21f6405

Browse files
committed
Protect scheduler engine against throwing listeners (#32998)
There are two problems with the scheduler engine today. Both relate to listeners that throw. The first problem is that any triggered listener that throws a plain old exception will cause no additional listeners to be triggered for the event, and will also cause the scheduler to never be invoked again. This leads to lost events and is bad. The second problem is that any triggered listener that throws an error of the fatal kind will not lead to that error being caught by the uncaught exception handler. This is because the triggered listener is executed as a future task under a scheduled thread pool executor. A throwable thrown there is caught by the JDK framework and set as the outcome on the future task. Since we never inspect these tasks for their outcomes, nor is there a good place to do this, we have to handle these errors ourselves. To do this, we catch them and dispatch them to the uncaught exception handler via a forked thread. This is similar to our handling in Netty.
1 parent cc8161a commit 21f6405

File tree

5 files changed

+320
-9
lines changed

5 files changed

+320
-9
lines changed

server/src/main/java/org/elasticsearch/ExceptionsHelper.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,35 @@ public static boolean reThrowIfNotNull(@Nullable Throwable e) {
243243
return true;
244244
}
245245

246+
/**
247+
* If the specified cause is an unrecoverable error, this method will rethrow the cause on a separate thread so that it can not be
248+
* caught and bubbles up to the uncaught exception handler.
249+
*
250+
* @param throwable the throwable to test
251+
*/
252+
public static void dieOnError(Throwable throwable) {
253+
ExceptionsHelper.maybeError(throwable, logger).ifPresent(error -> {
254+
/*
255+
* Here be dragons. We want to rethrow this so that it bubbles up to the uncaught exception handler. Yet, sometimes the stack
256+
* contains statements that catch any throwable (e.g., Netty, and the JDK futures framework). This means that a rethrow here
257+
* will not bubble up to where we want it to. So, we fork a thread and throw the exception from there where we are sure the
258+
* stack does not contain statements that catch any throwable. We do not wrap the exception so as to not lose the original cause
259+
* during exit.
260+
*/
261+
try {
262+
// try to log the current stack trace
263+
final String formatted = ExceptionsHelper.formatStackTrace(Thread.currentThread().getStackTrace());
264+
logger.error("fatal error\n{}", formatted);
265+
} finally {
266+
new Thread(
267+
() -> {
268+
throw error;
269+
})
270+
.start();
271+
}
272+
});
273+
}
274+
246275
/**
247276
* Deduplicate the failures by exception message and index.
248277
*/

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,13 @@
33
* or more contributor license agreements. Licensed under the Elastic License;
44
* you may not use this file except in compliance with the Elastic License.
55
*/
6+
67
package org.elasticsearch.xpack.core.scheduler;
78

9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
11+
import org.apache.logging.log4j.message.ParameterizedMessage;
12+
import org.elasticsearch.ExceptionsHelper;
813
import org.elasticsearch.common.settings.Settings;
914
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
1015
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -14,6 +19,7 @@
1419
import java.util.Collection;
1520
import java.util.List;
1621
import java.util.Map;
22+
import java.util.Objects;
1723
import java.util.concurrent.CopyOnWriteArrayList;
1824
import java.util.concurrent.Executors;
1925
import java.util.concurrent.ScheduledExecutorService;
@@ -89,13 +95,20 @@ public interface Schedule {
8995
}
9096

9197
private final Map<String, ActiveSchedule> schedules = ConcurrentCollections.newConcurrentMap();
92-
private final ScheduledExecutorService scheduler;
9398
private final Clock clock;
99+
private final ScheduledExecutorService scheduler;
100+
private final Logger logger;
94101
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
95102

96-
public SchedulerEngine(Settings settings, Clock clock) {
97-
this.clock = clock;
98-
this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "trigger_engine_scheduler"));
103+
public SchedulerEngine(final Settings settings, final Clock clock) {
104+
this(settings, clock, LogManager.getLogger(SchedulerEngine.class));
105+
}
106+
107+
SchedulerEngine(final Settings settings, final Clock clock, final Logger logger) {
108+
this.clock = Objects.requireNonNull(clock, "clock");
109+
this.scheduler = Executors.newScheduledThreadPool(
110+
1, EsExecutors.daemonThreadFactory(Objects.requireNonNull(settings, "settings"), "trigger_engine_scheduler"));
111+
this.logger = Objects.requireNonNull(logger, "logger");
99112
}
100113

101114
public void register(Listener listener) {
@@ -144,10 +157,15 @@ public int jobCount() {
144157
return schedules.size();
145158
}
146159

147-
protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
160+
protected void notifyListeners(final String name, final long triggeredTime, final long scheduledTime) {
148161
final Event event = new Event(name, triggeredTime, scheduledTime);
149-
for (Listener listener : listeners) {
150-
listener.triggered(event);
162+
for (final Listener listener : listeners) {
163+
try {
164+
listener.triggered(event);
165+
} catch (final Exception e) {
166+
// do not allow exceptions to escape this method; we should continue to notify listeners and schedule the next run
167+
logger.warn(new ParameterizedMessage("listener failed while handling triggered event [{}]", name), e);
168+
}
151169
}
152170
}
153171

@@ -169,8 +187,20 @@ class ActiveSchedule implements Runnable {
169187

170188
@Override
171189
public void run() {
172-
long triggeredTime = clock.millis();
173-
notifyListeners(name, triggeredTime, scheduledTime);
190+
final long triggeredTime = clock.millis();
191+
try {
192+
notifyListeners(name, triggeredTime, scheduledTime);
193+
} catch (final Throwable t) {
194+
/*
195+
* Allowing the throwable to escape here will lead to it being caught in FutureTask#run and set as the outcome of this
196+
* task; however, we never inspect the outcomes of these scheduled tasks and so allowing the throwable to escape
197+
* unhandled here could lead to us losing fatal errors. Instead, we rely on ExceptionsHelper#dieOnError to appropriately
198+
* dispatch any error to the uncaught exception handler. We should never see an exception here as these do not escape from
199+
* SchedulerEngine#notifyListeners.
200+
*/
201+
ExceptionsHelper.dieOnError(t);
202+
throw t;
203+
}
174204
scheduleNextRun(triggeredTime);
175205
}
176206

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.core.scheduler;
8+
9+
import org.apache.logging.log4j.Logger;
10+
import org.apache.logging.log4j.message.ParameterizedMessage;
11+
import org.elasticsearch.common.collect.Tuple;
12+
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.test.ESTestCase;
14+
import org.mockito.ArgumentCaptor;
15+
16+
import java.time.Clock;
17+
import java.util.ArrayList;
18+
import java.util.Collections;
19+
import java.util.List;
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
24+
import static org.hamcrest.Matchers.arrayWithSize;
25+
import static org.hamcrest.Matchers.equalTo;
26+
import static org.hamcrest.Matchers.instanceOf;
27+
import static org.mockito.Mockito.mock;
28+
import static org.mockito.Mockito.times;
29+
import static org.mockito.Mockito.verify;
30+
import static org.mockito.Mockito.verifyNoMoreInteractions;
31+
32+
public class SchedulerEngineTests extends ESTestCase {
33+
34+
public void testListenersThrowingExceptionsDoNotCauseOtherListenersToBeSkipped() throws InterruptedException {
35+
final Logger mockLogger = mock(Logger.class);
36+
final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC(), mockLogger);
37+
try {
38+
final List<Tuple<SchedulerEngine.Listener, AtomicBoolean>> listeners = new ArrayList<>();
39+
final int numberOfListeners = randomIntBetween(1, 32);
40+
int numberOfFailingListeners = 0;
41+
final CountDownLatch latch = new CountDownLatch(numberOfListeners);
42+
for (int i = 0; i < numberOfListeners; i++) {
43+
final AtomicBoolean trigger = new AtomicBoolean();
44+
final SchedulerEngine.Listener listener;
45+
if (randomBoolean()) {
46+
listener = event -> {
47+
if (trigger.compareAndSet(false, true)) {
48+
latch.countDown();
49+
} else {
50+
fail("listener invoked twice");
51+
}
52+
};
53+
} else {
54+
numberOfFailingListeners++;
55+
listener = event -> {
56+
if (trigger.compareAndSet(false, true)) {
57+
latch.countDown();
58+
throw new RuntimeException(getTestName());
59+
} else {
60+
fail("listener invoked twice");
61+
}
62+
};
63+
}
64+
listeners.add(Tuple.tuple(listener, trigger));
65+
}
66+
67+
// randomize the order and register the listeners
68+
Collections.shuffle(listeners, random());
69+
listeners.stream().map(Tuple::v1).forEach(engine::register);
70+
71+
final AtomicBoolean scheduled = new AtomicBoolean();
72+
engine.add(new SchedulerEngine.Job(
73+
getTestName(),
74+
(startTime, now) -> {
75+
// only allow one triggering of the listeners
76+
if (scheduled.compareAndSet(false, true)) {
77+
return 0;
78+
} else {
79+
return -1;
80+
}
81+
}));
82+
83+
latch.await();
84+
85+
// now check that every listener was invoked
86+
assertTrue(listeners.stream().map(Tuple::v2).allMatch(AtomicBoolean::get));
87+
if (numberOfFailingListeners > 0) {
88+
assertFailedListenerLogMessage(mockLogger, numberOfFailingListeners);
89+
}
90+
verifyNoMoreInteractions(mockLogger);
91+
} finally {
92+
engine.stop();
93+
}
94+
}
95+
96+
public void testListenersThrowingExceptionsDoNotCauseNextScheduledTaskToBeSkipped() throws InterruptedException {
97+
final Logger mockLogger = mock(Logger.class);
98+
final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC(), mockLogger);
99+
try {
100+
final List<Tuple<SchedulerEngine.Listener, AtomicInteger>> listeners = new ArrayList<>();
101+
final int numberOfListeners = randomIntBetween(1, 32);
102+
final int numberOfSchedules = randomIntBetween(1, 32);
103+
final CountDownLatch listenersLatch = new CountDownLatch(numberOfSchedules * numberOfListeners);
104+
for (int i = 0; i < numberOfListeners; i++) {
105+
final AtomicInteger triggerCount = new AtomicInteger();
106+
final SchedulerEngine.Listener listener = event -> {
107+
if (triggerCount.incrementAndGet() <= numberOfSchedules) {
108+
listenersLatch.countDown();
109+
throw new RuntimeException(getTestName());
110+
} else {
111+
fail("listener invoked more than [" + numberOfSchedules + "] times");
112+
}
113+
};
114+
listeners.add(Tuple.tuple(listener, triggerCount));
115+
engine.register(listener);
116+
}
117+
118+
// latch for each invocation of nextScheduledTimeAfter, once for each scheduled run, and then a final time when we disable
119+
final CountDownLatch latch = new CountDownLatch(1 + numberOfSchedules);
120+
engine.add(new SchedulerEngine.Job(
121+
getTestName(),
122+
(startTime, now) -> {
123+
if (latch.getCount() >= 2) {
124+
latch.countDown();
125+
return 0;
126+
} else if (latch.getCount() == 1) {
127+
latch.countDown();
128+
return -1;
129+
} else {
130+
throw new AssertionError("nextScheduledTimeAfter invoked more than the expected number of times");
131+
}
132+
}));
133+
134+
listenersLatch.await();
135+
assertTrue(listeners.stream().map(Tuple::v2).allMatch(count -> count.get() == numberOfSchedules));
136+
latch.await();
137+
assertFailedListenerLogMessage(mockLogger, numberOfListeners * numberOfSchedules);
138+
verifyNoMoreInteractions(mockLogger);
139+
} finally {
140+
engine.stop();
141+
}
142+
}
143+
144+
private void assertFailedListenerLogMessage(Logger mockLogger, int times) {
145+
final ArgumentCaptor<ParameterizedMessage> messageCaptor = ArgumentCaptor.forClass(ParameterizedMessage.class);
146+
final ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
147+
verify(mockLogger, times(times)).warn(messageCaptor.capture(), throwableCaptor.capture());
148+
for (final ParameterizedMessage message : messageCaptor.getAllValues()) {
149+
assertThat(message.getFormat(), equalTo("listener failed while handling triggered event [{}]"));
150+
assertThat(message.getParameters(), arrayWithSize(1));
151+
assertThat(message.getParameters()[0], equalTo(getTestName()));
152+
}
153+
for (final Throwable throwable : throwableCaptor.getAllValues()) {
154+
assertThat(throwable, instanceOf(RuntimeException.class));
155+
assertThat(throwable.getMessage(), equalTo(getTestName()));
156+
}
157+
}
158+
159+
}

x-pack/qa/evil-tests/build.gradle

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
apply plugin: 'elasticsearch.standalone-test'
2+
3+
dependencies {
4+
testCompile project(path: xpackModule('core'), configuration: 'shadow')
5+
}
6+
7+
test {
8+
systemProperty 'tests.security.manager', 'false'
9+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.core.scheduler;
8+
9+
import org.apache.logging.log4j.Logger;
10+
import org.elasticsearch.common.settings.Settings;
11+
import org.elasticsearch.test.ESTestCase;
12+
13+
import java.time.Clock;
14+
import java.util.concurrent.CountDownLatch;
15+
import java.util.concurrent.atomic.AtomicBoolean;
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
import static org.hamcrest.Matchers.containsString;
19+
import static org.hamcrest.Matchers.equalTo;
20+
import static org.hamcrest.Matchers.hasToString;
21+
import static org.hamcrest.Matchers.instanceOf;
22+
import static org.hamcrest.Matchers.not;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.verifyNoMoreInteractions;
25+
26+
public class EvilSchedulerEngineTests extends ESTestCase {
27+
28+
public void testOutOfMemoryErrorWhileTriggeredIsRethrownAndIsUncaught() throws InterruptedException {
29+
final AtomicReference<Throwable> maybeFatal = new AtomicReference<>();
30+
final CountDownLatch uncaughtLatuch = new CountDownLatch(1);
31+
final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
32+
try {
33+
/*
34+
* We want to test that the out of memory error thrown from the scheduler engine goes uncaught on another thread; this gives us
35+
* confidence that an error thrown during a triggered event will lead to the node being torn down.
36+
*/
37+
final AtomicReference<Thread> maybeThread = new AtomicReference<>();
38+
Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
39+
maybeFatal.set(e);
40+
maybeThread.set(Thread.currentThread());
41+
uncaughtLatuch.countDown();
42+
});
43+
final Logger mockLogger = mock(Logger.class);
44+
final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC(), mockLogger);
45+
try {
46+
final AtomicBoolean trigger = new AtomicBoolean();
47+
engine.register(event -> {
48+
if (trigger.compareAndSet(false, true)) {
49+
throw new OutOfMemoryError("640K ought to be enough for anybody");
50+
} else {
51+
fail("listener invoked twice");
52+
}
53+
});
54+
final CountDownLatch schedulerLatch = new CountDownLatch(1);
55+
engine.add(new SchedulerEngine.Job(
56+
getTestName(),
57+
(startTime, now) -> {
58+
if (schedulerLatch.getCount() == 1) {
59+
schedulerLatch.countDown();
60+
return 0;
61+
} else {
62+
throw new AssertionError("nextScheduledTimeAfter invoked more than the expected number of times");
63+
}
64+
}));
65+
66+
uncaughtLatuch.await();
67+
assertTrue(trigger.get());
68+
assertNotNull(maybeFatal.get());
69+
assertThat(maybeFatal.get(), instanceOf(OutOfMemoryError.class));
70+
assertThat(maybeFatal.get(), hasToString(containsString("640K ought to be enough for anybody")));
71+
assertNotNull(maybeThread.get());
72+
assertThat(maybeThread.get(), not(equalTo(Thread.currentThread()))); // the error should be rethrown on another thread
73+
schedulerLatch.await();
74+
verifyNoMoreInteractions(mockLogger); // we never logged anything
75+
} finally {
76+
engine.stop();
77+
}
78+
} finally {
79+
// restore the uncaught exception handler
80+
Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
81+
}
82+
}
83+
84+
}

0 commit comments

Comments
 (0)