Skip to content

Commit ba671e4

Browse files
authored
Use rejection exception in ThreadedActionListener (#94363)
Today if the exceptional completion of a `ThreadedActionListener` is rejected from its executor then the listener is completed with the original exception on the completing thread, with the exception representing the rejection added to its suppressed exceptions list. In practice this is a little trappy, we may want to handle the rejection differently but won't always remember to check the suppressed exceptions list for a rejection. This commit reverses the order of exceptions passed to the delegate listener in this case: the rejection exception is at the top level, with the original exception added to its suppressed exceptions list.
1 parent 4adc2ae commit ba671e4

File tree

2 files changed

+37
-7
lines changed

2 files changed

+37
-7
lines changed

server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,13 @@ protected void doRun() {
7171
}
7272

7373
@Override
74-
public void onRejection(Exception e2) {
75-
e.addSuppressed(e2);
74+
public void onRejection(Exception rejectionException) {
75+
rejectionException.addSuppressed(e);
7676
try {
77-
delegate.onFailure(e);
78-
} catch (Exception e3) {
79-
e.addSuppressed(e3);
80-
onFailure(e);
77+
delegate.onFailure(rejectionException);
78+
} catch (Exception doubleFailure) {
79+
rejectionException.addSuppressed(doubleFailure);
80+
onFailure(rejectionException);
8181
}
8282
}
8383

server/src/test/java/org/elasticsearch/action/support/ThreadedActionListenerTests.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.common.settings.Settings;
1414
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
15+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1516
import org.elasticsearch.core.TimeValue;
1617
import org.elasticsearch.test.ESTestCase;
1718
import org.elasticsearch.threadpool.FixedExecutorBuilder;
@@ -50,7 +51,36 @@ public void testRejectionHandling() throws InterruptedException {
5051
final var listener = new ThreadedActionListener<Void>(
5152
threadPool.executor(pool),
5253
(pool.equals("fixed-bounded-queue") || pool.startsWith("scaling")) && rarely(),
53-
ActionListener.running(countdownLatch::countDown)
54+
ActionListener.runAfter(new ActionListener<>() {
55+
@Override
56+
public void onResponse(Void ignored) {}
57+
58+
@Override
59+
public void onFailure(Exception e) {
60+
assertNull(e.getCause());
61+
if (e instanceof EsRejectedExecutionException esRejectedExecutionException) {
62+
assertTrue(esRejectedExecutionException.isExecutorShutdown());
63+
if (e.getSuppressed().length == 0) {
64+
return;
65+
}
66+
assertEquals(1, e.getSuppressed().length);
67+
if (e.getSuppressed()[0]instanceof ElasticsearchException elasticsearchException) {
68+
e = elasticsearchException;
69+
assertNull(e.getCause());
70+
} else {
71+
throw new AssertionError("unexpected", e);
72+
}
73+
}
74+
75+
if (e instanceof ElasticsearchException) {
76+
assertEquals("simulated", e.getMessage());
77+
assertEquals(0, e.getSuppressed().length);
78+
} else {
79+
throw new AssertionError("unexpected", e);
80+
}
81+
82+
}
83+
}, countdownLatch::countDown)
5484
);
5585
synchronized (closeFlag) {
5686
if (closeFlag.get() && shutdownUnsafePools.contains(pool)) {

0 commit comments

Comments
 (0)