Skip to content

Commit 70cee18

Browse files
authored
Introduce StepListener (#37327)
This commit introduces StepListener which provides a simple way to write a flow consisting of multiple asynchronous steps without having nested callbacks. Relates #37291
1 parent 955d3ae commit 70cee18

File tree

3 files changed

+209
-1
lines changed

3 files changed

+209
-1
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action;
21+
22+
import org.elasticsearch.common.CheckedConsumer;
23+
import org.elasticsearch.common.util.concurrent.EsExecutors;
24+
import org.elasticsearch.common.util.concurrent.FutureUtils;
25+
import org.elasticsearch.common.util.concurrent.ListenableFuture;
26+
27+
import java.util.function.Consumer;
28+
29+
/**
30+
* A {@link StepListener} provides a simple way to write a flow consisting of
31+
* multiple asynchronous steps without having nested callbacks. For example:
32+
*
33+
* <pre>{@code
34+
* void asyncFlowMethod(... ActionListener<R> flowListener) {
35+
* StepListener<R1> step1 = new StepListener<>();
36+
* asyncStep1(..., step1);
37+
38+
* StepListener<R2> step2 = new StepListener<>();
39+
* step1.whenComplete(r1 -> {
40+
* asyncStep2(r1, ..., step2);
41+
* }, flowListener::onFailure);
42+
*
43+
* step2.whenComplete(r2 -> {
44+
* R1 r1 = step1.result();
45+
* R r = combine(r1, r2);
46+
* flowListener.onResponse(r);
47+
* }, flowListener::onFailure);
48+
* }
49+
* }</pre>
50+
*/
51+
52+
public final class StepListener<Response> implements ActionListener<Response> {
53+
private final ListenableFuture<Response> delegate;
54+
55+
public StepListener() {
56+
this.delegate = new ListenableFuture<>();
57+
}
58+
59+
@Override
60+
public void onResponse(Response response) {
61+
delegate.onResponse(response);
62+
}
63+
64+
@Override
65+
public void onFailure(Exception e) {
66+
delegate.onFailure(e);
67+
}
68+
69+
/**
70+
* Registers the given actions which are called when this step is completed. If this step is completed successfully,
71+
* the {@code onResponse} is called with the result; otherwise the {@code onFailure} is called with the failure.
72+
*
73+
* @param onResponse is called when this step is completed successfully
74+
* @param onFailure is called when this step is completed with a failure
75+
*/
76+
public void whenComplete(CheckedConsumer<Response, Exception> onResponse, Consumer<Exception> onFailure) {
77+
delegate.addListener(ActionListener.wrap(onResponse, onFailure), EsExecutors.newDirectExecutorService(), null);
78+
}
79+
80+
/**
81+
* Gets the result of this step. This method will throw {@link IllegalStateException} if this step is not completed yet.
82+
*/
83+
public Response result() {
84+
if (delegate.isDone() == false) {
85+
throw new IllegalStateException("step is not completed yet");
86+
}
87+
return FutureUtils.get(delegate);
88+
}
89+
}

server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,13 @@ public void addListener(ActionListener<V> listener, ExecutorService executor, Th
6060
if (done) {
6161
run = true;
6262
} else {
63-
listeners.add(new Tuple<>(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext), executor));
63+
final ActionListener<V> wrappedListener;
64+
if (threadContext == null) {
65+
wrappedListener = listener;
66+
} else {
67+
wrappedListener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
68+
}
69+
listeners.add(new Tuple<>(wrappedListener, executor));
6470
run = false;
6571
}
6672
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action;
21+
22+
import org.elasticsearch.test.ESTestCase;
23+
import org.elasticsearch.threadpool.TestThreadPool;
24+
import org.elasticsearch.threadpool.ThreadPool;
25+
import org.junit.After;
26+
import org.junit.Before;
27+
28+
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.function.Consumer;
31+
32+
import static org.hamcrest.Matchers.equalTo;
33+
34+
public class StepListenerTests extends ESTestCase {
35+
private ThreadPool threadPool;
36+
37+
@Before
38+
public void setUpThreadPool() {
39+
threadPool = new TestThreadPool(getTestName());
40+
}
41+
42+
@After
43+
public void tearDownThreadPool() {
44+
terminate(threadPool);
45+
}
46+
47+
public void testSimpleSteps() throws Exception {
48+
CountDownLatch latch = new CountDownLatch(1);
49+
Consumer<Exception> onFailure = e -> {
50+
latch.countDown();
51+
fail("test a happy path");
52+
};
53+
54+
StepListener<String> step1 = new StepListener<>(); //[a]sync provide a string
55+
executeAction(() -> step1.onResponse("hello"));
56+
StepListener<Integer> step2 = new StepListener<>(); //[a]sync calculate the length of the string
57+
step1.whenComplete(str -> executeAction(() -> step2.onResponse(str.length())), onFailure);
58+
step2.whenComplete(length -> executeAction(latch::countDown), onFailure);
59+
latch.await();
60+
assertThat(step1.result(), equalTo("hello"));
61+
assertThat(step2.result(), equalTo(5));
62+
}
63+
64+
public void testAbortOnFailure() throws Exception {
65+
CountDownLatch latch = new CountDownLatch(1);
66+
int failedStep = randomBoolean() ? 1 : 2;
67+
AtomicInteger failureNotified = new AtomicInteger();
68+
Consumer<Exception> onFailure = e -> {
69+
failureNotified.getAndIncrement();
70+
latch.countDown();
71+
assertThat(e.getMessage(), equalTo("failed at step " + failedStep));
72+
};
73+
74+
StepListener<String> step1 = new StepListener<>(); //[a]sync provide a string
75+
if (failedStep == 1) {
76+
executeAction(() -> step1.onFailure(new RuntimeException("failed at step 1")));
77+
} else {
78+
executeAction(() -> step1.onResponse("hello"));
79+
}
80+
81+
StepListener<Integer> step2 = new StepListener<>(); //[a]sync calculate the length of the string
82+
step1.whenComplete(str -> {
83+
if (failedStep == 2) {
84+
executeAction(() -> step2.onFailure(new RuntimeException("failed at step 2")));
85+
} else {
86+
executeAction(() -> step2.onResponse(str.length()));
87+
}
88+
}, onFailure);
89+
90+
step2.whenComplete(length -> latch.countDown(), onFailure);
91+
latch.await();
92+
assertThat(failureNotified.get(), equalTo(1));
93+
94+
if (failedStep == 1) {
95+
assertThat(expectThrows(RuntimeException.class, step1::result).getMessage(),
96+
equalTo("failed at step 1"));
97+
assertThat(expectThrows(RuntimeException.class, step2::result).getMessage(),
98+
equalTo("step is not completed yet"));
99+
} else {
100+
assertThat(step1.result(), equalTo("hello"));
101+
assertThat(expectThrows(RuntimeException.class, step2::result).getMessage(),
102+
equalTo("failed at step 2"));
103+
}
104+
}
105+
106+
private void executeAction(Runnable runnable) {
107+
if (randomBoolean()) {
108+
threadPool.generic().execute(runnable);
109+
} else {
110+
runnable.run();
111+
}
112+
}
113+
}

0 commit comments

Comments
 (0)