Skip to content

Commit 855b64b

Browse files
authored
Add non-dispatching listenable action future (#24412)
Currently the only implementation of `ListenableActionFuture` requires dispatching listener execution to a thread pool. This commit renames that variant to `DispatchingListenableActionFuture` and converts `AbstractListenableActionFuture` to be a variant that does not require dispatching. That class is now named `PlainListenableActionFuture`.
1 parent 7311aaa commit 855b64b

File tree

5 files changed

+118
-114
lines changed

5 files changed

+118
-114
lines changed

core/src/main/java/org/elasticsearch/action/support/AbstractListenableActionFuture.java

Lines changed: 0 additions & 106 deletions
This file was deleted.

core/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java

Lines changed: 108 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,120 @@
1919

2020
package org.elasticsearch.action.support;
2121

22+
import org.apache.logging.log4j.Logger;
23+
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.ListenableActionFuture;
25+
import org.elasticsearch.common.logging.Loggers;
2226
import org.elasticsearch.threadpool.ThreadPool;
2327

24-
public class PlainListenableActionFuture<T> extends AbstractListenableActionFuture<T, T> {
28+
import java.util.ArrayList;
29+
import java.util.List;
2530

26-
public PlainListenableActionFuture(ThreadPool threadPool) {
27-
super(threadPool);
31+
public class PlainListenableActionFuture<T> extends AdapterActionFuture<T, T> implements ListenableActionFuture<T> {
32+
33+
volatile Object listeners;
34+
boolean executedListeners = false;
35+
36+
private PlainListenableActionFuture() {}
37+
38+
/**
39+
* This method returns a listenable future. The listeners will be called on completion of the future.
40+
* The listeners will be executed by the same thread that completes the future.
41+
*
42+
* @param <T> the result of the future
43+
* @return a listenable future
44+
*/
45+
public static <T> PlainListenableActionFuture<T> newListenableFuture() {
46+
return new PlainListenableActionFuture<>();
47+
}
48+
49+
/**
50+
* This method returns a listenable future. The listeners will be called on completion of the future.
51+
* The listeners will be executed on the LISTENER thread pool.
52+
* @param threadPool the thread pool used to execute listeners
53+
* @param <T> the result of the future
54+
* @return a listenable future
55+
*/
56+
public static <T> PlainListenableActionFuture<T> newDispatchingListenableFuture(ThreadPool threadPool) {
57+
return new DispatchingListenableActionFuture<>(threadPool);
2858
}
2959

3060
@Override
31-
protected T convert(T response) {
32-
return response;
61+
public void addListener(final ActionListener<T> listener) {
62+
internalAddListener(listener);
3363
}
3464

65+
@Override
66+
protected void done() {
67+
super.done();
68+
synchronized (this) {
69+
executedListeners = true;
70+
}
71+
Object listeners = this.listeners;
72+
if (listeners != null) {
73+
if (listeners instanceof List) {
74+
List list = (List) listeners;
75+
for (Object listener : list) {
76+
executeListener((ActionListener<T>) listener);
77+
}
78+
} else {
79+
executeListener((ActionListener<T>) listeners);
80+
}
81+
}
82+
}
83+
84+
@Override
85+
protected T convert(T listenerResponse) {
86+
return listenerResponse;
87+
}
88+
89+
private void internalAddListener(ActionListener<T> listener) {
90+
boolean executeImmediate = false;
91+
synchronized (this) {
92+
if (executedListeners) {
93+
executeImmediate = true;
94+
} else {
95+
Object listeners = this.listeners;
96+
if (listeners == null) {
97+
listeners = listener;
98+
} else if (listeners instanceof List) {
99+
((List) this.listeners).add(listener);
100+
} else {
101+
Object orig = listeners;
102+
listeners = new ArrayList<>(2);
103+
((List) listeners).add(orig);
104+
((List) listeners).add(listener);
105+
}
106+
this.listeners = listeners;
107+
}
108+
}
109+
if (executeImmediate) {
110+
executeListener(listener);
111+
}
112+
}
113+
114+
private void executeListener(final ActionListener<T> listener) {
115+
try {
116+
// we use a timeout of 0 to by pass assertion forbidding to call actionGet() (blocking) on a network thread.
117+
// here we know we will never block
118+
listener.onResponse(actionGet(0));
119+
} catch (Exception e) {
120+
listener.onFailure(e);
121+
}
122+
}
123+
124+
private static final class DispatchingListenableActionFuture<T> extends PlainListenableActionFuture<T> {
125+
126+
private static final Logger logger = Loggers.getLogger(DispatchingListenableActionFuture.class);
127+
private final ThreadPool threadPool;
128+
129+
private DispatchingListenableActionFuture(ThreadPool threadPool) {
130+
this.threadPool = threadPool;
131+
}
132+
133+
@Override
134+
public void addListener(final ActionListener<T> listener) {
135+
super.addListener(new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener, false));
136+
}
137+
}
35138
}

core/src/test/java/org/elasticsearch/action/support/ListenableActionFutureTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,12 @@ public class ListenableActionFutureTests extends ESTestCase {
3434
public void testListenerIsCallableFromNetworkThreads() throws Throwable {
3535
ThreadPool threadPool = new TestThreadPool("testListenerIsCallableFromNetworkThreads");
3636
try {
37-
final PlainListenableActionFuture<Object> future = new PlainListenableActionFuture<>(threadPool);
37+
final PlainListenableActionFuture<Object> future;
38+
if (randomBoolean()) {
39+
future = PlainListenableActionFuture.newDispatchingListenableFuture(threadPool);
40+
} else {
41+
future = PlainListenableActionFuture.newListenableFuture();
42+
}
3843
final CountDownLatch listenerCalled = new CountDownLatch(1);
3944
final AtomicReference<Throwable> error = new AtomicReference<>();
4045
final Object response = new Object();

core/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ protected void doExecute(TestRequest request, ActionListener<TestResponse> liste
9191
}
9292
}
9393

94-
PlainActionFuture<TestResponse> future = new PlainActionFuture<>();
94+
PlainActionFuture<TestResponse> future = PlainActionFuture.newFuture();
95+
9596
transportAction.execute(new TestRequest(), future);
9697
try {
9798
assertThat(future.get(), notNullValue());
@@ -104,6 +105,7 @@ protected void doExecute(TestRequest request, ActionListener<TestResponse> liste
104105
for (ActionFilter actionFilter : actionFilters.filters()) {
105106
testFiltersByLastExecution.add((RequestTestFilter) actionFilter);
106107
}
108+
107109
testFiltersByLastExecution.sort(Comparator.comparingInt(o -> o.executionToken));
108110

109111
ArrayList<RequestTestFilter> finalTestFilters = new ArrayList<>();

core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void testRetry() throws IOException, ExecutionException, InterruptedExcep
6969
if (randomBoolean()) {
7070
clusterState = client.admin().cluster().state(clusterStateRequest).get().getState();
7171
} else {
72-
PlainActionFuture<ClusterStateResponse> future = new PlainActionFuture<>();
72+
PlainActionFuture<ClusterStateResponse> future = PlainActionFuture.newFuture();
7373
client.admin().cluster().state(clusterStateRequest, future);
7474
clusterState = future.get().getState();
7575
}

0 commit comments

Comments
 (0)