From 2004d74a22623479c4b3e9810f248147f471ca12 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 31 Jul 2018 18:18:19 +0200 Subject: [PATCH 01/17] Use CompletableFuture --- .../elasticsearch/action/ActionFuture.java | 5 +- .../action/support/AdapterActionFuture.java | 4 +- .../support/PlainListenableActionFuture.java | 67 +--- .../common/util/concurrent/BaseFuture.java | 308 +----------------- .../util/concurrent/ListenableFuture.java | 66 +--- .../transport/PlainTransportFuture.java | 4 +- .../zen/NodeJoinControllerTests.java | 4 +- .../ScheduleWithFixedDelayTests.java | 2 +- 8 files changed, 44 insertions(+), 416 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ActionFuture.java b/server/src/main/java/org/elasticsearch/action/ActionFuture.java index 1bd5d16b03d2c..7fbdf5b6539b2 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionFuture.java +++ b/server/src/main/java/org/elasticsearch/action/ActionFuture.java @@ -21,15 +21,16 @@ import org.elasticsearch.common.unit.TimeValue; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** - * An extension to {@link Future} allowing for simplified "get" operations. + * An extension to {@link Future} allowing for simplified "get" operations and extended async programming capabilities. * * */ -public interface ActionFuture extends Future { +public interface ActionFuture extends Future, CompletionStage { /** * Similar to {@link #get()}, just catching the {@link InterruptedException} and throwing diff --git a/server/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java b/server/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java index 528750ba89b90..75a1f570b2598 100644 --- a/server/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java +++ b/server/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java @@ -56,12 +56,12 @@ public T actionGet(long timeout, TimeUnit unit) { @Override public void onResponse(L result) { - set(convert(result)); + complete(convert(result)); } @Override public void onFailure(Exception e) { - setException(e); + completeExceptionally(e); } protected abstract T convert(L listenerResponse); diff --git a/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java b/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java index 943c36797096c..505264b8cc651 100644 --- a/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java +++ b/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java @@ -20,19 +20,14 @@ package org.elasticsearch.action.support; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; -import java.util.List; - public class PlainListenableActionFuture extends AdapterActionFuture implements ListenableActionFuture { - volatile Object listeners; - boolean executedListeners = false; - protected PlainListenableActionFuture() {} /** @@ -59,26 +54,15 @@ public static PlainListenableActionFuture newDispatchingListenableFuture( @Override public void addListener(final ActionListener listener) { - internalAddListener(listener); - } - - @Override - protected void done() { - super.done(); - synchronized (this) { - executedListeners = true; - } - Object listeners = this.listeners; - if (listeners != null) { - if (listeners instanceof List) { - List list = (List) listeners; - for (Object listener : list) { - executeListener((ActionListener) listener); - } + whenComplete((val, throwable) -> { + if (throwable == null) { + listener.onResponse(val); } else { - executeListener((ActionListener) listeners); + assert throwable instanceof Exception : "Expected exception but was: " + throwable.getClass(); + ExceptionsHelper.dieOnError(throwable); + listener.onFailure((Exception) throwable); } - } + }); } @Override @@ -86,41 +70,6 @@ protected T convert(T listenerResponse) { return listenerResponse; } - private void internalAddListener(ActionListener listener) { - boolean executeImmediate = false; - synchronized (this) { - if (executedListeners) { - executeImmediate = true; - } else { - Object listeners = this.listeners; - if (listeners == null) { - listeners = listener; - } else if (listeners instanceof List) { - ((List) this.listeners).add(listener); - } else { - Object orig = listeners; - listeners = new ArrayList<>(2); - ((List) listeners).add(orig); - ((List) listeners).add(listener); - } - this.listeners = listeners; - } - } - if (executeImmediate) { - executeListener(listener); - } - } - - private void executeListener(final ActionListener listener) { - try { - // we use a timeout of 0 to by pass assertion forbidding to call actionGet() (blocking) on a network thread. - // here we know we will never block - listener.onResponse(actionGet(0)); - } catch (Exception e) { - listener.onFailure(e); - } - } - private static final class DispatchingListenableActionFuture extends PlainListenableActionFuture { private static final Logger logger = Loggers.getLogger(DispatchingListenableActionFuture.class); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java index 3436ccdf7ad7c..de9068baff92c 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java @@ -19,324 +19,46 @@ package org.elasticsearch.common.util.concurrent; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.MasterService; -import org.elasticsearch.common.Nullable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transports; -import java.util.Objects; -import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.locks.AbstractQueuedSynchronizer; -public abstract class BaseFuture implements Future { +public abstract class BaseFuture extends CompletableFuture { private static final String BLOCKING_OP_REASON = "Blocking operation"; - /** - * Synchronization control for AbstractFutures. - */ - private final Sync sync = new Sync<>(); - - /* - * Improve the documentation of when InterruptedException is thrown. Our - * behavior matches the JDK's, but the JDK's documentation is misleading. - */ - - /** - * {@inheritDoc} - *

- * The default {@link BaseFuture} implementation throws {@code - * InterruptedException} if the current thread is interrupted before or during - * the call, even if the value is already available. - * - * @throws InterruptedException if the current thread was interrupted before - * or during the call (optional but recommended). - * @throws CancellationException {@inheritDoc} - */ @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException { - assert timeout <= 0 || - (Transports.assertNotTransportThread(BLOCKING_OP_REASON) && - ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) && - ClusterApplierService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON) && - MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON)); - return sync.get(unit.toNanos(timeout)); + assert timeout <= 0 || blockingAllowed(); + return super.get(timeout, unit); } - /* - * Improve the documentation of when InterruptedException is thrown. Our - * behavior matches the JDK's, but the JDK's documentation is misleading. - */ - - /** - * {@inheritDoc} - *

- * The default {@link BaseFuture} implementation throws {@code - * InterruptedException} if the current thread is interrupted before or during - * the call, even if the value is already available. - * - * @throws InterruptedException if the current thread was interrupted before - * or during the call (optional but recommended). - * @throws CancellationException {@inheritDoc} - */ @Override public V get() throws InterruptedException, ExecutionException { - assert Transports.assertNotTransportThread(BLOCKING_OP_REASON) && + assert blockingAllowed(); + return super.get(); + } + + private static boolean blockingAllowed() { + return Transports.assertNotTransportThread(BLOCKING_OP_REASON) && ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) && ClusterApplierService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON) && MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON); - return sync.get(); - } - - @Override - public boolean isDone() { - return sync.isDone(); - } - - @Override - public boolean isCancelled() { - return sync.isCancelled(); } @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (!sync.cancel()) { - return false; - } - done(); - if (mayInterruptIfRunning) { - interruptTask(); - } - return true; - } - - /** - * Subclasses can override this method to implement interruption of the - * future's computation. The method is invoked automatically by a successful - * call to {@link #cancel(boolean) cancel(true)}. - *

- * The default implementation does nothing. - * - * @since 10.0 - */ - protected void interruptTask() { - } - - /** - * Subclasses should invoke this method to set the result of the computation - * to {@code value}. This will set the state of the future to - * {@link BaseFuture.Sync#COMPLETED} and call {@link #done()} if the - * state was successfully changed. - * - * @param value the value that was the result of the task. - * @return true if the state was successfully changed. - */ - protected boolean set(@Nullable V value) { - boolean result = sync.set(value); - if (result) { - done(); - } - return result; + public boolean completeExceptionally(Throwable ex) { + assert ex instanceof Exception : "Expected exception but was: " + ex.getClass(); + ExceptionsHelper.dieOnError(ex); + return super.completeExceptionally(ex); } - /** - * Subclasses should invoke this method to set the result of the computation - * to an error, {@code throwable}. This will set the state of the future to - * {@link BaseFuture.Sync#COMPLETED} and call {@link #done()} if the - * state was successfully changed. - * - * @param throwable the exception that the task failed with. - * @return true if the state was successfully changed. - * @throws Error if the throwable was an {@link Error}. - */ - protected boolean setException(Throwable throwable) { - boolean result = sync.setException(Objects.requireNonNull(throwable)); - if (result) { - done(); - } - - // If it's an Error, we want to make sure it reaches the top of the - // call stack, so we rethrow it. - - // we want to notify the listeners we have with errors as well, as it breaks - // how we work in ES in terms of using assertions -// if (throwable instanceof Error) { -// throw (Error) throwable; -// } - return result; - } - - protected void done() { - } - - /** - *

Following the contract of {@link AbstractQueuedSynchronizer} we create a - * private subclass to hold the synchronizer. This synchronizer is used to - * implement the blocking and waiting calls as well as to handle state changes - * in a thread-safe manner. The current state of the future is held in the - * Sync state, and the lock is released whenever the state changes to either - * {@link #COMPLETED} or {@link #CANCELLED}. - *

- * To avoid races between threads doing release and acquire, we transition - * to the final state in two steps. One thread will successfully CAS from - * RUNNING to COMPLETING, that thread will then set the result of the - * computation, and only then transition to COMPLETED or CANCELLED. - *

- * We don't use the integer argument passed between acquire methods so we - * pass around a -1 everywhere. - */ - static final class Sync extends AbstractQueuedSynchronizer { - /* Valid states. */ - static final int RUNNING = 0; - static final int COMPLETING = 1; - static final int COMPLETED = 2; - static final int CANCELLED = 4; - - private V value; - private Throwable exception; - - /* - * Acquisition succeeds if the future is done, otherwise it fails. - */ - @Override - protected int tryAcquireShared(int ignored) { - if (isDone()) { - return 1; - } - return -1; - } - - /* - * We always allow a release to go through, this means the state has been - * successfully changed and the result is available. - */ - @Override - protected boolean tryReleaseShared(int finalState) { - setState(finalState); - return true; - } - - /** - * Blocks until the task is complete or the timeout expires. Throws a - * {@link TimeoutException} if the timer expires, otherwise behaves like - * {@link #get()}. - */ - V get(long nanos) throws TimeoutException, CancellationException, - ExecutionException, InterruptedException { - - // Attempt to acquire the shared lock with a timeout. - if (!tryAcquireSharedNanos(-1, nanos)) { - throw new TimeoutException("Timeout waiting for task."); - } - - return getValue(); - } - - /** - * Blocks until {@link #complete(Object, Throwable, int)} has been - * successfully called. Throws a {@link CancellationException} if the task - * was cancelled, or a {@link ExecutionException} if the task completed with - * an error. - */ - V get() throws CancellationException, ExecutionException, - InterruptedException { - - // Acquire the shared lock allowing interruption. - acquireSharedInterruptibly(-1); - return getValue(); - } - - /** - * Implementation of the actual value retrieval. Will return the value - * on success, an exception on failure, a cancellation on cancellation, or - * an illegal state if the synchronizer is in an invalid state. - */ - private V getValue() throws CancellationException, ExecutionException { - int state = getState(); - switch (state) { - case COMPLETED: - if (exception != null) { - throw new ExecutionException(exception); - } else { - return value; - } - - case CANCELLED: - throw new CancellationException("Task was cancelled."); - - default: - throw new IllegalStateException( - "Error, synchronizer in invalid state: " + state); - } - } - - /** - * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}. - */ - boolean isDone() { - return (getState() & (COMPLETED | CANCELLED)) != 0; - } - - /** - * Checks if the state is {@link #CANCELLED}. - */ - boolean isCancelled() { - return getState() == CANCELLED; - } - - /** - * Transition to the COMPLETED state and set the value. - */ - boolean set(@Nullable V v) { - return complete(v, null, COMPLETED); - } - - /** - * Transition to the COMPLETED state and set the exception. - */ - boolean setException(Throwable t) { - return complete(null, t, COMPLETED); - } - - /** - * Transition to the CANCELLED state. - */ - boolean cancel() { - return complete(null, null, CANCELLED); - } - - /** - * Implementation of completing a task. Either {@code v} or {@code t} will - * be set but not both. The {@code finalState} is the state to change to - * from {@link #RUNNING}. If the state is not in the RUNNING state we - * return {@code false} after waiting for the state to be set to a valid - * final state ({@link #COMPLETED} or {@link #CANCELLED}). - * - * @param v the value to set as the result of the computation. - * @param t the exception to set as the result of the computation. - * @param finalState the state to transition to. - */ - private boolean complete(@Nullable V v, @Nullable Throwable t, - int finalState) { - boolean doCompletion = compareAndSetState(RUNNING, COMPLETING); - if (doCompletion) { - // If this thread successfully transitioned to COMPLETING, set the value - // and exception and then release to the final state. - this.value = v; - this.exception = t; - releaseShared(finalState); - } else if (getState() == COMPLETING) { - // If some other thread is currently completing the future, block until - // they are done so we can guarantee completion. - acquireShared(-1); - } - return doCompletion; - } - } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java index d50f57aaafaa5..087b423f59a5e 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java @@ -19,13 +19,10 @@ package org.elasticsearch.common.util.concurrent; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.collect.Tuple; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; /** * A future implementation that allows for the result to be passed to listeners waiting for @@ -38,9 +35,6 @@ */ public final class ListenableFuture extends BaseFuture implements ActionListener { - private volatile boolean done = false; - private final List, ExecutorService>> listeners = new ArrayList<>(); - /** * Adds a listener to this future. If the future has not yet completed, the listener will be * notified of a response or exception in a runnable submitted to the ExecutorService provided. @@ -48,58 +42,20 @@ public final class ListenableFuture extends BaseFuture implements ActionLi * a different thread. */ public void addListener(ActionListener listener, ExecutorService executor) { - if (done) { - // run the callback directly, we don't hold the lock and don't need to fork! - notifyListener(listener, EsExecutors.newDirectExecutorService()); - } else { - final boolean run; - // check done under lock since it could have been modified and protect modifications - // to the list under lock - synchronized (this) { - if (done) { - run = true; - } else { - listeners.add(new Tuple<>(listener, executor)); - run = false; - } - } - - if (run) { - // run the callback directly, we don't hold the lock and don't need to fork! - notifyListener(listener, EsExecutors.newDirectExecutorService()); + whenCompleteAsync((val, throwable) -> { + if (throwable == null) { + listener.onResponse(val); + } else { + assert throwable instanceof Exception : "Expected exception but was: " + throwable.getClass(); + ExceptionsHelper.dieOnError(throwable); + listener.onFailure((Exception) throwable); } - } - } - - @Override - protected synchronized void done() { - done = true; - listeners.forEach(t -> notifyListener(t.v1(), t.v2())); - // release references to any listeners as we no longer need them and will live - // much longer than the listeners in most cases - listeners.clear(); - } - - private void notifyListener(ActionListener listener, ExecutorService executorService) { - try { - executorService.submit(() -> { - try { - // call get in a non-blocking fashion as we could be on a network thread - // or another thread like the scheduler, which we should never block! - V value = FutureUtils.get(this, 0L, TimeUnit.NANOSECONDS); - listener.onResponse(value); - } catch (Exception e) { - listener.onFailure(e); - } - }); - } catch (Exception e) { - listener.onFailure(e); - } + }, executor); } @Override public void onResponse(V v) { - final boolean set = set(v); + final boolean set = complete(v); if (set == false) { throw new IllegalStateException("did not set value, value or exception already set?"); } @@ -107,7 +63,7 @@ public void onResponse(V v) { @Override public void onFailure(Exception e) { - final boolean set = setException(e); + final boolean set = completeExceptionally(e); if (set == false) { throw new IllegalStateException("did not set exception, value already set or exception already set?"); } diff --git a/server/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java b/server/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java index 4dc530bd40cf2..641fe041ba020 100644 --- a/server/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java +++ b/server/src/main/java/org/elasticsearch/transport/PlainTransportFuture.java @@ -85,7 +85,7 @@ public String executor() { public void handleResponse(V response) { try { handler.handleResponse(response); - set(response); + complete(response); } catch (Exception e) { handleException(new ResponseHandlerFailureTransportException(e)); } @@ -96,7 +96,7 @@ public void handleException(TransportException exp) { try { handler.handleException(exp); } finally { - setException(exp); + completeExceptionally(exp); } } diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index 9e57382bb4bc8..9e9e1161c7c4d 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -805,11 +805,11 @@ static class SimpleFuture extends BaseFuture { } public void markAsDone() { - set(null); + complete(null); } public void markAsFailed(Throwable t) { - setException(t); + completeExceptionally(t); } @Override diff --git a/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java b/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java index da0125d6f65d9..55a048fffbf47 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java @@ -291,7 +291,7 @@ public void testRunnableRunsAtMostOnceAfterCancellation() throws Exception { static final class TestFuture extends BaseFuture { boolean futureDone(Object value) { - return set(value); + return complete(value); } } } From 6222911529c8f37783a887e14680b512377cc930 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 31 Jul 2018 23:33:36 +0200 Subject: [PATCH 02/17] Fix tests --- .../support/CachingUsernamePasswordRealm.java | 4 +- .../user/TransportGetUsersActionTests.java | 122 +++--------------- .../authc/esnative/ReservedRealmTests.java | 2 + .../security/authc/file/FileRealmTests.java | 2 + 4 files changed, 26 insertions(+), 104 deletions(-) diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java index 68338e9bcad93..4ba65720a2420 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java @@ -121,7 +121,7 @@ private void authenticateWithCache(UsernamePasswordToken token, ActionListener handleFailure(future, createdAndStartedFuture.get(), token, e, listener)), - threadPool.executor(ThreadPool.Names.GENERIC)); + threadPool.generic()); } catch (ExecutionException e) { listener.onResponse(AuthenticationResult.unsuccessful("", e)); } @@ -219,7 +219,7 @@ public final void lookupUser(String username, ActionListener listener) { } else { listener.onResponse(null); } - }, listener::onFailure), threadPool.executor(ThreadPool.Names.GENERIC)); + }, listener::onFailure), threadPool.generic()); } catch (ExecutionException e) { listener.onFailure(e); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java index 1c5f93187c059..fff776c2567a1 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/user/TransportGetUsersActionTests.java @@ -39,16 +39,12 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.emptyArray; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.AdditionalMatchers.aryEq; import static org.mockito.Matchers.any; @@ -99,23 +95,10 @@ public void testAnonymousUser() { GetUsersRequest request = new GetUsersRequest(); request.usernames(anonymousUser.principal()); - final AtomicReference throwableRef = new AtomicReference<>(); - final AtomicReference responseRef = new AtomicReference<>(); - action.doExecute(mock(Task.class), request, new ActionListener() { - @Override - public void onResponse(GetUsersResponse response) { - responseRef.set(response); - } - - @Override - public void onFailure(Exception e) { - throwableRef.set(e); - } - }); + final PlainActionFuture responseFut = new PlainActionFuture<>(); + action.doExecute(mock(Task.class), request, responseFut); - assertThat(throwableRef.get(), is(nullValue())); - assertThat(responseRef.get(), is(notNullValue())); - final User[] users = responseRef.get().users(); + final User[] users = responseFut.actionGet().users(); if (anonymousEnabled) { assertThat("expected array with anonymous but got: " + Arrays.toString(users), users, arrayContaining(anonymousUser)); } else { @@ -134,23 +117,11 @@ public void testInternalUser() { GetUsersRequest request = new GetUsersRequest(); request.usernames(randomFrom(SystemUser.INSTANCE.principal(), XPackUser.INSTANCE.principal())); - final AtomicReference throwableRef = new AtomicReference<>(); - final AtomicReference responseRef = new AtomicReference<>(); - action.doExecute(mock(Task.class), request, new ActionListener() { - @Override - public void onResponse(GetUsersResponse response) { - responseRef.set(response); - } - - @Override - public void onFailure(Exception e) { - throwableRef.set(e); - } - }); + final PlainActionFuture responseFut = new PlainActionFuture<>(); + action.doExecute(mock(Task.class), request, responseFut); - assertThat(throwableRef.get(), instanceOf(IllegalArgumentException.class)); - assertThat(throwableRef.get().getMessage(), containsString("is internal")); - assertThat(responseRef.get(), is(nullValue())); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> responseFut.actionGet()); + assertThat(e.getMessage(), containsString("is internal")); verifyZeroInteractions(usersStore); } @@ -178,25 +149,10 @@ public void testReservedUsersOnly() { GetUsersRequest request = new GetUsersRequest(); request.usernames(names.toArray(new String[names.size()])); - final AtomicReference throwableRef = new AtomicReference<>(); - final AtomicReference responseRef = new AtomicReference<>(); - action.doExecute(mock(Task.class), request, new ActionListener() { - @Override - public void onResponse(GetUsersResponse response) { - responseRef.set(response); - } - - @Override - public void onFailure(Exception e) { - logger.warn("Request failed", e); - throwableRef.set(e); - } - }); - - User[] users = responseRef.get().users(); + final PlainActionFuture responseFut = new PlainActionFuture<>(); + action.doExecute(mock(Task.class), request, responseFut); - assertThat(throwableRef.get(), is(nullValue())); - assertThat(responseRef.get(), is(notNullValue())); + User[] users = responseFut.actionGet().users(); assertThat(users, arrayContaining(reservedUsers.toArray(new User[reservedUsers.size()]))); } @@ -225,19 +181,8 @@ public Void answer(InvocationOnMock invocation) { } }).when(usersStore).getUsers(eq(Strings.EMPTY_ARRAY), any(ActionListener.class)); - final AtomicReference throwableRef = new AtomicReference<>(); - final AtomicReference responseRef = new AtomicReference<>(); - action.doExecute(mock(Task.class), request, new ActionListener() { - @Override - public void onResponse(GetUsersResponse response) { - responseRef.set(response); - } - - @Override - public void onFailure(Exception e) { - throwableRef.set(e); - } - }); + final PlainActionFuture responseFut = new PlainActionFuture<>(); + action.doExecute(mock(Task.class), request, responseFut); final List expectedList = new ArrayList<>(); PlainActionFuture> userFuture = new PlainActionFuture<>(); @@ -245,9 +190,7 @@ public void onFailure(Exception e) { expectedList.addAll(userFuture.actionGet()); expectedList.addAll(storeUsers); - assertThat(throwableRef.get(), is(nullValue())); - assertThat(responseRef.get(), is(notNullValue())); - assertThat(responseRef.get().users(), arrayContaining(expectedList.toArray(new User[expectedList.size()]))); + assertThat(responseFut.actionGet().users(), arrayContaining(expectedList.toArray(new User[expectedList.size()]))); verify(usersStore, times(1)).getUsers(aryEq(Strings.EMPTY_ARRAY), any(ActionListener.class)); } @@ -271,26 +214,13 @@ public void testGetStoreOnlyUsers() { return null; }).when(usersStore).getUsers(aryEq(storeUsernames), any(ActionListener.class)); - final AtomicReference throwableRef = new AtomicReference<>(); - final AtomicReference responseRef = new AtomicReference<>(); - action.doExecute(mock(Task.class), request, new ActionListener() { - @Override - public void onResponse(GetUsersResponse response) { - responseRef.set(response); - } - - @Override - public void onFailure(Exception e) { - throwableRef.set(e); - } - }); + final PlainActionFuture responseFut = new PlainActionFuture<>(); + action.doExecute(mock(Task.class), request, responseFut); final List expectedList = new ArrayList<>(); expectedList.addAll(storeUsers); - assertThat(throwableRef.get(), is(nullValue())); - assertThat(responseRef.get(), is(notNullValue())); - assertThat(responseRef.get().users(), arrayContaining(expectedList.toArray(new User[expectedList.size()]))); + assertThat(responseFut.actionGet().users(), arrayContaining(expectedList.toArray(new User[expectedList.size()]))); if (storeUsers.size() > 1) { verify(usersStore, times(1)).getUsers(aryEq(storeUsernames), any(ActionListener.class)); } else { @@ -319,23 +249,11 @@ public void testException() { return null; }).when(usersStore).getUsers(aryEq(storeUsernames), any(ActionListener.class)); - final AtomicReference throwableRef = new AtomicReference<>(); - final AtomicReference responseRef = new AtomicReference<>(); - action.doExecute(mock(Task.class), request, new ActionListener() { - @Override - public void onResponse(GetUsersResponse response) { - responseRef.set(response); - } - - @Override - public void onFailure(Exception e) { - throwableRef.set(e); - } - }); + final PlainActionFuture responseFut = new PlainActionFuture<>(); + action.doExecute(mock(Task.class), request, responseFut); - assertThat(throwableRef.get(), is(notNullValue())); - assertThat(throwableRef.get(), is(sameInstance(e))); - assertThat(responseRef.get(), is(nullValue())); + Exception thrownE = expectThrows(Exception.class, () -> responseFut.actionGet()); + assertThat(thrownE, is(sameInstance(e))); verify(usersStore, times(1)).getUsers(aryEq(storeUsernames), any(ActionListener.class)); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/ReservedRealmTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/ReservedRealmTests.java index fad3d43e6d5f3..b65d88d8d6dc1 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/ReservedRealmTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/esnative/ReservedRealmTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESTestCase; @@ -75,6 +76,7 @@ public void setupMocks() throws Exception { mockGetAllReservedUserInfo(usersStore, Collections.emptyMap()); threadPool = mock(ThreadPool.class); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(threadPool.generic()).thenReturn(EsExecutors.newDirectExecutorService()); } public void testInvalidHashingAlgorithmFails() { diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/file/FileRealmTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/file/FileRealmTests.java index f5dad8b7c684c..80e5607954093 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/file/FileRealmTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/file/FileRealmTests.java @@ -8,6 +8,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.test.ESTestCase; @@ -63,6 +64,7 @@ public void init() throws Exception { threadPool = mock(ThreadPool.class); threadContext = new ThreadContext(globalSettings); when(threadPool.getThreadContext()).thenReturn(threadContext); + when(threadPool.generic()).thenReturn(EsExecutors.newDirectExecutorService()); } public void testAuthenticate() throws Exception { From edd838a2d064b3bc5807a50e1ebdee52c5dd094b Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 2 Aug 2018 16:10:46 +0200 Subject: [PATCH 03/17] wrap onResponse --- .../action/support/PlainListenableActionFuture.java | 6 +++++- .../common/util/concurrent/ListenableFuture.java | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java b/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java index 505264b8cc651..bf79481be023b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java +++ b/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java @@ -56,7 +56,11 @@ public static PlainListenableActionFuture newDispatchingListenableFuture( public void addListener(final ActionListener listener) { whenComplete((val, throwable) -> { if (throwable == null) { - listener.onResponse(val); + try { + listener.onResponse(val); + } catch (Exception e) { + listener.onFailure(e); + } } else { assert throwable instanceof Exception : "Expected exception but was: " + throwable.getClass(); ExceptionsHelper.dieOnError(throwable); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java index 087b423f59a5e..beeea41acb914 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java @@ -44,7 +44,11 @@ public final class ListenableFuture extends BaseFuture implements ActionLi public void addListener(ActionListener listener, ExecutorService executor) { whenCompleteAsync((val, throwable) -> { if (throwable == null) { - listener.onResponse(val); + try { + listener.onResponse(val); + } catch (Exception e) { + listener.onFailure(e); + } } else { assert throwable instanceof Exception : "Expected exception but was: " + throwable.getClass(); ExceptionsHelper.dieOnError(throwable); From e8284d90a99443f36ec5fa0b7ccfae554350127f Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 18 Oct 2018 13:39:26 +0200 Subject: [PATCH 04/17] wip --- .../support/PlainListenableActionFuture.java | 2 +- .../common/util/concurrent/BaseFuture.java | 60 ++++++++++++++++++- .../util/concurrent/ListenableFuture.java | 3 - .../concurrent/ListenableFutureTests.java | 11 ++++ 4 files changed, 70 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java b/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java index bf79481be023b..e8e2befd89c05 100644 --- a/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java +++ b/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java @@ -63,7 +63,7 @@ public void addListener(final ActionListener listener) { } } else { assert throwable instanceof Exception : "Expected exception but was: " + throwable.getClass(); - ExceptionsHelper.dieOnError(throwable); + ExceptionsHelper.maybeDieOnAnotherThread(throwable); listener.onFailure((Exception) throwable); } }); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java index de9068baff92c..a8a4ec45b3fd4 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java @@ -27,10 +27,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; +import java.util.function.Supplier; -public abstract class BaseFuture extends CompletableFuture { +public class BaseFuture extends CompletableFuture { private static final String BLOCKING_OP_REASON = "Blocking operation"; @@ -57,8 +60,61 @@ private static boolean blockingAllowed() { @Override public boolean completeExceptionally(Throwable ex) { assert ex instanceof Exception : "Expected exception but was: " + ex.getClass(); - ExceptionsHelper.dieOnError(ex); + ExceptionsHelper.maybeDieOnAnotherThread(ex); return super.completeExceptionally(ex); } + @Override + public CompletableFuture whenComplete(BiConsumer action) { + return super.whenComplete(wrap(action)); + } + + @Override + public CompletableFuture whenCompleteAsync(BiConsumer action, Executor executor) { + return super.whenCompleteAsync(wrap(action), executor); + } + + // method only provided since JDK9 + public Executor defaultExecutor() { + throw new IllegalStateException("default executor"); + } + +// public CompletableFuture completeAsync(Supplier supplier, Executor executor) { +// // only works on JDK 9 +// // return super.completeAsync(wrap(supplier), executor); +// +// } + + private BiConsumer wrap(BiConsumer action) { + return (v, t) -> { + if (t != null) { + ExceptionsHelper.maybeDieOnAnotherThread(t); + } + try { + action.accept(v, t); + } catch (Throwable throwable) { + ExceptionsHelper.maybeDieOnAnotherThread(throwable); + throw throwable; + } + }; + } + + private Supplier wrap(Supplier supplier) { + return () -> { + try { + return supplier.get(); + } catch (Throwable throwable) { + ExceptionsHelper.maybeDieOnAnotherThread(throwable); + throw throwable; + } + }; + } + + // TODO: Unfortunately this method was only introduced in JDK 9. This means that we can't guarantee for older JDKs to return our + // subclass here for the CompletionStage methods (bummer) + // it's ok though, because ES 7.0 will require JDK 11 to run + public CompletableFuture newIncompleteFuture() { + return new BaseFuture<>(); + } + } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java index eb569564d5643..70af6c0ade127 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java @@ -19,7 +19,6 @@ package org.elasticsearch.common.util.concurrent; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; @@ -52,8 +51,6 @@ public void addListener(ActionListener listener, ExecutorService executor, Th wrappedListener.onFailure(e); } } else { - assert throwable instanceof Exception : "Expected exception but was: " + throwable.getClass(); - ExceptionsHelper.maybeDieOnAnotherThread(throwable); wrappedListener.onFailure((Exception) throwable); } }, executor); diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java index 75a2e29946179..a0036b2fa0241 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import java.util.concurrent.BrokenBarrierException; @@ -58,6 +59,16 @@ public void testListenableFutureNotifiesListeners() { assertTrue(future.isDone()); } + public void testThrowableListener() { + executorService = EsExecutors.newFixed("testThrowableListener", 1, 1000, + EsExecutors.daemonThreadFactory("listener"), threadContext); + ListenableFuture context = new ListenableFuture<>(); + context.addListener(ActionListener.wrap(() -> { + assert false : "Should not fail"; + }), executorService, threadContext); + context.onResponse(null); + } + public void testListenableFutureNotifiesListenersOnException() { ListenableFuture future = new ListenableFuture<>(); AtomicInteger notifications = new AtomicInteger(0); From 94b7fd0d6410f70ad6580056004fc26800078499 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Sun, 2 Dec 2018 11:39:14 +0100 Subject: [PATCH 05/17] more wip --- .../common/util/concurrent/BaseFuture.java | 244 ++++++++++++++++-- 1 file changed, 223 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java index a8a4ec45b3fd4..25794c3417ea4 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java @@ -26,17 +26,55 @@ import org.elasticsearch.transport.Transports; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; -import java.util.function.Supplier; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +/** + * CompletableFuture implementation that properly bubbles up errors. How does it work? + * The main idea is to override newIncompleteFuture() and provide a BaseFuture instance. This means that all CompletionStage methods + * properly instantiate BaseFuture instances. The newIncompleteFuture() method was only introduced in JDK9. + * This means that we can't guarantee for older JDKs to return our subclass for the CompletionStage methods. It's ok though, + * because ES 7.0 will require JDK 11 to run. There are other methods on CompletableFuture that are unsafe to use, and we will have + * to ban them. I envision the following plan for them: + * - add CompletableFuture methods to forbidden APIs: + * - blacklist all methods on CompletableFuture / CompletableStage that take the default (forkjoinpool) executor + * - blacklist constructor methods of CompletableFuture and static methods that create CompletableFuture instances + * (failedFuture, allOf, anyOf, 2 * supplyAsync, 2 * runAsync, completedFuture) + * - blacklist methods that return MinimalStage (completedStage, failedStage, minimalCompletionStage(possibly override this one)) + * - blacklist methods we cannot wrap because they only exist on JDK9 (2 * completeAsync, orTimeout, completeOnTimeout, + * 2 * delayedExecutor, + * - provide corresponding methods for the forbidden static ones on BaseFuture + * - possibly return BaseFuture on overridden methods such as whenComplete etc. This makes users able to program against BaseFuture + * instead of CompletableFuture. Would only be possible once we switch to JDK 9+ as JDK8 would get a classcastexception otherwise. + */ public class BaseFuture extends CompletableFuture { private static final String BLOCKING_OP_REASON = "Blocking operation"; + // Unfortunately this method was only introduced in JDK 9. This means that we can't guarantee for older JDKs to return our + // subclass here for the CompletionStage methods (bummer) + // it's ok though, because ES 7.0 will require JDK 11 to run + public CompletableFuture newIncompleteFuture() { + return new BaseFuture<>(); + } + + // method only provided since JDK9 so cannot specify Override annotation here + public Executor defaultExecutor() { + throw new IllegalStateException("default executor"); + } + + // method only provided since JDK9 so cannot specify Override annotation here + public CompletionStage minimalCompletionStage() { + throw new IllegalStateException("minimalCompletionStage"); + } + @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException { @@ -59,33 +97,143 @@ private static boolean blockingAllowed() { @Override public boolean completeExceptionally(Throwable ex) { - assert ex instanceof Exception : "Expected exception but was: " + ex.getClass(); ExceptionsHelper.maybeDieOnAnotherThread(ex); return super.completeExceptionally(ex); } + @Override + public CompletableFuture thenApply(Function fn) { + return super.thenApply(wrap(fn)); + } + + @Override + public CompletableFuture thenApplyAsync(Function fn, Executor executor) { + return super.thenApplyAsync(wrap(fn), executor); + } + + @Override + public CompletableFuture thenAccept(Consumer action) { + return super.thenAccept(wrap(action)); + } + + @Override + public CompletableFuture thenAcceptAsync(Consumer action, Executor executor) { + return super.thenAcceptAsync(wrap(action), executor); + } + + @Override + public CompletableFuture thenRun(Runnable action) { + return super.thenRun(wrap(action)); + } + + @Override + public CompletableFuture thenRunAsync(Runnable action, Executor executor) { + return super.thenRunAsync(wrap(action), executor); + } + + @Override + public CompletableFuture thenCombine(CompletionStage other, BiFunction fn) { + return super.thenCombine(other, wrap(fn)); + } + + @Override + public CompletableFuture thenCombineAsync(CompletionStage other, + BiFunction fn, Executor executor) { + return super.thenCombineAsync(other, wrap(fn), executor); + } + + @Override + public CompletableFuture thenAcceptBoth(CompletionStage other, BiConsumer action) { + return super.thenAcceptBoth(other, wrap(action)); + } + + @Override + public CompletableFuture thenAcceptBothAsync(CompletionStage other, BiConsumer action, + Executor executor) { + return super.thenAcceptBothAsync(other, wrap(action), executor); + } + + @Override + public CompletableFuture runAfterBoth(CompletionStage other, Runnable action) { + return super.runAfterBoth(other, wrap(action)); + } + + @Override + public CompletableFuture runAfterBothAsync(CompletionStage other, Runnable action, Executor executor) { + return super.runAfterBothAsync(other, wrap(action), executor); + } + + @Override + public CompletableFuture applyToEither(CompletionStage other, Function fn) { + return super.applyToEither(other, wrap(fn)); + } + + @Override + public CompletableFuture applyToEitherAsync(CompletionStage other, Function fn, Executor executor) { + return super.applyToEitherAsync(other, wrap(fn), executor); + } + + @Override + public CompletableFuture acceptEither(CompletionStage other, Consumer action) { + return super.acceptEither(other, wrap(action)); + } + + @Override + public CompletableFuture acceptEitherAsync(CompletionStage other, Consumer action, Executor executor) { + return super.acceptEitherAsync(other, wrap(action), executor); + } + + @Override + public CompletableFuture runAfterEither(CompletionStage other, Runnable action) { + return super.runAfterEither(other, wrap(action)); + } + + @Override + public CompletableFuture runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor) { + return super.runAfterEitherAsync(other, wrap(action), executor); + } + + @Override + public CompletableFuture thenCompose(Function> fn) { + return super.thenCompose(wrap(fn)); + } + + @Override + public CompletableFuture thenComposeAsync(Function> fn, Executor executor) { + return super.thenComposeAsync(wrap(fn), executor); + } + @Override public CompletableFuture whenComplete(BiConsumer action) { - return super.whenComplete(wrap(action)); + return super.whenComplete(wrap2(action)); } @Override public CompletableFuture whenCompleteAsync(BiConsumer action, Executor executor) { - return super.whenCompleteAsync(wrap(action), executor); + return super.whenCompleteAsync(wrap2(action), executor); } - // method only provided since JDK9 - public Executor defaultExecutor() { - throw new IllegalStateException("default executor"); + @Override + public CompletableFuture handle(BiFunction fn) { + return super.handle(wrap2(fn)); } -// public CompletableFuture completeAsync(Supplier supplier, Executor executor) { -// // only works on JDK 9 -// // return super.completeAsync(wrap(supplier), executor); -// -// } + @Override + public CompletableFuture handleAsync(BiFunction fn, Executor executor) { + return super.handleAsync(wrap2(fn), executor); + } + + @Override + public CompletableFuture toCompletableFuture() { + return super.toCompletableFuture(); + } - private BiConsumer wrap(BiConsumer action) { + @Override + public CompletableFuture exceptionally(Function fn) { + return super.exceptionally(wrap(fn)); + } + + private BiConsumer wrap2(BiConsumer action) { return (v, t) -> { if (t != null) { ExceptionsHelper.maybeDieOnAnotherThread(t); @@ -99,10 +247,10 @@ public Executor defaultExecutor() { }; } - private Supplier wrap(Supplier supplier) { - return () -> { + private static BiConsumer wrap(BiConsumer action) { + return (v, t) -> { try { - return supplier.get(); + action.accept(v, t); } catch (Throwable throwable) { ExceptionsHelper.maybeDieOnAnotherThread(throwable); throw throwable; @@ -110,11 +258,65 @@ private Supplier wrap(Supplier supplier) { }; } - // TODO: Unfortunately this method was only introduced in JDK 9. This means that we can't guarantee for older JDKs to return our - // subclass here for the CompletionStage methods (bummer) - // it's ok though, because ES 7.0 will require JDK 11 to run - public CompletableFuture newIncompleteFuture() { - return new BaseFuture<>(); + private BiFunction wrap2(BiFunction fn) { + return (v, t) -> { + if (t != null) { + ExceptionsHelper.maybeDieOnAnotherThread(t); + } + try { + return fn.apply(v, t); + } catch (Throwable throwable) { + ExceptionsHelper.maybeDieOnAnotherThread(throwable); + throw throwable; + } + }; + } + + private static BiFunction wrap(BiFunction fn) { + return (v, t) -> { + try { + return fn.apply(v, t); + } catch (Throwable throwable) { + ExceptionsHelper.maybeDieOnAnotherThread(throwable); + throw throwable; + } + }; + } + + private static Function wrap(Function fn) { + return t -> { + if (t instanceof Throwable) { + ExceptionsHelper.maybeDieOnAnotherThread((Throwable) t); + } + try { + return fn.apply(t); + } catch (Throwable throwable) { + ExceptionsHelper.maybeDieOnAnotherThread(throwable); + throw throwable; + } + }; + } + + private static Consumer wrap(Consumer consumer) { + return t -> { + try { + consumer.accept(t); + } catch (Throwable throwable) { + ExceptionsHelper.maybeDieOnAnotherThread(throwable); + throw throwable; + } + }; + } + + private static Runnable wrap(Runnable runnable) { + return () -> { + try { + runnable.run(); + } catch (Throwable throwable) { + ExceptionsHelper.maybeDieOnAnotherThread(throwable); + throw throwable; + } + }; } } From f4f0e62ec6e0ff201d4bef80d5add93ce826e309 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 3 Dec 2018 22:01:04 +0100 Subject: [PATCH 06/17] Wrap CompletableFuture --- .../util/concurrent/BaseFutureTests.java | 84 ++++ .../common/util/concurrent/BaseFuture.java | 372 ++++++++++-------- 2 files changed, 283 insertions(+), 173 deletions(-) create mode 100644 qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java new file mode 100644 index 0000000000000..ddbcd37f3ef11 --- /dev/null +++ b/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java @@ -0,0 +1,84 @@ +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.EvilThreadPoolTests; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; + +public class BaseFutureTests extends ESTestCase { + + private ThreadPool threadPool; + + @Before + public void setUpThreadPool() { + threadPool = new TestThreadPool(EvilThreadPoolTests.class.getName()); + } + + @After + public void tearDownThreadPool() { + terminate(threadPool); + } + + public void testErrorCaught() throws InterruptedException { + Consumer expected = t -> { + assertThat(t, instanceOf(Error.class)); + assertThat(t, hasToString(containsString("future error"))); + }; + + runExecutionTest( + () -> new BaseFuture<>().completeExceptionally(new Error("future error")), + expected); + + runExecutionTest( + () -> { + BaseFuture fut = new BaseFuture<>(); + fut.thenRun(() -> { + throw new Error("future error"); + }); + fut.complete(new Object()); + }, + expected); + + runExecutionTest( + () -> { + BaseFuture fut = new BaseFuture<>(); + fut.thenRunAsync(() -> { + throw new Error("future error"); + }, threadPool.generic()); + fut.complete(new Object()); + }, + expected); + } + + private void runExecutionTest(final Runnable runnable, final Consumer consumer) throws InterruptedException { + final AtomicReference throwableReference = new AtomicReference<>(); + final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler(); + final CountDownLatch uncaughtExceptionHandlerLatch = new CountDownLatch(1); + + try { + Thread.setDefaultUncaughtExceptionHandler((t, e) -> { + throwableReference.set(e); + uncaughtExceptionHandlerLatch.countDown(); + }); + + runnable.run(); + + uncaughtExceptionHandlerLatch.await(); + consumer.accept(throwableReference.get()); + } finally { + Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java index 25794c3417ea4..45357ece5965d 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java @@ -25,69 +25,65 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transports; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; /** - * CompletableFuture implementation that properly bubbles up errors. How does it work? - * The main idea is to override newIncompleteFuture() and provide a BaseFuture instance. This means that all CompletionStage methods - * properly instantiate BaseFuture instances. The newIncompleteFuture() method was only introduced in JDK9. - * This means that we can't guarantee for older JDKs to return our subclass for the CompletionStage methods. It's ok though, - * because ES 7.0 will require JDK 11 to run. There are other methods on CompletableFuture that are unsafe to use, and we will have - * to ban them. I envision the following plan for them: - * - add CompletableFuture methods to forbidden APIs: - * - blacklist all methods on CompletableFuture / CompletableStage that take the default (forkjoinpool) executor + * Wraps a CompletableFuture and ensures Errors are properly bubbled up + * TODO: + * - add CompletableFuture class and methods to forbidden APIs: * - blacklist constructor methods of CompletableFuture and static methods that create CompletableFuture instances - * (failedFuture, allOf, anyOf, 2 * supplyAsync, 2 * runAsync, completedFuture) * - blacklist methods that return MinimalStage (completedStage, failedStage, minimalCompletionStage(possibly override this one)) - * - blacklist methods we cannot wrap because they only exist on JDK9 (2 * completeAsync, orTimeout, completeOnTimeout, - * 2 * delayedExecutor, + * - blacklist static methods on CompletableFuture (failedFuture, allOf, anyOf, 2 * supplyAsync, 2 * runAsync, completedFuture) * - provide corresponding methods for the forbidden static ones on BaseFuture - * - possibly return BaseFuture on overridden methods such as whenComplete etc. This makes users able to program against BaseFuture - * instead of CompletableFuture. Would only be possible once we switch to JDK 9+ as JDK8 would get a classcastexception otherwise. */ -public class BaseFuture extends CompletableFuture { +public class BaseFuture implements Future, CompletionStage { - private static final String BLOCKING_OP_REASON = "Blocking operation"; - - // Unfortunately this method was only introduced in JDK 9. This means that we can't guarantee for older JDKs to return our - // subclass here for the CompletionStage methods (bummer) - // it's ok though, because ES 7.0 will require JDK 11 to run - public CompletableFuture newIncompleteFuture() { - return new BaseFuture<>(); - } + private final CompletableFuture wrapped; - // method only provided since JDK9 so cannot specify Override annotation here - public Executor defaultExecutor() { - throw new IllegalStateException("default executor"); + public BaseFuture() { + this(new CompletableFuture<>()); } - // method only provided since JDK9 so cannot specify Override annotation here - public CompletionStage minimalCompletionStage() { - throw new IllegalStateException("minimalCompletionStage"); + private BaseFuture(CompletableFuture fut) { + wrapped = fut; + wrapped.exceptionally(t -> { + ExceptionsHelper.maybeDieOnAnotherThread(t); + return null; + }); } @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException { assert timeout <= 0 || blockingAllowed(); - return super.get(timeout, unit); + return wrapped.get(timeout, unit); } @Override public V get() throws InterruptedException, ExecutionException { assert blockingAllowed(); - return super.get(); + return wrapped.get(); + } + + public V join() { + assert blockingAllowed(); + return wrapped.join(); } + private static final String BLOCKING_OP_REASON = "Blocking operation"; + private static boolean blockingAllowed() { return Transports.assertNotTransportThread(BLOCKING_OP_REASON) && ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) && @@ -95,228 +91,258 @@ private static boolean blockingAllowed() { MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON); } + public V getNow(V valueIfAbsent) { + return wrapped.getNow(valueIfAbsent); + } + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return wrapped.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return wrapped.isCancelled(); + } + + @Override + public boolean isDone() { + return wrapped.isDone(); + } + + public boolean isCompletedExceptionally() { + return wrapped.isCompletedExceptionally(); + } + + public boolean complete(V value) { + return wrapped.complete(value); + } + public boolean completeExceptionally(Throwable ex) { - ExceptionsHelper.maybeDieOnAnotherThread(ex); - return super.completeExceptionally(ex); + return wrapped.completeExceptionally(ex); } @Override - public CompletableFuture thenApply(Function fn) { - return super.thenApply(wrap(fn)); + public String toString() { + return "BaseFuture{" + wrapped + "}"; + } + + public static BaseFuture allOf(BaseFuture... cfs) { + return new BaseFuture<>(CompletableFuture.allOf(Arrays.stream(cfs).map(bf -> bf.wrapped).toArray(CompletableFuture[]::new))); + } + + public static BaseFuture anyOf(BaseFuture... cfs) { + return new BaseFuture<>(CompletableFuture.anyOf(Arrays.stream(cfs).map(bf -> bf.wrapped).toArray(CompletableFuture[]::new))); + } + + public static BaseFuture supplyAsync(Supplier supplier, Executor executor) { + return new BaseFuture<>(CompletableFuture.supplyAsync(supplier, executor)); + } + + public static BaseFuture runAsync(Runnable runnable, Executor executor) { + return new BaseFuture<>(CompletableFuture.runAsync(runnable, executor)); + } + + public static BaseFuture completedFuture(U value) { + return new BaseFuture<>(CompletableFuture.completedFuture(value)); + } + + public static BaseFuture failedFuture(Throwable ex) { + final BaseFuture fut = new BaseFuture<>(); + fut.completeExceptionally(ex); + return fut; } @Override - public CompletableFuture thenApplyAsync(Function fn, Executor executor) { - return super.thenApplyAsync(wrap(fn), executor); + public BaseFuture thenApply(Function fn) { + return new BaseFuture<>(wrapped.thenApply(fn)); } @Override - public CompletableFuture thenAccept(Consumer action) { - return super.thenAccept(wrap(action)); + public CompletionStage thenApplyAsync(Function fn) { + throw new UnsupportedOperationException("specify executor"); } @Override - public CompletableFuture thenAcceptAsync(Consumer action, Executor executor) { - return super.thenAcceptAsync(wrap(action), executor); + public BaseFuture thenApplyAsync(Function fn, Executor executor) { + return new BaseFuture<>(wrapped.thenApplyAsync(fn, executor)); } @Override - public CompletableFuture thenRun(Runnable action) { - return super.thenRun(wrap(action)); + public BaseFuture thenAccept(Consumer action) { + return new BaseFuture<>(wrapped.thenAccept(action)); } @Override - public CompletableFuture thenRunAsync(Runnable action, Executor executor) { - return super.thenRunAsync(wrap(action), executor); + public CompletionStage thenAcceptAsync(Consumer action) { + throw new UnsupportedOperationException("specify executor"); } @Override - public CompletableFuture thenCombine(CompletionStage other, BiFunction fn) { - return super.thenCombine(other, wrap(fn)); + public BaseFuture thenAcceptAsync(Consumer action, Executor executor) { + return new BaseFuture<>(wrapped.thenAcceptAsync(action, executor)); } @Override - public CompletableFuture thenCombineAsync(CompletionStage other, + public BaseFuture thenRun(Runnable action) { + return new BaseFuture<>(wrapped.thenRun(action)); + } + + @Override + public CompletionStage thenRunAsync(Runnable action) { + throw new UnsupportedOperationException("specify executor"); + } + + @Override + public BaseFuture thenRunAsync(Runnable action, Executor executor) { + return new BaseFuture<>(wrapped.thenRunAsync(action, executor)); + } + + @Override + public BaseFuture thenCombine(CompletionStage other, BiFunction fn) { + return new BaseFuture<>(wrapped.thenCombine(other, fn)); + } + + @Override + public CompletionStage thenCombineAsync(CompletionStage other, + BiFunction fn) { + throw new UnsupportedOperationException("specify executor"); + } + + @Override + public BaseFuture thenCombineAsync(CompletionStage other, BiFunction fn, Executor executor) { - return super.thenCombineAsync(other, wrap(fn), executor); + return new BaseFuture<>(wrapped.thenCombineAsync(other, fn, executor)); } @Override - public CompletableFuture thenAcceptBoth(CompletionStage other, BiConsumer action) { - return super.thenAcceptBoth(other, wrap(action)); + public BaseFuture thenAcceptBoth(CompletionStage other, BiConsumer action) { + return new BaseFuture<>(wrapped.thenAcceptBoth(other, action)); } @Override - public CompletableFuture thenAcceptBothAsync(CompletionStage other, BiConsumer action, + public CompletionStage thenAcceptBothAsync(CompletionStage other, BiConsumer action) { + throw new UnsupportedOperationException("specify executor"); + } + + @Override + public BaseFuture thenAcceptBothAsync(CompletionStage other, BiConsumer action, Executor executor) { - return super.thenAcceptBothAsync(other, wrap(action), executor); + return new BaseFuture<>(wrapped.thenAcceptBothAsync(other, action, executor)); } @Override - public CompletableFuture runAfterBoth(CompletionStage other, Runnable action) { - return super.runAfterBoth(other, wrap(action)); + public BaseFuture runAfterBoth(CompletionStage other, Runnable action) { + return new BaseFuture<>(wrapped.runAfterBoth(other, action)); } @Override - public CompletableFuture runAfterBothAsync(CompletionStage other, Runnable action, Executor executor) { - return super.runAfterBothAsync(other, wrap(action), executor); + public CompletionStage runAfterBothAsync(CompletionStage other, Runnable action) { + throw new UnsupportedOperationException("specify executor"); } @Override - public CompletableFuture applyToEither(CompletionStage other, Function fn) { - return super.applyToEither(other, wrap(fn)); + public BaseFuture runAfterBothAsync(CompletionStage other, Runnable action, Executor executor) { + return new BaseFuture<>(wrapped.runAfterBothAsync(other, action, executor)); } @Override - public CompletableFuture applyToEitherAsync(CompletionStage other, Function fn, Executor executor) { - return super.applyToEitherAsync(other, wrap(fn), executor); + public BaseFuture applyToEither(CompletionStage other, Function fn) { + return new BaseFuture<>(wrapped.applyToEither(other, fn)); } @Override - public CompletableFuture acceptEither(CompletionStage other, Consumer action) { - return super.acceptEither(other, wrap(action)); + public CompletionStage applyToEitherAsync(CompletionStage other, Function fn) { + throw new UnsupportedOperationException("specify executor"); } @Override - public CompletableFuture acceptEitherAsync(CompletionStage other, Consumer action, Executor executor) { - return super.acceptEitherAsync(other, wrap(action), executor); + public BaseFuture applyToEitherAsync(CompletionStage other, Function fn, Executor executor) { + return new BaseFuture<>(wrapped.applyToEitherAsync(other, fn, executor)); } @Override - public CompletableFuture runAfterEither(CompletionStage other, Runnable action) { - return super.runAfterEither(other, wrap(action)); + public BaseFuture acceptEither(CompletionStage other, Consumer action) { + return new BaseFuture<>(wrapped.acceptEither(other, action)); } @Override - public CompletableFuture runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor) { - return super.runAfterEitherAsync(other, wrap(action), executor); + public CompletionStage acceptEitherAsync(CompletionStage other, Consumer action) { + throw new UnsupportedOperationException("specify executor"); } @Override - public CompletableFuture thenCompose(Function> fn) { - return super.thenCompose(wrap(fn)); + public BaseFuture acceptEitherAsync(CompletionStage other, Consumer action, Executor executor) { + return new BaseFuture<>(wrapped.acceptEitherAsync(other, action, executor)); } @Override - public CompletableFuture thenComposeAsync(Function> fn, Executor executor) { - return super.thenComposeAsync(wrap(fn), executor); + public BaseFuture runAfterEither(CompletionStage other, Runnable action) { + return new BaseFuture<>(wrapped.runAfterEither(other, action)); } @Override - public CompletableFuture whenComplete(BiConsumer action) { - return super.whenComplete(wrap2(action)); + public CompletionStage runAfterEitherAsync(CompletionStage other, Runnable action) { + throw new UnsupportedOperationException("specify executor"); } @Override - public CompletableFuture whenCompleteAsync(BiConsumer action, Executor executor) { - return super.whenCompleteAsync(wrap2(action), executor); + public BaseFuture runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor) { + return new BaseFuture<>(wrapped.runAfterEitherAsync(other, action, executor)); } @Override - public CompletableFuture handle(BiFunction fn) { - return super.handle(wrap2(fn)); + public BaseFuture thenCompose(Function> fn) { + return new BaseFuture<>(wrapped.thenCompose(fn)); } @Override - public CompletableFuture handleAsync(BiFunction fn, Executor executor) { - return super.handleAsync(wrap2(fn), executor); + public CompletionStage thenComposeAsync(Function> fn) { + throw new UnsupportedOperationException("specify executor"); } @Override - public CompletableFuture toCompletableFuture() { - return super.toCompletableFuture(); - } - - @Override - public CompletableFuture exceptionally(Function fn) { - return super.exceptionally(wrap(fn)); - } - - private BiConsumer wrap2(BiConsumer action) { - return (v, t) -> { - if (t != null) { - ExceptionsHelper.maybeDieOnAnotherThread(t); - } - try { - action.accept(v, t); - } catch (Throwable throwable) { - ExceptionsHelper.maybeDieOnAnotherThread(throwable); - throw throwable; - } - }; - } - - private static BiConsumer wrap(BiConsumer action) { - return (v, t) -> { - try { - action.accept(v, t); - } catch (Throwable throwable) { - ExceptionsHelper.maybeDieOnAnotherThread(throwable); - throw throwable; - } - }; - } - - private BiFunction wrap2(BiFunction fn) { - return (v, t) -> { - if (t != null) { - ExceptionsHelper.maybeDieOnAnotherThread(t); - } - try { - return fn.apply(v, t); - } catch (Throwable throwable) { - ExceptionsHelper.maybeDieOnAnotherThread(throwable); - throw throwable; - } - }; - } - - private static BiFunction wrap(BiFunction fn) { - return (v, t) -> { - try { - return fn.apply(v, t); - } catch (Throwable throwable) { - ExceptionsHelper.maybeDieOnAnotherThread(throwable); - throw throwable; - } - }; - } - - private static Function wrap(Function fn) { - return t -> { - if (t instanceof Throwable) { - ExceptionsHelper.maybeDieOnAnotherThread((Throwable) t); - } - try { - return fn.apply(t); - } catch (Throwable throwable) { - ExceptionsHelper.maybeDieOnAnotherThread(throwable); - throw throwable; - } - }; - } - - private static Consumer wrap(Consumer consumer) { - return t -> { - try { - consumer.accept(t); - } catch (Throwable throwable) { - ExceptionsHelper.maybeDieOnAnotherThread(throwable); - throw throwable; - } - }; - } - - private static Runnable wrap(Runnable runnable) { - return () -> { - try { - runnable.run(); - } catch (Throwable throwable) { - ExceptionsHelper.maybeDieOnAnotherThread(throwable); - throw throwable; - } - }; + public BaseFuture thenComposeAsync(Function> fn, Executor executor) { + return new BaseFuture<>(wrapped.thenComposeAsync(fn, executor)); + } + + @Override + public BaseFuture whenComplete(BiConsumer action) { + return new BaseFuture<>(wrapped.whenComplete(action)); + } + + @Override + public CompletionStage whenCompleteAsync(BiConsumer action) { + throw new UnsupportedOperationException("specify executor"); + } + + @Override + public BaseFuture whenCompleteAsync(BiConsumer action, Executor executor) { + return new BaseFuture<>(wrapped.whenCompleteAsync(action, executor)); + } + + @Override + public BaseFuture handle(BiFunction fn) { + return new BaseFuture<>(wrapped.handle(fn)); + } + + @Override + public CompletionStage handleAsync(BiFunction fn) { + throw new UnsupportedOperationException("specify executor"); } + @Override + public BaseFuture handleAsync(BiFunction fn, Executor executor) { + return new BaseFuture<>(wrapped.handleAsync(fn, executor)); + } + + @Override + public BaseFuture exceptionally(Function fn) { + return new BaseFuture<>(wrapped.exceptionally(fn)); + } + + @Override + public CompletableFuture toCompletableFuture() { + return wrapped.toCompletableFuture(); + } } From f136f13017ec80b23f8ca508c3a0694f45746907 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 4 Dec 2018 13:51:49 +0100 Subject: [PATCH 07/17] add license header --- .../util/concurrent/BaseFutureTests.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java index ddbcd37f3ef11..9bec5577c4247 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java @@ -1,3 +1,21 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.elasticsearch.common.util.concurrent; import org.elasticsearch.test.ESTestCase; From f555eb509bac48f5dd63e6fdaebbb74d5aaf384a Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 26 Dec 2018 19:54:09 +0100 Subject: [PATCH 08/17] add forbiddenApis exclusions for CompletableFuture --- .../forbidden/es-server-signatures.txt | 14 ++++++++++ .../grok/ThreadWatchdogTests.java | 9 ++++--- .../util/concurrent/BaseFutureTests.java | 1 - .../org/elasticsearch/common/cache/Cache.java | 26 +++++++++---------- .../common/util/concurrent/BaseFuture.java | 19 ++++++++------ .../discovery/zen/ZenDiscovery.java | 6 ++--- .../recovery/RecoverySourceHandler.java | 4 +-- .../cluster/coordination/NodeJoinTests.java | 4 +-- .../concurrent/ListenableFutureTests.java | 1 - .../single/SingleNodeDiscoveryIT.java | 6 ++--- .../ingest/ConditionalProcessorTests.java | 4 +-- .../ingest/PipelineProcessorTests.java | 4 +-- 12 files changed, 57 insertions(+), 41 deletions(-) diff --git a/buildSrc/src/main/resources/forbidden/es-server-signatures.txt b/buildSrc/src/main/resources/forbidden/es-server-signatures.txt index 01c7d18907346..2010f9d6b0876 100644 --- a/buildSrc/src/main/resources/forbidden/es-server-signatures.txt +++ b/buildSrc/src/main/resources/forbidden/es-server-signatures.txt @@ -26,6 +26,20 @@ java.util.concurrent.Executors#newScheduledThreadPool(int) java.util.concurrent.Executors#defaultThreadFactory() java.util.concurrent.Executors#privilegedThreadFactory() +@defaultMessage use org.elasticsearch.common.util.concurrent.BaseFuture or one of its subclasses instead + +java.util.concurrent.CompletableFuture#() +java.util.concurrent.CompletableFuture#allOf(java.util.concurrent.CompletableFuture[]) +java.util.concurrent.CompletableFuture#anyOf(java.util.concurrent.CompletableFuture[]) +java.util.concurrent.CompletableFuture#supplyAsync(java.util.function.Supplier) +java.util.concurrent.CompletableFuture#supplyAsync(java.util.function.Supplier, java.util.concurrent.Executor) +java.util.concurrent.CompletableFuture#runAsync(java.lang.Runnable) +java.util.concurrent.CompletableFuture#runAsync(java.lang.Runnable, java.util.concurrent.Executor) +java.util.concurrent.CompletableFuture#completedFuture(java.lang.Object) +java.util.concurrent.CompletableFuture#failedFuture(java.lang.Throwable) + +org.elasticsearch.common.util.concurrent.BaseFuture#toCompletableFuture() + java.lang.Character#codePointBefore(char[],int) @ Implicit start offset is error-prone when the char[] is a buffer and the first chars are random chars java.lang.Character#codePointAt(char[],int) @ Implicit end offset is error-prone when the char[] is a buffer and the last chars are random chars diff --git a/libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java b/libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java index 29e2351215f60..4ba6c62f0fa7d 100644 --- a/libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java +++ b/libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java @@ -18,13 +18,14 @@ */ package org.elasticsearch.grok; +import org.elasticsearch.common.util.concurrent.BaseFuture; +import org.elasticsearch.test.ESTestCase; +import org.mockito.Mockito; + import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.elasticsearch.test.ESTestCase; -import org.mockito.Mockito; import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; @@ -84,7 +85,7 @@ public void testIdleIfNothingRegistered() throws Exception { (delay, command) -> threadPool.schedule(command, delay, TimeUnit.MILLISECONDS)); // Periodic action is not scheduled because no thread is registered verifyZeroInteractions(threadPool); - CompletableFuture commandFuture = new CompletableFuture<>(); + BaseFuture commandFuture = new BaseFuture<>(); // Periodic action is scheduled because a thread is registered doAnswer(invocationOnMock -> { commandFuture.complete((Runnable) invocationOnMock.getArguments()[0]); diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java index 9bec5577c4247..15bf5e8d26d40 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java @@ -25,7 +25,6 @@ import org.junit.After; import org.junit.Before; -import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; diff --git a/server/src/main/java/org/elasticsearch/common/cache/Cache.java b/server/src/main/java/org/elasticsearch/common/cache/Cache.java index 67061a1533475..403093d937512 100644 --- a/server/src/main/java/org/elasticsearch/common/cache/Cache.java +++ b/server/src/main/java/org/elasticsearch/common/cache/Cache.java @@ -20,6 +20,7 @@ package org.elasticsearch.common.cache; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.concurrent.ReleasableLock; import java.util.Arrays; @@ -27,7 +28,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Objects; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReadWriteLock; @@ -191,7 +191,7 @@ private static class CacheSegment { ReleasableLock readLock = new ReleasableLock(segmentLock.readLock()); ReleasableLock writeLock = new ReleasableLock(segmentLock.writeLock()); - Map>> map = new HashMap<>(); + Map>> map = new HashMap<>(); SegmentStats segmentStats = new SegmentStats(); @@ -206,7 +206,7 @@ private static class CacheSegment { * @return the entry if there was one, otherwise null */ Entry get(K key, long now, Predicate> isExpired, Consumer> onExpiration) { - CompletableFuture> future; + BaseFuture> future; try (ReleasableLock ignored = readLock.acquire()) { future = map.get(key); } @@ -249,7 +249,7 @@ Tuple, Entry> put(K key, V value, long now) { Entry existing = null; try (ReleasableLock ignored = writeLock.acquire()) { try { - CompletableFuture> future = map.put(key, CompletableFuture.completedFuture(entry)); + BaseFuture> future = map.put(key, BaseFuture.completedFuture(entry)); if (future != null) { existing = future.handle((ok, ex) -> { if (ok != null) { @@ -272,8 +272,8 @@ Tuple, Entry> put(K key, V value, long now) { * @param key the key of the entry to remove from the cache * @param onRemoval a callback for the removed entry */ - void remove(K key, Consumer>> onRemoval) { - CompletableFuture> future; + void remove(K key, Consumer>> onRemoval) { + BaseFuture> future; try (ReleasableLock ignored = writeLock.acquire()) { future = map.remove(key); } @@ -291,8 +291,8 @@ void remove(K key, Consumer>> onRemoval) { * @param value the value expected to be associated with the key * @param onRemoval a callback for the removed entry */ - void remove(K key, V value, Consumer>> onRemoval) { - CompletableFuture> future; + void remove(K key, V value, Consumer>> onRemoval) { + BaseFuture> future; boolean removed = false; try (ReleasableLock ignored = writeLock.acquire()) { future = map.get(key); @@ -400,8 +400,8 @@ public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionExcept // the segment lock; to do this, we atomically put a future in the map that can load the value, and then // get the value from this future on the thread that won the race to place the future into the segment map CacheSegment segment = getCacheSegment(key); - CompletableFuture> future; - CompletableFuture> completableFuture = new CompletableFuture<>(); + BaseFuture> future; + BaseFuture> completableFuture = new BaseFuture<>(); try (ReleasableLock ignored = segment.writeLock.acquire()) { future = segment.map.putIfAbsent(key, completableFuture); @@ -415,7 +415,7 @@ public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionExcept return ok.value; } else { try (ReleasableLock ignored = segment.writeLock.acquire()) { - CompletableFuture> sanity = segment.map.get(key); + BaseFuture> sanity = segment.map.get(key); if (sanity != null && sanity.isCompletedExceptionally()) { segment.map.remove(key); } @@ -424,7 +424,7 @@ public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionExcept } }; - CompletableFuture completableValue; + BaseFuture completableValue; if (future == null) { future = completableFuture; completableValue = future.handle(handler); @@ -490,7 +490,7 @@ private void put(K key, V value, long now) { } } - private final Consumer>> invalidationConsumer = f -> { + private final Consumer>> invalidationConsumer = f -> { try { Entry entry = f.get(); try (ReleasableLock ignored = lruLock.acquire()) { diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java index 45357ece5965d..bd14a915f0fc0 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java @@ -22,6 +22,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transports; @@ -40,18 +41,13 @@ import java.util.function.Supplier; /** - * Wraps a CompletableFuture and ensures Errors are properly bubbled up - * TODO: - * - add CompletableFuture class and methods to forbidden APIs: - * - blacklist constructor methods of CompletableFuture and static methods that create CompletableFuture instances - * - blacklist methods that return MinimalStage (completedStage, failedStage, minimalCompletionStage(possibly override this one)) - * - blacklist static methods on CompletableFuture (failedFuture, allOf, anyOf, 2 * supplyAsync, 2 * runAsync, completedFuture) - * - provide corresponding methods for the forbidden static ones on BaseFuture + * Wraps a CompletableFuture and ensures Errors are properly bubbled up to the uncaught exception handler */ public class BaseFuture implements Future, CompletionStage { private final CompletableFuture wrapped; + @SuppressForbidden(reason = "safely wraps CompletableFuture") public BaseFuture() { this(new CompletableFuture<>()); } @@ -84,7 +80,7 @@ public V join() { private static final String BLOCKING_OP_REASON = "Blocking operation"; - private static boolean blockingAllowed() { + protected boolean blockingAllowed() { return Transports.assertNotTransportThread(BLOCKING_OP_REASON) && ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) && ClusterApplierService.assertNotClusterStateUpdateThread(BLOCKING_OP_REASON) && @@ -95,6 +91,7 @@ public V getNow(V valueIfAbsent) { return wrapped.getNow(valueIfAbsent); } + @SuppressForbidden(reason = "just delegating") @Override public boolean cancel(boolean mayInterruptIfRunning) { return wrapped.cancel(mayInterruptIfRunning); @@ -127,26 +124,32 @@ public String toString() { return "BaseFuture{" + wrapped + "}"; } + @SuppressForbidden(reason = "safely wraps CompletableFuture") public static BaseFuture allOf(BaseFuture... cfs) { return new BaseFuture<>(CompletableFuture.allOf(Arrays.stream(cfs).map(bf -> bf.wrapped).toArray(CompletableFuture[]::new))); } + @SuppressForbidden(reason = "safely wraps CompletableFuture") public static BaseFuture anyOf(BaseFuture... cfs) { return new BaseFuture<>(CompletableFuture.anyOf(Arrays.stream(cfs).map(bf -> bf.wrapped).toArray(CompletableFuture[]::new))); } + @SuppressForbidden(reason = "safely wraps CompletableFuture") public static BaseFuture supplyAsync(Supplier supplier, Executor executor) { return new BaseFuture<>(CompletableFuture.supplyAsync(supplier, executor)); } + @SuppressForbidden(reason = "safely wraps CompletableFuture") public static BaseFuture runAsync(Runnable runnable, Executor executor) { return new BaseFuture<>(CompletableFuture.runAsync(runnable, executor)); } + @SuppressForbidden(reason = "safely wraps CompletableFuture") public static BaseFuture completedFuture(U value) { return new BaseFuture<>(CompletableFuture.completedFuture(value)); } + @SuppressForbidden(reason = "safely wraps CompletableFuture") public static BaseFuture failedFuture(Throwable ex) { final BaseFuture fut = new BaseFuture<>(); fut.completeExceptionally(ex); diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 87ecbf03b8609..48813191b82ff 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.coordination.JoinTaskExecutor; import org.elasticsearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -53,11 +54,11 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoveryStats; -import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.discovery.zen.PublishClusterStateAction.IncomingClusterStateListener; import org.elasticsearch.gateway.GatewayMetaState; import org.elasticsearch.tasks.Task; @@ -77,7 +78,6 @@ import java.util.List; import java.util.Locale; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -963,7 +963,7 @@ public void handleException(TransportException exp) { } private ZenPing.PingCollection pingAndWait(TimeValue timeout) { - final CompletableFuture response = new CompletableFuture<>(); + final BaseFuture response = new BaseFuture<>(); try { zenPing.ping(response::complete, timeout); } catch (Exception ex) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 46f98275740ae..b22627e3f7d18 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CancellableThreads; +import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.core.internal.io.Streams; @@ -67,7 +68,6 @@ import java.util.Comparator; import java.util.List; import java.util.Locale; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Supplier; @@ -240,7 +240,7 @@ private boolean isTargetSameHistory() { static void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason, IndexShard primary, CancellableThreads cancellableThreads, Logger logger) { cancellableThreads.execute(() -> { - CompletableFuture permit = new CompletableFuture<>(); + BaseFuture permit = new BaseFuture<>(); final ActionListener onAcquired = new ActionListener() { @Override public void onResponse(Releasable releasable) { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index ea45eb42d89a3..d25fd546fa027 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -200,11 +200,11 @@ static class SimpleFuture extends BaseFuture { } public void markAsDone() { - set(null); + complete(null); } public void markAsFailed(Throwable t) { - setException(t); + completeExceptionally(t); } @Override diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java index a0036b2fa0241..baa280212b659 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import java.util.concurrent.BrokenBarrierException; diff --git a/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java b/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java index 31005ea83cd42..54846d4954863 100644 --- a/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java @@ -19,7 +19,6 @@ package org.elasticsearch.discovery.single; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -27,6 +26,8 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.BaseFuture; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.discovery.zen.UnicastHostsProvider; import org.elasticsearch.discovery.zen.UnicastZenPing; import org.elasticsearch.discovery.zen.ZenPing; @@ -44,7 +45,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.Stack; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.function.Function; @@ -103,7 +103,7 @@ protected void finishPingingRound(PingingRound pingingRound) { }; unicastZenPing.start(); closeables.push(unicastZenPing); - final CompletableFuture responses = new CompletableFuture<>(); + final BaseFuture responses = new BaseFuture<>(); unicastZenPing.ping(responses::complete, TimeValue.timeValueSeconds(3)); latch.await(); responses.get(); diff --git a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java index c5548ae559400..05ad49e4e4db1 100644 --- a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptModule; @@ -32,7 +33,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.LongSupplier; @@ -135,7 +135,7 @@ public void testActsOnImmutableData() throws Exception { private static void assertMutatingCtxThrows(Consumer> mutation) throws Exception { String scriptName = "conditionalScript"; - CompletableFuture expectedException = new CompletableFuture<>(); + BaseFuture expectedException = new BaseFuture<>(); ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap( Script.DEFAULT_SCRIPT_LANG, diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 0ad88c05ccc6e..912a37bce606f 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -19,13 +19,13 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.test.ESTestCase; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; @@ -38,7 +38,7 @@ public class PipelineProcessorTests extends ESTestCase { public void testExecutesPipeline() throws Exception { String pipelineId = "pipeline"; IngestService ingestService = mock(IngestService.class); - CompletableFuture invoked = new CompletableFuture<>(); + BaseFuture invoked = new BaseFuture<>(); IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); Pipeline pipeline = new Pipeline( pipelineId, null, null, From 151e39b1c92a7d58602d5cf37c45391c19f1b178 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 26 Dec 2018 23:52:41 +0100 Subject: [PATCH 09/17] add more tests --- .../util/concurrent/BaseFutureTests.java | 302 +++++++++++++++++- 1 file changed, 298 insertions(+), 4 deletions(-) diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java index 15bf5e8d26d40..a417bd2357525 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java @@ -53,11 +53,21 @@ public void testErrorCaught() throws InterruptedException { assertThat(t, hasToString(containsString("future error"))); }; - runExecutionTest( + runFutureTest( () -> new BaseFuture<>().completeExceptionally(new Error("future error")), expected); - runExecutionTest( + runFutureTest( + () -> { + BaseFuture fut = new BaseFuture<>(); + fut.exceptionally(t -> { + throw new Error("future error"); + }); + fut.completeExceptionally(new RuntimeException("test")); + }, + expected); + + runFutureTest( () -> { BaseFuture fut = new BaseFuture<>(); fut.thenRun(() -> { @@ -67,7 +77,7 @@ public void testErrorCaught() throws InterruptedException { }, expected); - runExecutionTest( + runFutureTest( () -> { BaseFuture fut = new BaseFuture<>(); fut.thenRunAsync(() -> { @@ -76,9 +86,293 @@ public void testErrorCaught() throws InterruptedException { fut.complete(new Object()); }, expected); + + runFutureTest( + () -> { + BaseFuture fut = new BaseFuture<>(); + fut.thenApply(o -> { + throw new Error("future error"); + }); + fut.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut = new BaseFuture<>(); + fut.thenApplyAsync(o -> { + throw new Error("future error"); + }, threadPool.generic()); + fut.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut = new BaseFuture<>(); + fut.handle((o, t) -> { + throw new Error("future error"); + }); + fut.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut = new BaseFuture<>(); + fut.handleAsync((o, t) -> { + throw new Error("future error"); + }, threadPool.generic()); + fut.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.acceptEither(fut2, o -> { + throw new Error("future error"); + }); + fut1.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.acceptEither(fut2, o -> { + throw new Error("future error"); + }); + fut2.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.acceptEitherAsync(fut2, o -> { + throw new Error("future error"); + }, threadPool.generic()); + fut1.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.acceptEitherAsync(fut2, o -> { + throw new Error("future error"); + }, threadPool.generic()); + fut2.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.applyToEither(fut2, o -> { + throw new Error("future error"); + }); + fut1.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.applyToEither(fut2, o -> { + throw new Error("future error"); + }); + fut2.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.applyToEitherAsync(fut2, o -> { + throw new Error("future error"); + }, threadPool.generic()); + fut1.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.applyToEitherAsync(fut2, o -> { + throw new Error("future error"); + }, threadPool.generic()); + fut2.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + fut1.whenComplete((o, t) -> { + throw new Error("future error"); + }); + fut1.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + fut1.whenCompleteAsync((o, t) -> { + throw new Error("future error"); + }, threadPool.generic()); + fut1.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + fut1.thenAccept(o -> { + throw new Error("future error"); + }); + fut1.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + fut1.thenAcceptAsync(o -> { + throw new Error("future error"); + }, threadPool.generic()); + fut1.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.runAfterBoth(fut2, () -> { + throw new Error("future error"); + }); + fut1.complete(new Object()); + fut2.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.runAfterBothAsync(fut2, () -> { + throw new Error("future error"); + }, threadPool.generic()); + fut1.complete(new Object()); + fut2.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.runAfterEither(fut2, () -> { + throw new Error("future error"); + }); + fut1.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.runAfterEither(fut2, () -> { + throw new Error("future error"); + }); + fut2.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.runAfterEitherAsync(fut2, () -> { + throw new Error("future error"); + }, threadPool.generic()); + fut1.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.runAfterEitherAsync(fut2, () -> { + throw new Error("future error"); + }, threadPool.generic()); + fut2.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.thenCombine(fut2, (o1, o2) -> { + throw new Error("future error"); + }); + fut1.complete(new Object()); + fut2.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.thenCombineAsync(fut2, (o1, o2) -> { + throw new Error("future error"); + }, threadPool.generic()); + fut1.complete(new Object()); + fut2.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.thenAcceptBoth(fut2, (o1, o2) -> { + throw new Error("future error"); + }); + fut1.complete(new Object()); + fut2.complete(new Object()); + }, + expected); + + runFutureTest( + () -> { + BaseFuture fut1 = new BaseFuture<>(); + BaseFuture fut2 = new BaseFuture<>(); + fut1.thenAcceptBothAsync(fut2, (o1, o2) -> { + throw new Error("future error"); + }, threadPool.generic()); + fut1.complete(new Object()); + fut2.complete(new Object()); + }, + expected); } - private void runExecutionTest(final Runnable runnable, final Consumer consumer) throws InterruptedException { + private void runFutureTest(final Runnable runnable, final Consumer consumer) throws InterruptedException { final AtomicReference throwableReference = new AtomicReference<>(); final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler(); final CountDownLatch uncaughtExceptionHandlerLatch = new CountDownLatch(1); From 62c57014faae7be8fa9169db2dc0864cd0d1374a Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 27 Dec 2018 00:14:47 +0100 Subject: [PATCH 10/17] Cache accesses `Future.get` from ClusterApplierService :( --- .../java/org/elasticsearch/common/cache/Cache.java | 12 +++++++++++- .../common/util/concurrent/BaseFuture.java | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/cache/Cache.java b/server/src/main/java/org/elasticsearch/common/cache/Cache.java index 403093d937512..9b1e03d45e51b 100644 --- a/server/src/main/java/org/elasticsearch/common/cache/Cache.java +++ b/server/src/main/java/org/elasticsearch/common/cache/Cache.java @@ -19,9 +19,12 @@ package org.elasticsearch.common.cache; +import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transports; import java.util.Arrays; import java.util.HashMap; @@ -401,7 +404,14 @@ public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionExcept // get the value from this future on the thread that won the race to place the future into the segment map CacheSegment segment = getCacheSegment(key); BaseFuture> future; - BaseFuture> completableFuture = new BaseFuture<>(); + BaseFuture> completableFuture = new BaseFuture>() { + @Override + protected boolean blockingAllowed() { + return Transports.assertNotTransportThread(BLOCKING_OP_REASON) && + ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) && + MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON); + } + }; try (ReleasableLock ignored = segment.writeLock.acquire()) { future = segment.map.putIfAbsent(key, completableFuture); diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java index bd14a915f0fc0..86edeeacdb6e0 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java @@ -78,7 +78,7 @@ public V join() { return wrapped.join(); } - private static final String BLOCKING_OP_REASON = "Blocking operation"; + protected static final String BLOCKING_OP_REASON = "Blocking operation"; protected boolean blockingAllowed() { return Transports.assertNotTransportThread(BLOCKING_OP_REASON) && From da89985bdae774b1b137df218134b0adbf46cc1f Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 27 Dec 2018 11:07:41 +0100 Subject: [PATCH 11/17] fix more tests --- .../action/support/AdapterActionFuture.java | 9 +++ .../action/support/PlainActionFuture.java | 10 ++++ .../cluster/service/MasterService.java | 33 ++++++++--- .../org/elasticsearch/common/cache/Cache.java | 33 ++++++++--- .../common/util/concurrent/BaseFuture.java | 56 ++++++++++--------- .../concurrent/ListenableFutureTests.java | 10 ---- 6 files changed, 99 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java b/server/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java index 75a1f570b2598..f9e3b0d6447c0 100644 --- a/server/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java +++ b/server/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java @@ -25,10 +25,19 @@ import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.concurrent.FutureUtils; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public abstract class AdapterActionFuture extends BaseFuture implements ActionFuture, ActionListener { + public AdapterActionFuture() { + super(); + } + + protected AdapterActionFuture(CompletableFuture fut) { + super(fut); + } + @Override public T actionGet() { return FutureUtils.get(this); diff --git a/server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java b/server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java index 094f82ae31f63..849984779e37b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java +++ b/server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java @@ -19,8 +19,18 @@ package org.elasticsearch.action.support; +import java.util.concurrent.CompletableFuture; + public class PlainActionFuture extends AdapterActionFuture { + public PlainActionFuture() { + super(); + } + + protected PlainActionFuture(CompletableFuture fut) { + super(fut); + } + public static PlainActionFuture newFuture() { return new PlainActionFuture<>(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 472ffcd73effd..8a949845a5aed 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -32,6 +32,8 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult; import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.coordination.ClusterStatePublisher; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -49,9 +51,7 @@ import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.cluster.coordination.ClusterStatePublisher; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; @@ -60,6 +60,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -122,6 +123,27 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { threadPool.scheduler()); } + private static class MasterFuture extends PlainActionFuture { + + MasterFuture() { + super(); + } + + private MasterFuture(CompletableFuture fut) { + super(fut); + } + + @Override + protected MasterFuture newInstance(CompletableFuture fut) { + return new MasterFuture<>(fut); + } + + @Override + protected boolean blockingAllowed() { + return isMasterUpdateThread() || super.blockingAllowed(); + } + } + class Batcher extends TaskBatcher { Batcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) { @@ -244,12 +266,7 @@ protected void runTasks(TaskInputs taskInputs) { } protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) { - final PlainActionFuture fut = new PlainActionFuture() { - @Override - protected boolean blockingAllowed() { - return isMasterUpdateThread() || super.blockingAllowed(); - } - }; + final MasterFuture fut = new MasterFuture<>(); clusterStatePublisher.publish(clusterChangedEvent, fut, taskOutputs.createAckListener(threadPool, clusterChangedEvent.state())); // indefinitely wait for publication to complete diff --git a/server/src/main/java/org/elasticsearch/common/cache/Cache.java b/server/src/main/java/org/elasticsearch/common/cache/Cache.java index 9b1e03d45e51b..49f714f9162ac 100644 --- a/server/src/main/java/org/elasticsearch/common/cache/Cache.java +++ b/server/src/main/java/org/elasticsearch/common/cache/Cache.java @@ -31,6 +31,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReadWriteLock; @@ -404,14 +405,7 @@ public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionExcept // get the value from this future on the thread that won the race to place the future into the segment map CacheSegment segment = getCacheSegment(key); BaseFuture> future; - BaseFuture> completableFuture = new BaseFuture>() { - @Override - protected boolean blockingAllowed() { - return Transports.assertNotTransportThread(BLOCKING_OP_REASON) && - ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) && - MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON); - } - }; + BaseFuture> completableFuture = new EntryBaseFuture<>(); try (ReleasableLock ignored = segment.writeLock.acquire()) { future = segment.map.putIfAbsent(key, completableFuture); @@ -659,6 +653,29 @@ public void remove() { }; } + static class EntryBaseFuture extends BaseFuture { + + EntryBaseFuture() { + super(); + } + + private EntryBaseFuture(CompletableFuture fut) { + super(fut); + } + + @Override + protected boolean blockingAllowed() { + return Transports.assertNotTransportThread(BLOCKING_OP_REASON) && + ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) && + MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON); + } + + @Override + protected EntryBaseFuture newInstance(CompletableFuture fut) { + return new EntryBaseFuture<>(fut); + } + } + private class CacheIterator implements Iterator> { private Entry current; private Entry next; diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java index 86edeeacdb6e0..bd80ecbb32208 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/BaseFuture.java @@ -52,7 +52,7 @@ public BaseFuture() { this(new CompletableFuture<>()); } - private BaseFuture(CompletableFuture fut) { + protected BaseFuture(CompletableFuture fut) { wrapped = fut; wrapped.exceptionally(t -> { ExceptionsHelper.maybeDieOnAnotherThread(t); @@ -60,6 +60,10 @@ private BaseFuture(CompletableFuture fut) { }); } + protected BaseFuture newInstance(CompletableFuture fut) { + return new BaseFuture<>(fut); + } + @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException { @@ -158,7 +162,7 @@ public static BaseFuture failedFuture(Throwable ex) { @Override public BaseFuture thenApply(Function fn) { - return new BaseFuture<>(wrapped.thenApply(fn)); + return newInstance(wrapped.thenApply(fn)); } @Override @@ -168,12 +172,12 @@ public CompletionStage thenApplyAsync(Function fn @Override public BaseFuture thenApplyAsync(Function fn, Executor executor) { - return new BaseFuture<>(wrapped.thenApplyAsync(fn, executor)); + return newInstance(wrapped.thenApplyAsync(fn, executor)); } @Override public BaseFuture thenAccept(Consumer action) { - return new BaseFuture<>(wrapped.thenAccept(action)); + return newInstance(wrapped.thenAccept(action)); } @Override @@ -183,12 +187,12 @@ public CompletionStage thenAcceptAsync(Consumer action) { @Override public BaseFuture thenAcceptAsync(Consumer action, Executor executor) { - return new BaseFuture<>(wrapped.thenAcceptAsync(action, executor)); + return newInstance(wrapped.thenAcceptAsync(action, executor)); } @Override public BaseFuture thenRun(Runnable action) { - return new BaseFuture<>(wrapped.thenRun(action)); + return newInstance(wrapped.thenRun(action)); } @Override @@ -198,12 +202,12 @@ public CompletionStage thenRunAsync(Runnable action) { @Override public BaseFuture thenRunAsync(Runnable action, Executor executor) { - return new BaseFuture<>(wrapped.thenRunAsync(action, executor)); + return newInstance(wrapped.thenRunAsync(action, executor)); } @Override public BaseFuture thenCombine(CompletionStage other, BiFunction fn) { - return new BaseFuture<>(wrapped.thenCombine(other, fn)); + return newInstance(wrapped.thenCombine(other, fn)); } @Override @@ -215,12 +219,12 @@ public CompletionStage thenCombineAsync(CompletionStage o @Override public BaseFuture thenCombineAsync(CompletionStage other, BiFunction fn, Executor executor) { - return new BaseFuture<>(wrapped.thenCombineAsync(other, fn, executor)); + return newInstance(wrapped.thenCombineAsync(other, fn, executor)); } @Override public BaseFuture thenAcceptBoth(CompletionStage other, BiConsumer action) { - return new BaseFuture<>(wrapped.thenAcceptBoth(other, action)); + return newInstance(wrapped.thenAcceptBoth(other, action)); } @Override @@ -231,12 +235,12 @@ public CompletionStage thenAcceptBothAsync(CompletionStage BaseFuture thenAcceptBothAsync(CompletionStage other, BiConsumer action, Executor executor) { - return new BaseFuture<>(wrapped.thenAcceptBothAsync(other, action, executor)); + return newInstance(wrapped.thenAcceptBothAsync(other, action, executor)); } @Override public BaseFuture runAfterBoth(CompletionStage other, Runnable action) { - return new BaseFuture<>(wrapped.runAfterBoth(other, action)); + return newInstance(wrapped.runAfterBoth(other, action)); } @Override @@ -246,12 +250,12 @@ public CompletionStage runAfterBothAsync(CompletionStage other, Runnabl @Override public BaseFuture runAfterBothAsync(CompletionStage other, Runnable action, Executor executor) { - return new BaseFuture<>(wrapped.runAfterBothAsync(other, action, executor)); + return newInstance(wrapped.runAfterBothAsync(other, action, executor)); } @Override public BaseFuture applyToEither(CompletionStage other, Function fn) { - return new BaseFuture<>(wrapped.applyToEither(other, fn)); + return newInstance(wrapped.applyToEither(other, fn)); } @Override @@ -261,12 +265,12 @@ public CompletionStage applyToEitherAsync(CompletionStage ot @Override public BaseFuture applyToEitherAsync(CompletionStage other, Function fn, Executor executor) { - return new BaseFuture<>(wrapped.applyToEitherAsync(other, fn, executor)); + return newInstance(wrapped.applyToEitherAsync(other, fn, executor)); } @Override public BaseFuture acceptEither(CompletionStage other, Consumer action) { - return new BaseFuture<>(wrapped.acceptEither(other, action)); + return newInstance(wrapped.acceptEither(other, action)); } @Override @@ -276,12 +280,12 @@ public CompletionStage acceptEitherAsync(CompletionStage othe @Override public BaseFuture acceptEitherAsync(CompletionStage other, Consumer action, Executor executor) { - return new BaseFuture<>(wrapped.acceptEitherAsync(other, action, executor)); + return newInstance(wrapped.acceptEitherAsync(other, action, executor)); } @Override public BaseFuture runAfterEither(CompletionStage other, Runnable action) { - return new BaseFuture<>(wrapped.runAfterEither(other, action)); + return newInstance(wrapped.runAfterEither(other, action)); } @Override @@ -291,12 +295,12 @@ public CompletionStage runAfterEitherAsync(CompletionStage other, Runna @Override public BaseFuture runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor) { - return new BaseFuture<>(wrapped.runAfterEitherAsync(other, action, executor)); + return newInstance(wrapped.runAfterEitherAsync(other, action, executor)); } @Override public BaseFuture thenCompose(Function> fn) { - return new BaseFuture<>(wrapped.thenCompose(fn)); + return newInstance(wrapped.thenCompose(fn)); } @Override @@ -306,12 +310,12 @@ public CompletionStage thenComposeAsync(Function BaseFuture thenComposeAsync(Function> fn, Executor executor) { - return new BaseFuture<>(wrapped.thenComposeAsync(fn, executor)); + return newInstance(wrapped.thenComposeAsync(fn, executor)); } @Override public BaseFuture whenComplete(BiConsumer action) { - return new BaseFuture<>(wrapped.whenComplete(action)); + return newInstance(wrapped.whenComplete(action)); } @Override @@ -321,12 +325,12 @@ public CompletionStage whenCompleteAsync(BiConsumer whenCompleteAsync(BiConsumer action, Executor executor) { - return new BaseFuture<>(wrapped.whenCompleteAsync(action, executor)); + return newInstance(wrapped.whenCompleteAsync(action, executor)); } @Override public BaseFuture handle(BiFunction fn) { - return new BaseFuture<>(wrapped.handle(fn)); + return newInstance(wrapped.handle(fn)); } @Override @@ -336,12 +340,12 @@ public CompletionStage handleAsync(BiFunction BaseFuture handleAsync(BiFunction fn, Executor executor) { - return new BaseFuture<>(wrapped.handleAsync(fn, executor)); + return newInstance(wrapped.handleAsync(fn, executor)); } @Override public BaseFuture exceptionally(Function fn) { - return new BaseFuture<>(wrapped.exceptionally(fn)); + return newInstance(wrapped.exceptionally(fn)); } @Override diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java index baa280212b659..75a2e29946179 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java @@ -58,16 +58,6 @@ public void testListenableFutureNotifiesListeners() { assertTrue(future.isDone()); } - public void testThrowableListener() { - executorService = EsExecutors.newFixed("testThrowableListener", 1, 1000, - EsExecutors.daemonThreadFactory("listener"), threadContext); - ListenableFuture context = new ListenableFuture<>(); - context.addListener(ActionListener.wrap(() -> { - assert false : "Should not fail"; - }), executorService, threadContext); - context.onResponse(null); - } - public void testListenableFutureNotifiesListenersOnException() { ListenableFuture future = new ListenableFuture<>(); AtomicInteger notifications = new AtomicInteger(0); From dc0edbd3ddbaae8399b194c1b9aa1ebab9d250b3 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 27 Dec 2018 11:57:59 +0100 Subject: [PATCH 12/17] allow blocking calls on Cache's future :/ --- .../src/main/java/org/elasticsearch/common/cache/Cache.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/cache/Cache.java b/server/src/main/java/org/elasticsearch/common/cache/Cache.java index 49f714f9162ac..70f9ae86fbb77 100644 --- a/server/src/main/java/org/elasticsearch/common/cache/Cache.java +++ b/server/src/main/java/org/elasticsearch/common/cache/Cache.java @@ -665,9 +665,7 @@ private EntryBaseFuture(CompletableFuture fut) { @Override protected boolean blockingAllowed() { - return Transports.assertNotTransportThread(BLOCKING_OP_REASON) && - ThreadPool.assertNotScheduleThread(BLOCKING_OP_REASON) && - MasterService.assertNotMasterUpdateThread(BLOCKING_OP_REASON); + return true; // We're calling .get() both from transport and cluster state update threads. TODO: investigate and fix } @Override From 553c02c561c7830278192917a6cbded6c931059f Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 27 Dec 2018 12:09:16 +0100 Subject: [PATCH 13/17] fixed imports --- server/src/main/java/org/elasticsearch/common/cache/Cache.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/cache/Cache.java b/server/src/main/java/org/elasticsearch/common/cache/Cache.java index 70f9ae86fbb77..00a9aabb5f446 100644 --- a/server/src/main/java/org/elasticsearch/common/cache/Cache.java +++ b/server/src/main/java/org/elasticsearch/common/cache/Cache.java @@ -19,12 +19,9 @@ package org.elasticsearch.common.cache; -import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.concurrent.ReleasableLock; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.Transports; import java.util.Arrays; import java.util.HashMap; From 2fdbaf7274ca2984f4c7b56c2168aecb6040bcd4 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 27 Dec 2018 12:23:42 +0100 Subject: [PATCH 14/17] cosmetic changes --- .../common/util/concurrent/BaseFutureTests.java | 2 +- .../action/support/PlainListenableActionFuture.java | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java index a417bd2357525..7495526fd0522 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java @@ -39,7 +39,7 @@ public class BaseFutureTests extends ESTestCase { @Before public void setUpThreadPool() { - threadPool = new TestThreadPool(EvilThreadPoolTests.class.getName()); + threadPool = new TestThreadPool(BaseFutureTests.class.getName()); } @After diff --git a/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java b/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java index 7774e904a6fe5..766673e41929d 100644 --- a/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java +++ b/server/src/main/java/org/elasticsearch/action/support/PlainListenableActionFuture.java @@ -21,7 +21,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.ExceptionsHelper; +import org.apache.logging.log4j.core.util.Throwables; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.threadpool.ThreadPool; @@ -61,10 +61,10 @@ public void addListener(final ActionListener listener) { } catch (Exception e) { listener.onFailure(e); } - } else { - assert throwable instanceof Exception : "Expected exception but was: " + throwable.getClass(); - ExceptionsHelper.maybeDieOnAnotherThread(throwable); + } else if (throwable instanceof Exception) { listener.onFailure((Exception) throwable); + } else { + Throwables.rethrow(throwable); } }); } From 37a6194664c84dc43639a4577faee21f320cbec1 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 27 Dec 2018 12:38:35 +0100 Subject: [PATCH 15/17] properly wrap EntryFuture in Cache everywhere --- .../org/elasticsearch/common/cache/Cache.java | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/cache/Cache.java b/server/src/main/java/org/elasticsearch/common/cache/Cache.java index 00a9aabb5f446..1ef3eb6be6241 100644 --- a/server/src/main/java/org/elasticsearch/common/cache/Cache.java +++ b/server/src/main/java/org/elasticsearch/common/cache/Cache.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.cache; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -192,7 +193,7 @@ private static class CacheSegment { ReleasableLock readLock = new ReleasableLock(segmentLock.readLock()); ReleasableLock writeLock = new ReleasableLock(segmentLock.writeLock()); - Map>> map = new HashMap<>(); + Map>> map = new HashMap<>(); SegmentStats segmentStats = new SegmentStats(); @@ -207,7 +208,7 @@ private static class CacheSegment { * @return the entry if there was one, otherwise null */ Entry get(K key, long now, Predicate> isExpired, Consumer> onExpiration) { - BaseFuture> future; + EntryFuture> future; try (ReleasableLock ignored = readLock.acquire()) { future = map.get(key); } @@ -250,7 +251,7 @@ Tuple, Entry> put(K key, V value, long now) { Entry existing = null; try (ReleasableLock ignored = writeLock.acquire()) { try { - BaseFuture> future = map.put(key, BaseFuture.completedFuture(entry)); + EntryFuture> future = map.put(key, EntryFuture.completedFuture(entry)); if (future != null) { existing = future.handle((ok, ex) -> { if (ok != null) { @@ -273,8 +274,8 @@ Tuple, Entry> put(K key, V value, long now) { * @param key the key of the entry to remove from the cache * @param onRemoval a callback for the removed entry */ - void remove(K key, Consumer>> onRemoval) { - BaseFuture> future; + void remove(K key, Consumer>> onRemoval) { + EntryFuture> future; try (ReleasableLock ignored = writeLock.acquire()) { future = map.remove(key); } @@ -292,8 +293,8 @@ void remove(K key, Consumer>> onRemoval) { * @param value the value expected to be associated with the key * @param onRemoval a callback for the removed entry */ - void remove(K key, V value, Consumer>> onRemoval) { - BaseFuture> future; + void remove(K key, V value, Consumer>> onRemoval) { + EntryFuture> future; boolean removed = false; try (ReleasableLock ignored = writeLock.acquire()) { future = map.get(key); @@ -401,8 +402,8 @@ public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionExcept // the segment lock; to do this, we atomically put a future in the map that can load the value, and then // get the value from this future on the thread that won the race to place the future into the segment map CacheSegment segment = getCacheSegment(key); - BaseFuture> future; - BaseFuture> completableFuture = new EntryBaseFuture<>(); + EntryFuture> future; + EntryFuture> completableFuture = new EntryFuture<>(); try (ReleasableLock ignored = segment.writeLock.acquire()) { future = segment.map.putIfAbsent(key, completableFuture); @@ -416,7 +417,7 @@ public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionExcept return ok.value; } else { try (ReleasableLock ignored = segment.writeLock.acquire()) { - BaseFuture> sanity = segment.map.get(key); + EntryFuture> sanity = segment.map.get(key); if (sanity != null && sanity.isCompletedExceptionally()) { segment.map.remove(key); } @@ -491,7 +492,7 @@ private void put(K key, V value, long now) { } } - private final Consumer>> invalidationConsumer = f -> { + private final Consumer>> invalidationConsumer = f -> { try { Entry entry = f.get(); try (ReleasableLock ignored = lruLock.acquire()) { @@ -650,13 +651,13 @@ public void remove() { }; } - static class EntryBaseFuture extends BaseFuture { + static class EntryFuture extends BaseFuture { - EntryBaseFuture() { + EntryFuture() { super(); } - private EntryBaseFuture(CompletableFuture fut) { + private EntryFuture(CompletableFuture fut) { super(fut); } @@ -666,8 +667,13 @@ protected boolean blockingAllowed() { } @Override - protected EntryBaseFuture newInstance(CompletableFuture fut) { - return new EntryBaseFuture<>(fut); + protected EntryFuture newInstance(CompletableFuture fut) { + return new EntryFuture<>(fut); + } + + @SuppressForbidden(reason = "safely wraps CompletableFuture") + public static EntryFuture completedFuture(U value) { + return new EntryFuture<>(CompletableFuture.completedFuture(value)); } } From fe9be315a0c92d09de00c4e241124e08e47ea6d9 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 27 Dec 2018 13:08:07 +0100 Subject: [PATCH 16/17] checkstyle --- .../elasticsearch/common/util/concurrent/BaseFutureTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java index 7495526fd0522..617b3778b7d6e 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/common/util/concurrent/BaseFutureTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.common.util.concurrent; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.EvilThreadPoolTests; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; From 559ae9d2f93296ccbbd1fe8ba2a529aeb4f34fc9 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 12 Jan 2022 10:44:13 +0100 Subject: [PATCH 17/17] Update docs/changelog/32512.yaml --- docs/changelog/32512.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/32512.yaml diff --git a/docs/changelog/32512.yaml b/docs/changelog/32512.yaml new file mode 100644 index 0000000000000..090d970d42bf7 --- /dev/null +++ b/docs/changelog/32512.yaml @@ -0,0 +1,5 @@ +pr: 32512 +summary: Replace custom Future implementations by `CompletableFuture` +area: Infra/Core +type: enhancement +issues: []