From b635dff500ae1c83bc91921a6de46d6da700d61b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 10 Jan 2019 14:42:23 -0500 Subject: [PATCH 1/4] Introduce StepListener This commit introduces StepListener which provides a simple way to write a flow consisting of multiple asynchronous steps without having nested callbacks. --- .../elasticsearch/action/StepListener.java | 118 ++++++++++++++++++ .../action/StepListenerTests.java | 112 +++++++++++++++++ 2 files changed, 230 insertions(+) create mode 100644 server/src/main/java/org/elasticsearch/action/StepListener.java create mode 100644 server/src/test/java/org/elasticsearch/action/StepListenerTests.java diff --git a/server/src/main/java/org/elasticsearch/action/StepListener.java b/server/src/main/java/org/elasticsearch/action/StepListener.java new file mode 100644 index 0000000000000..26b6bb2bf271b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/StepListener.java @@ -0,0 +1,118 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action; + +import org.elasticsearch.common.CheckedConsumer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +/** + * A {@link StepListener} provides a simple way to write a flow consisting of + * multiple asynchronous steps without having nested callbacks. For example: + * + *
{@code
+ *  StepListener step1 = new StepListener<>();
+ *  asyncStep1(..., step1);
+ *
+ *  StepListener step2 = new StepListener<>();
+ *  step1.whenComplete(r1 -> {
+ *      asyncStep2(r1, ..., step2);
+ *  }, onFailure);
+ *
+ *  step2.whenComplete(r2 -> {
+ *      R1 r1 = step1.result();
+ *      R r = combine(r1, r2);
+ *     outerListener.onResponse(r);
+ *  }, onFailure);
+ *
+ * }
+ */ + +public final class StepListener implements ActionListener { + private volatile boolean done = false; + private volatile Response result = null; + private volatile Exception error = null; + private final List> listeners = new ArrayList<>(); + + @Override + public void onResponse(Response response) { + if (done == false) { + final List> listeners; + synchronized (this) { + this.result = response; + this.done = true; + listeners = this.listeners; + } + ActionListener.onResponse(listeners, response); + } + } + + @Override + public void onFailure(Exception e) { + if (done == false) { + final List> listeners; + synchronized (this) { + this.error = e; + this.done = true; + listeners = this.listeners; + } + ActionListener.onFailure(listeners, e); + } + } + + /** + * Registers the given actions which are invoked this step is completed. If this step is completed successfully, + * the {@code onResponse} is called with the result; otherwise the {@code onFailure} is called with the failure. + * + * @param onResponse is called when this step is completed successfully + * @param onFailure is called when this step is completed with a failure + */ + public void whenComplete(CheckedConsumer onResponse, Consumer onFailure) { + final ActionListener listener = ActionListener.wrap(onResponse, onFailure); + if (done) { + if (error == null) { + ActionListener.onResponse(Collections.singletonList(listener), result); + } else { + ActionListener.onFailure(Collections.singletonList(listener), error); + } + } else { + synchronized (this) { + listeners.add(listener); + } + } + } + + /** + * Gets the result of this step. This method will throw {@link IllegalArgumentException} + * if this step is not completed yet or completed with a failure. + */ + public Response result() { + if (done == false) { + throw new IllegalStateException("step is not completed yet"); + } + if (error != null) { + throw new IllegalStateException("step is completed with a failure", error); + } + return result; + } +} diff --git a/server/src/test/java/org/elasticsearch/action/StepListenerTests.java b/server/src/test/java/org/elasticsearch/action/StepListenerTests.java new file mode 100644 index 0000000000000..2135af3d73011 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/StepListenerTests.java @@ -0,0 +1,112 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.equalTo; + +public class StepListenerTests extends ESTestCase { + private ThreadPool threadPool; + + @Before + public void setUpThreadPool() { + threadPool = new TestThreadPool(getTestName()); + } + + @After + public void tearDownThreadPool() { + terminate(threadPool); + } + + public void testSimpleSteps() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + Consumer onFailure = e -> { + latch.countDown(); + fail("test a happy path"); + }; + + StepListener step1 = new StepListener<>(); //[a]sync provide a string + executeAction(() -> step1.onResponse("hello")); + StepListener step2 = new StepListener<>(); //[a]sync calculate the length of the string + step1.whenComplete(str -> executeAction(() -> step2.onResponse(str.length())), onFailure); + step2.whenComplete(length -> executeAction(latch::countDown), onFailure); + latch.await(); + assertThat(step1.result(), equalTo("hello")); + assertThat(step2.result(), equalTo(5)); + } + + public void testAbortOnFailure() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + int failedStep = randomBoolean() ? 1 : 2; + AtomicInteger failureNotified = new AtomicInteger(); + Consumer onFailure = e -> { + failureNotified.getAndIncrement(); + latch.countDown(); + assertThat(e.getMessage(), equalTo("failed at step " + failedStep)); + }; + + StepListener step1 = new StepListener<>(); //[a]sync provide a string + if (failedStep == 1) { + executeAction(() -> step1.onFailure(new RuntimeException("failed at step 1"))); + } else { + executeAction(() -> step1.onResponse("hello")); + } + + StepListener step2 = new StepListener<>(); //[a]sync calculate the length of the string + step1.whenComplete(str -> { + if (failedStep == 2) { + executeAction(() -> step2.onFailure(new RuntimeException("failed at step 2"))); + } else { + executeAction(() -> step2.onResponse(str.length())); + } + }, onFailure); + + step2.whenComplete(length -> latch.countDown(), onFailure); + latch.await(); + assertThat(failureNotified.get(), equalTo(1)); + if (failedStep == 1) { + assertThat(expectThrows(IllegalStateException.class, step1::result).getMessage(), + equalTo("step is completed with a failure")); + assertThat(expectThrows(IllegalStateException.class, step2::result).getMessage(), + equalTo("step is not completed yet")); + } else { + assertThat(step1.result(), equalTo("hello")); + assertThat(expectThrows(IllegalStateException.class, step2::result).getMessage(), + equalTo("step is completed with a failure")); + } + } + + private void executeAction(Runnable runnable) { + if (randomBoolean()) { + threadPool.generic().execute(runnable); + } else { + runnable.run(); + } + } +} From 28f873c844cce58ebdfaa763e1b46ad449fbecdb Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 10 Jan 2019 21:54:06 -0500 Subject: [PATCH 2/4] wording --- .../elasticsearch/action/StepListener.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/StepListener.java b/server/src/main/java/org/elasticsearch/action/StepListener.java index 26b6bb2bf271b..7933f4265457f 100644 --- a/server/src/main/java/org/elasticsearch/action/StepListener.java +++ b/server/src/main/java/org/elasticsearch/action/StepListener.java @@ -31,20 +31,21 @@ * multiple asynchronous steps without having nested callbacks. For example: * *
{@code
- *  StepListener step1 = new StepListener<>();
- *  asyncStep1(..., step1);
- *
- *  StepListener step2 = new StepListener<>();
- *  step1.whenComplete(r1 -> {
+ *  void asyncFlowMethod(... ActionListener flowListener) {
+ *    StepListener step1 = new StepListener<>();
+ *    asyncStep1(..., step1);
+
+ *    StepListener step2 = new StepListener<>();
+ *    step1.whenComplete(r1 -> {
  *      asyncStep2(r1, ..., step2);
- *  }, onFailure);
+ *    }, flowListener::onFailure);
  *
- *  step2.whenComplete(r2 -> {
+ *    step2.whenComplete(r2 -> {
  *      R1 r1 = step1.result();
  *      R r = combine(r1, r2);
- *     outerListener.onResponse(r);
- *  }, onFailure);
- *
+ *     flowListener.onResponse(r);
+ *    }, flowListener::onFailure);
+ *  }
  * }
*/ @@ -81,7 +82,7 @@ public void onFailure(Exception e) { } /** - * Registers the given actions which are invoked this step is completed. If this step is completed successfully, + * Registers the given actions which are called when this step is completed. If this step is completed successfully, * the {@code onResponse} is called with the result; otherwise the {@code onFailure} is called with the failure. * * @param onResponse is called when this step is completed successfully From f858e9d00c1de6b30ddfa4283faa804550422ed8 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 10 Jan 2019 22:55:04 -0500 Subject: [PATCH 3/4] fix concurrency issue --- .../elasticsearch/action/StepListener.java | 41 ++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/StepListener.java b/server/src/main/java/org/elasticsearch/action/StepListener.java index 7933f4265457f..ff2a46369a7ff 100644 --- a/server/src/main/java/org/elasticsearch/action/StepListener.java +++ b/server/src/main/java/org/elasticsearch/action/StepListener.java @@ -57,30 +57,30 @@ public final class StepListener implements ActionListener { @Override public void onResponse(Response response) { - if (done == false) { - final List> listeners; - synchronized (this) { - this.result = response; - this.done = true; - listeners = this.listeners; - } + if (onComplete(response, null)) { ActionListener.onResponse(listeners, response); } } @Override public void onFailure(Exception e) { - if (done == false) { - final List> listeners; - synchronized (this) { - this.error = e; - this.done = true; - listeners = this.listeners; - } + if (onComplete(null, e)) { ActionListener.onFailure(listeners, e); } } + /** Returns {@code true} if this method changed the state of this step listener */ + private synchronized boolean onComplete(Response response, Exception e) { + if (done == false) { + this.error = e; + this.result = response; + this.done = true; + return true; + } else { + return false; + } + } + /** * Registers the given actions which are called when this step is completed. If this step is completed successfully, * the {@code onResponse} is called with the result; otherwise the {@code onFailure} is called with the failure. @@ -90,16 +90,19 @@ public void onFailure(Exception e) { */ public void whenComplete(CheckedConsumer onResponse, Consumer onFailure) { final ActionListener listener = ActionListener.wrap(onResponse, onFailure); - if (done) { + final boolean ready; + synchronized (this) { + ready = done; + if (ready == false) { + listeners.add(listener); + } + } + if (ready) { if (error == null) { ActionListener.onResponse(Collections.singletonList(listener), result); } else { ActionListener.onFailure(Collections.singletonList(listener), error); } - } else { - synchronized (this) { - listeners.add(listener); - } } } From e9ed6e1515712a6915be016f2a49409b876cd5cf Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 10 Jan 2019 23:28:09 -0500 Subject: [PATCH 4/4] reuse ListenableFuture --- .../elasticsearch/action/StepListener.java | 61 +++++-------------- .../util/concurrent/ListenableFuture.java | 8 ++- .../action/StepListenerTests.java | 11 ++-- 3 files changed, 27 insertions(+), 53 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/StepListener.java b/server/src/main/java/org/elasticsearch/action/StepListener.java index ff2a46369a7ff..efbf8c755d57c 100644 --- a/server/src/main/java/org/elasticsearch/action/StepListener.java +++ b/server/src/main/java/org/elasticsearch/action/StepListener.java @@ -20,10 +20,10 @@ package org.elasticsearch.action; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.common.util.concurrent.ListenableFuture; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.function.Consumer; /** @@ -50,35 +50,20 @@ */ public final class StepListener implements ActionListener { - private volatile boolean done = false; - private volatile Response result = null; - private volatile Exception error = null; - private final List> listeners = new ArrayList<>(); + private final ListenableFuture delegate; + + public StepListener() { + this.delegate = new ListenableFuture<>(); + } @Override public void onResponse(Response response) { - if (onComplete(response, null)) { - ActionListener.onResponse(listeners, response); - } + delegate.onResponse(response); } @Override public void onFailure(Exception e) { - if (onComplete(null, e)) { - ActionListener.onFailure(listeners, e); - } - } - - /** Returns {@code true} if this method changed the state of this step listener */ - private synchronized boolean onComplete(Response response, Exception e) { - if (done == false) { - this.error = e; - this.result = response; - this.done = true; - return true; - } else { - return false; - } + delegate.onFailure(e); } /** @@ -89,34 +74,16 @@ private synchronized boolean onComplete(Response response, Exception e) { * @param onFailure is called when this step is completed with a failure */ public void whenComplete(CheckedConsumer onResponse, Consumer onFailure) { - final ActionListener listener = ActionListener.wrap(onResponse, onFailure); - final boolean ready; - synchronized (this) { - ready = done; - if (ready == false) { - listeners.add(listener); - } - } - if (ready) { - if (error == null) { - ActionListener.onResponse(Collections.singletonList(listener), result); - } else { - ActionListener.onFailure(Collections.singletonList(listener), error); - } - } + delegate.addListener(ActionListener.wrap(onResponse, onFailure), EsExecutors.newDirectExecutorService(), null); } /** - * Gets the result of this step. This method will throw {@link IllegalArgumentException} - * if this step is not completed yet or completed with a failure. + * Gets the result of this step. This method will throw {@link IllegalStateException} if this step is not completed yet. */ public Response result() { - if (done == false) { + if (delegate.isDone() == false) { throw new IllegalStateException("step is not completed yet"); } - if (error != null) { - throw new IllegalStateException("step is completed with a failure", error); - } - return result; + return FutureUtils.get(delegate); } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java index 725069c5937ed..d4ba603aca458 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java @@ -60,7 +60,13 @@ public void addListener(ActionListener listener, ExecutorService executor, Th if (done) { run = true; } else { - listeners.add(new Tuple<>(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext), executor)); + final ActionListener wrappedListener; + if (threadContext == null) { + wrappedListener = listener; + } else { + wrappedListener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext); + } + listeners.add(new Tuple<>(wrappedListener, executor)); run = false; } } diff --git a/server/src/test/java/org/elasticsearch/action/StepListenerTests.java b/server/src/test/java/org/elasticsearch/action/StepListenerTests.java index 2135af3d73011..15e88830e47e9 100644 --- a/server/src/test/java/org/elasticsearch/action/StepListenerTests.java +++ b/server/src/test/java/org/elasticsearch/action/StepListenerTests.java @@ -90,15 +90,16 @@ public void testAbortOnFailure() throws Exception { step2.whenComplete(length -> latch.countDown(), onFailure); latch.await(); assertThat(failureNotified.get(), equalTo(1)); + if (failedStep == 1) { - assertThat(expectThrows(IllegalStateException.class, step1::result).getMessage(), - equalTo("step is completed with a failure")); - assertThat(expectThrows(IllegalStateException.class, step2::result).getMessage(), + assertThat(expectThrows(RuntimeException.class, step1::result).getMessage(), + equalTo("failed at step 1")); + assertThat(expectThrows(RuntimeException.class, step2::result).getMessage(), equalTo("step is not completed yet")); } else { assertThat(step1.result(), equalTo("hello")); - assertThat(expectThrows(IllegalStateException.class, step2::result).getMessage(), - equalTo("step is completed with a failure")); + assertThat(expectThrows(RuntimeException.class, step2::result).getMessage(), + equalTo("failed at step 2")); } }