From f359d5b1d845d6c3f16f5a4aea18bfc5b711a43f Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Aug 2018 15:16:02 -0400 Subject: [PATCH 1/5] Protect scheduler engine against throwing listeners There are two problems with the scheduler engine today. Both relate to listeners that throw. The first problem is that any triggered listener that throws a plain old exception will cause no additional listeners to be triggered for the event, and will also cause the scheduler to never be invoked again. This leads to lost events and is bad. The second problem is that any triggered listener that throws an error of the fatal kind will not lead to that error because caught by the uncaught exception handler. This is because the triggered listener is executed as a future task under a scheduled thread pool executor. A throwable there goes caught by the JDK framework and set as the outcome on the future task. Since we never inspect these tasks for their outcomes, nor is there a good place to do this, we have to handle these errors ourselves. To do this, we catch them and dispatch them to the uncaught exception handler via a forked thread. This is similar to our handling in Netty. --- .../org/elasticsearch/ExceptionsHelper.java | 22 +-- .../xpack/core/scheduler/SchedulerEngine.java | 44 +++++- .../core/scheduler/SchedulerEngineTests.java | 141 ++++++++++++++++++ x-pack/qa/evil-tests/build.gradle | 9 ++ .../scheduler/EvilSchedulerEngineTests.java | 84 +++++++++++ 5 files changed, 281 insertions(+), 19 deletions(-) create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java create mode 100644 x-pack/qa/evil-tests/build.gradle create mode 100644 x-pack/qa/evil-tests/src/test/java/org/elasticsearch/xpack/core/scheduler/EvilSchedulerEngineTests.java diff --git a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java index 59eb8b60dadba..09347f519fb22 100644 --- a/server/src/main/java/org/elasticsearch/ExceptionsHelper.java +++ b/server/src/main/java/org/elasticsearch/ExceptionsHelper.java @@ -250,13 +250,13 @@ public static boolean reThrowIfNotNull(@Nullable Throwable e) { * @param throwable the throwable to test */ public static void dieOnError(Throwable throwable) { - final Optional maybeError = ExceptionsHelper.maybeError(throwable, logger); - if (maybeError.isPresent()) { + ExceptionsHelper.maybeError(throwable, logger).ifPresent(error -> { /* - * Here be dragons. We want to rethrow this so that it bubbles up to the uncaught exception handler. Yet, Netty wraps too many - * invocations of user-code in try/catch blocks that swallow all throwables. This means that a rethrow here will not bubble up - * to where we want it to. So, we fork a thread and throw the exception from there where Netty can not get to it. We do not wrap - * the exception so as to not lose the original cause during exit. + * Here be dragons. We want to rethrow this so that it bubbles up to the uncaught exception handler. Yet, sometimes the stack + * contains statements that catch any throwable (e.g., Netty, and the JDK futures framework). This means that a rethrow here + * will not bubble up to where we want it to. So, we fork a thread and throw the exception from there where we are sure the + * stack does not contain statements that catch any throwable. We do not wrap the exception so as to not lose the original cause + * during exit. */ try { // try to log the current stack trace @@ -264,12 +264,12 @@ public static void dieOnError(Throwable throwable) { logger.error("fatal error\n{}", formatted); } finally { new Thread( - () -> { - throw maybeError.get(); - }) - .start(); + () -> { + throw error; + }) + .start(); } - } + }); } /** diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java index ffc0257313b36..97bb867bf8bb0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java @@ -3,8 +3,12 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ + package org.elasticsearch.xpack.core.scheduler; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -14,6 +18,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -89,13 +94,19 @@ public interface Schedule { } private final Map schedules = ConcurrentCollections.newConcurrentMap(); - private final ScheduledExecutorService scheduler; private final Clock clock; + private final ScheduledExecutorService scheduler; + private final Logger logger; private final List listeners = new CopyOnWriteArrayList<>(); - public SchedulerEngine(Settings settings, Clock clock) { - this.clock = clock; - this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "trigger_engine_scheduler")); + public SchedulerEngine(final Settings settings, final Clock clock) { + this(settings, clock, LogManager.getLogger(SchedulerEngine.class)); + } + + SchedulerEngine(final Settings settings, final Clock clock, final Logger logger) { + this.clock = Objects.requireNonNull(clock, "clock"); + this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "trigger_engine_scheduler")); + this.logger = Objects.requireNonNull(logger, "logger"); } public void register(Listener listener) { @@ -146,8 +157,13 @@ public int jobCount() { protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { final Event event = new Event(name, triggeredTime, scheduledTime); - for (Listener listener : listeners) { - listener.triggered(event); + for (final Listener listener : listeners) { + try { + listener.triggered(event); + } catch (final Exception e) { + // do not allow exceptions to escape this method; we should continue to notify listeners and schedule the next run + logger.warn("listener failed while handling triggered event [{}]", name); + } } } @@ -169,8 +185,20 @@ class ActiveSchedule implements Runnable { @Override public void run() { - long triggeredTime = clock.millis(); - notifyListeners(name, triggeredTime, scheduledTime); + final long triggeredTime = clock.millis(); + try { + notifyListeners(name, triggeredTime, scheduledTime); + } catch (final Throwable t) { + /* + * Allowing the throwable to escape here will lead to be it being caught in FutureTask#run and set as the outcome of this + * task; however, we never inspect the the outcomes of these scheduled tasks and so allowing the throwable to escape + * unhandled here could lead to use losing fatal errors. Instead, we rely on ExceptionsHelper#dieOnError to appropriately + * dispatch any error to the uncaught exception handler. We should never see an exception here as these do not escape from + * SchedulerEngine#notifyListeners. + */ + ExceptionsHelper.dieOnError(t); + throw t; + } scheduleNextRun(triggeredTime); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java new file mode 100644 index 0000000000000..fd8333ebc2693 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java @@ -0,0 +1,141 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.scheduler; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; + +import java.time.Clock; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +public class SchedulerEngineTests extends ESTestCase { + + public void testListenersThrowingExceptionsDoNotCauseOtherListenersToBeSkipped() throws InterruptedException { + final Logger mockLogger = mock(Logger.class); + final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC(), mockLogger); + try { + final List> listeners = new ArrayList<>(); + final int numberOfListeners = randomIntBetween(1, 32); + int numberOfFailingListeners = 0; + final CountDownLatch latch = new CountDownLatch(numberOfListeners); + for (int i = 0; i < numberOfListeners; i++) { + final AtomicBoolean trigger = new AtomicBoolean(); + final SchedulerEngine.Listener listener; + if (randomBoolean()) { + listener = event -> { + if (trigger.compareAndSet(false, true)) { + latch.countDown(); + } else { + fail("listener invoked twice"); + } + }; + } else { + numberOfFailingListeners++; + listener = event -> { + if (trigger.compareAndSet(false, true)) { + latch.countDown(); + throw new RuntimeException(); + } else { + fail("listener invoked twice"); + } + }; + } + listeners.add(Tuple.tuple(listener, trigger)); + } + + // randomize the order and register the listeners + Collections.shuffle(listeners, random()); + listeners.stream().map(Tuple::v1).forEach(engine::register); + + final AtomicBoolean scheduled = new AtomicBoolean(); + engine.add(new SchedulerEngine.Job( + getTestName(), + (startTime, now) -> { + // only allow one triggering of the listeners + if (scheduled.compareAndSet(false, true)) { + return 0; + } else { + return -1; + } + })); + + latch.await(); + + // now check that every listener was invoked + assertTrue(listeners.stream().map(Tuple::v2).allMatch(AtomicBoolean::get)); + if (numberOfFailingListeners > 0) { + verify(mockLogger, times(numberOfFailingListeners)) + .warn("listener failed while handling triggered event [{}]", getTestName()); + } + verifyNoMoreInteractions(mockLogger); + } finally { + engine.stop(); + } + } + + public void testListenersThrowingExceptionsDoNotCauseNextScheduledTaskToBeSkipped() throws InterruptedException { + final Logger mockLogger = mock(Logger.class); + final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC(), mockLogger); + try { + final List> listeners = new ArrayList<>(); + final int numberOfListeners = randomIntBetween(1, 32); + final int numberOfSchedules = randomIntBetween(1, 32); + final CountDownLatch listenersLatch = new CountDownLatch(numberOfSchedules * numberOfListeners); + for (int i = 0; i < numberOfListeners; i++) { + final AtomicInteger triggerCount = new AtomicInteger(); + final SchedulerEngine.Listener listener = event -> { + if (triggerCount.incrementAndGet() <= numberOfSchedules) { + listenersLatch.countDown(); + throw new RuntimeException(); + } else { + fail("listener invoked more than [" + numberOfSchedules + "] times"); + } + }; + listeners.add(Tuple.tuple(listener, triggerCount)); + engine.register(listener); + } + + // latch for each invocation of nextScheduledTimeAfter, once for each scheduled run, and then a final time when we disable + final CountDownLatch latch = new CountDownLatch(1 + numberOfSchedules); + engine.add(new SchedulerEngine.Job( + getTestName(), + (startTime, now) -> { + if (latch.getCount() >= 2) { + latch.countDown(); + return 0; + } else if (latch.getCount() == 1) { + latch.countDown(); + return -1; + } else { + throw new AssertionError("nextScheduledTimeAfter invoked more than the expected number of times"); + } + })); + + listenersLatch.await(); + assertTrue(listeners.stream().map(Tuple::v2).allMatch(count -> count.get() == numberOfSchedules)); + latch.await(); + verify(mockLogger, times(numberOfListeners * numberOfSchedules)) + .warn("listener failed while handling triggered event [{}]", getTestName()); + verifyNoMoreInteractions(mockLogger); + } finally { + engine.stop(); + } + } + +} \ No newline at end of file diff --git a/x-pack/qa/evil-tests/build.gradle b/x-pack/qa/evil-tests/build.gradle new file mode 100644 index 0000000000000..9b6055ffad7d9 --- /dev/null +++ b/x-pack/qa/evil-tests/build.gradle @@ -0,0 +1,9 @@ +apply plugin: 'elasticsearch.standalone-test' + +dependencies { + testCompile project(path: xpackModule('core'), configuration: 'shadow') +} + +test { + systemProperty 'tests.security.manager', 'false' +} diff --git a/x-pack/qa/evil-tests/src/test/java/org/elasticsearch/xpack/core/scheduler/EvilSchedulerEngineTests.java b/x-pack/qa/evil-tests/src/test/java/org/elasticsearch/xpack/core/scheduler/EvilSchedulerEngineTests.java new file mode 100644 index 0000000000000..2dfd314ffb06e --- /dev/null +++ b/x-pack/qa/evil-tests/src/test/java/org/elasticsearch/xpack/core/scheduler/EvilSchedulerEngineTests.java @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.scheduler; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; + +import java.time.Clock; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +public class EvilSchedulerEngineTests extends ESTestCase { + + public void testOutOfMemoryErrorWhileTriggeredIsRethrownAndIsUncaught() throws InterruptedException { + final AtomicReference maybeFatal = new AtomicReference<>(); + final CountDownLatch uncaughtLatuch = new CountDownLatch(1); + final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler(); + try { + /* + * We want to test that the out of memory error thrown from the scheduler engine goes uncaught on another thread; this gives us + * confidence that an error thrown during a triggered event will lead to the node being torn down. + */ + final AtomicReference maybeThread = new AtomicReference<>(); + Thread.setDefaultUncaughtExceptionHandler((t, e) -> { + maybeFatal.set(e); + maybeThread.set(Thread.currentThread()); + uncaughtLatuch.countDown(); + }); + final Logger mockLogger = mock(Logger.class); + final SchedulerEngine engine = new SchedulerEngine(Settings.EMPTY, Clock.systemUTC(), mockLogger); + try { + final AtomicBoolean trigger = new AtomicBoolean(); + engine.register(event -> { + if (trigger.compareAndSet(false, true)) { + throw new OutOfMemoryError("640K ought to be enough for anybody"); + } else { + fail("listener invoked twice"); + } + }); + final CountDownLatch schedulerLatch = new CountDownLatch(1); + engine.add(new SchedulerEngine.Job( + getTestName(), + (startTime, now) -> { + if (schedulerLatch.getCount() == 1) { + schedulerLatch.countDown(); + return 0; + } else { + throw new AssertionError("nextScheduledTimeAfter invoked more than the expected number of times"); + } + })); + + uncaughtLatuch.await(); + assertTrue(trigger.get()); + assertNotNull(maybeFatal.get()); + assertThat(maybeFatal.get(), instanceOf(OutOfMemoryError.class)); + assertThat(maybeFatal.get(), hasToString(containsString("640K ought to be enough for anybody"))); + assertNotNull(maybeThread.get()); + assertThat(maybeThread.get(), not(equalTo(Thread.currentThread()))); // the error should be rethrown on another thread + schedulerLatch.await(); + verifyNoMoreInteractions(mockLogger); // we never logged anything + } finally { + engine.stop(); + } + } finally { + // restore the uncaught exception handler + Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler); + } + } + +} From 1edd8a29ea16f0996ffe580cc08652b1a47bb00b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Aug 2018 15:23:43 -0400 Subject: [PATCH 2/5] Add missing newline --- .../xpack/core/scheduler/SchedulerEngineTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java index fd8333ebc2693..1b5c251ee5320 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java @@ -138,4 +138,4 @@ public void testListenersThrowingExceptionsDoNotCauseNextScheduledTaskToBeSkippe } } -} \ No newline at end of file +} From 32b8a1f8c47b9fd01ac1429cad1826e64f621464 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Aug 2018 15:26:57 -0400 Subject: [PATCH 3/5] Early guard for NPE --- .../elasticsearch/xpack/core/scheduler/SchedulerEngine.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java index 97bb867bf8bb0..c5821b9a0a226 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java @@ -105,7 +105,8 @@ public SchedulerEngine(final Settings settings, final Clock clock) { SchedulerEngine(final Settings settings, final Clock clock, final Logger logger) { this.clock = Objects.requireNonNull(clock, "clock"); - this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "trigger_engine_scheduler")); + this.scheduler = Executors.newScheduledThreadPool( + 1, EsExecutors.daemonThreadFactory(Objects.requireNonNull(settings, "settings"), "trigger_engine_scheduler")); this.logger = Objects.requireNonNull(logger, "logger"); } From 2c9fa09feece78f87158038229cf3228b2c14d0d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Aug 2018 15:27:50 -0400 Subject: [PATCH 4/5] Last cleanup --- .../org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java index c5821b9a0a226..aed630b538ba0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java @@ -156,7 +156,7 @@ public int jobCount() { return schedules.size(); } - protected void notifyListeners(String name, long triggeredTime, long scheduledTime) { + protected void notifyListeners(final String name, final long triggeredTime, final long scheduledTime) { final Event event = new Event(name, triggeredTime, scheduledTime); for (final Listener listener : listeners) { try { From 85283277a14e5721432155db236ad11b63ade548 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Aug 2018 18:13:13 -0400 Subject: [PATCH 5/5] Log exception --- .../xpack/core/scheduler/SchedulerEngine.java | 3 +- .../core/scheduler/SchedulerEngineTests.java | 30 +++++++++++++++---- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java index aed630b538ba0..71abb61bdcabe 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -163,7 +164,7 @@ protected void notifyListeners(final String name, final long triggeredTime, fina listener.triggered(event); } catch (final Exception e) { // do not allow exceptions to escape this method; we should continue to notify listeners and schedule the next run - logger.warn("listener failed while handling triggered event [{}]", name); + logger.warn(new ParameterizedMessage("listener failed while handling triggered event [{}]", name), e); } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java index 1b5c251ee5320..869a320fb6386 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngineTests.java @@ -7,9 +7,11 @@ package org.elasticsearch.xpack.core.scheduler; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; +import org.mockito.ArgumentCaptor; import java.time.Clock; import java.util.ArrayList; @@ -19,6 +21,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -50,7 +55,7 @@ public void testListenersThrowingExceptionsDoNotCauseOtherListenersToBeSkipped() listener = event -> { if (trigger.compareAndSet(false, true)) { latch.countDown(); - throw new RuntimeException(); + throw new RuntimeException(getTestName()); } else { fail("listener invoked twice"); } @@ -80,8 +85,7 @@ public void testListenersThrowingExceptionsDoNotCauseOtherListenersToBeSkipped() // now check that every listener was invoked assertTrue(listeners.stream().map(Tuple::v2).allMatch(AtomicBoolean::get)); if (numberOfFailingListeners > 0) { - verify(mockLogger, times(numberOfFailingListeners)) - .warn("listener failed while handling triggered event [{}]", getTestName()); + assertFailedListenerLogMessage(mockLogger, numberOfFailingListeners); } verifyNoMoreInteractions(mockLogger); } finally { @@ -102,7 +106,7 @@ public void testListenersThrowingExceptionsDoNotCauseNextScheduledTaskToBeSkippe final SchedulerEngine.Listener listener = event -> { if (triggerCount.incrementAndGet() <= numberOfSchedules) { listenersLatch.countDown(); - throw new RuntimeException(); + throw new RuntimeException(getTestName()); } else { fail("listener invoked more than [" + numberOfSchedules + "] times"); } @@ -130,12 +134,26 @@ public void testListenersThrowingExceptionsDoNotCauseNextScheduledTaskToBeSkippe listenersLatch.await(); assertTrue(listeners.stream().map(Tuple::v2).allMatch(count -> count.get() == numberOfSchedules)); latch.await(); - verify(mockLogger, times(numberOfListeners * numberOfSchedules)) - .warn("listener failed while handling triggered event [{}]", getTestName()); + assertFailedListenerLogMessage(mockLogger, numberOfListeners * numberOfSchedules); verifyNoMoreInteractions(mockLogger); } finally { engine.stop(); } } + private void assertFailedListenerLogMessage(Logger mockLogger, int times) { + final ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(ParameterizedMessage.class); + final ArgumentCaptor throwableCaptor = ArgumentCaptor.forClass(Throwable.class); + verify(mockLogger, times(times)).warn(messageCaptor.capture(), throwableCaptor.capture()); + for (final ParameterizedMessage message : messageCaptor.getAllValues()) { + assertThat(message.getFormat(), equalTo("listener failed while handling triggered event [{}]")); + assertThat(message.getParameters(), arrayWithSize(1)); + assertThat(message.getParameters()[0], equalTo(getTestName())); + } + for (final Throwable throwable : throwableCaptor.getAllValues()) { + assertThat(throwable, instanceOf(RuntimeException.class)); + assertThat(throwable.getMessage(), equalTo(getTestName())); + } + } + }