From 603177285a241166ba5020aa125611d751dbd6d2 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 11 Feb 2019 17:22:24 -0700 Subject: [PATCH 1/4] WIP --- .../action/support/ListenerTimeouts.java | 77 +++++++++++++++ .../action/support/ListenerTimeoutsTests.java | 93 +++++++++++++++++++ .../xpack/ccr/repository/CcrRepository.java | 14 ++- .../xpack/ccr/CcrRepositoryIT.java | 54 +++++------ 4 files changed, 207 insertions(+), 31 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java create mode 100644 server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java diff --git a/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java b/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java new file mode 100644 index 0000000000000..d8fc32b860a4a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java @@ -0,0 +1,77 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class ListenerTimeouts { + + public static ActionListener wrapWithTimeout(ThreadPool threadPool, ActionListener listener, + TimeValue timeout, String listenerName) { + TimeoutableListener wrappedListener = new TimeoutableListener<>(listener, timeout, listenerName); + wrappedListener.cancellable = threadPool.schedule(wrappedListener, timeout, ThreadPool.Names.GENERIC); + return wrappedListener; + } + + private static class TimeoutableListener implements ActionListener, Runnable { + + private final AtomicBoolean isDone = new AtomicBoolean(false); + private final ActionListener delegate; + private final TimeValue timeout; + private final String listenerName; + private volatile Scheduler.ScheduledCancellable cancellable; + + private TimeoutableListener(ActionListener delegate, TimeValue timeout, String listenerName) { + this.delegate = delegate; + this.timeout = timeout; + this.listenerName = listenerName; + } + + @Override + public void onResponse(Response response) { + if (isDone.compareAndSet(false, true)) { + cancellable.cancel(); + delegate.onResponse(response); + } + } + + @Override + public void onFailure(Exception e) { + if (isDone.compareAndSet(false, true)) { + cancellable.cancel(); + delegate.onFailure(e); + } + } + + @Override + public void run() { + if (isDone.compareAndSet(false, true)) { + String timeoutMessage = "[" + listenerName + "]" + " timed out after [" + timeout + "]"; + delegate.onFailure(new ElasticsearchTimeoutException(timeoutMessage)); + } + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java b/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java new file mode 100644 index 0000000000000..50a7eec18136b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java @@ -0,0 +1,93 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.core.IsInstanceOf.instanceOf; + +public class ListenerTimeoutsTests extends ESTestCase { + + private final TimeValue timeout = TimeValue.timeValueMillis(10); + private DeterministicTaskQueue taskQueue; + + @Before + public void setUp() throws Exception { + super.setUp(); + Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(); + taskQueue = new DeterministicTaskQueue(settings, random()); + } + + public void testListenerTimeout() { + AtomicBoolean completed = new AtomicBoolean(false); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = wrap(completed, exception); + + ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, "test"); + assertTrue(taskQueue.hasDeferredTasks()); + taskQueue.advanceTime(); + taskQueue.runAllRunnableTasks(); + + wrapped.onResponse(null); + + assertFalse(completed.get()); + assertThat(exception.get(), instanceOf(ElasticsearchTimeoutException.class)); + } + + public void testFinishBeforeTimeout() { + AtomicBoolean completed = new AtomicBoolean(false); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = wrap(completed, exception); + + ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, "test"); + wrapped.onResponse(null); + + assertTrue(taskQueue.hasDeferredTasks()); + taskQueue.advanceTime(); + taskQueue.runAllRunnableTasks(); + + assertTrue(completed.get()); + assertNull(exception.get()); + } + + private ActionListener wrap(AtomicBoolean completed, AtomicReference exception) { + return new ActionListener() { + @Override + public void onResponse(Void aVoid) { + completed.set(true); + } + + @Override + public void onFailure(Exception e) { + exception.set(e); + } + }; + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index d613200531c5c..98ae6a8c05631 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.support.ListenerTimeouts; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -105,7 +106,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit private final ThreadPool threadPool; private final CounterMetric throttledTime = new CounterMetric(); - + public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings, CcrSettings ccrSettings, ThreadPool threadPool) { this.metadata = metadata; @@ -389,7 +390,8 @@ private static class FileSession { protected void restoreFiles(List filesToRecover, Store store) throws IOException { logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover); - try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {})) { + try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> { + })) { final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); final AtomicReference> error = new AtomicReference<>(); @@ -444,8 +446,9 @@ protected void restoreFiles(List filesToRecover, Store store) throws I logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId, fileToRecover.name(), fileSession.lastOffset, bytesRequested); - remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, - ActionListener.wrap( + TimeValue timeout = ccrSettings.getRecoveryActionTimeout(); + ActionListener listener = + ListenerTimeouts.wrapWithTimeout(threadPool, ActionListener.wrap( r -> threadPool.generic().execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -491,7 +494,8 @@ protected void doRun() throws Exception { error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e)); requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); } - )); + ), timeout, GetCcrRestoreFileChunkAction.NAME); + remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener); } catch (Exception e) { error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e)); requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 9f061b9c33099..7c938d0146d95 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction; @@ -325,7 +326,6 @@ public void testRateLimitingIsEmployed() throws Exception { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38027") public void testIndividualActionsTimeout() throws Exception { ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); TimeValue timeValue = TimeValue.timeValueMillis(100); @@ -348,7 +348,8 @@ public void testIndividualActionsTimeout() throws Exception { MockTransportService mockTransportService = (MockTransportService) transportService; transportServices.add(mockTransportService); mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { - if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false) { + if (action.equals(GetCcrRestoreFileChunkAction.NAME) == false && + action.equals(TransportActionProxy.getProxyAction(GetCcrRestoreFileChunkAction.NAME)) == false) { connection.sendRequest(requestId, action, request, options); } }); @@ -370,33 +371,34 @@ public void testIndividualActionsTimeout() throws Exception { .renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS)) .indexSettings(settingsBuilder); - final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); - final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); - PlainActionFuture future = PlainActionFuture.newFuture(); - restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); - - // Depending on when the timeout occurs this can fail in two ways. If it times-out when fetching - // metadata this will throw an exception. If it times-out when restoring a shard, the shard will - // be marked as failed. Either one is a success for the purpose of this test. try { - RestoreInfo restoreInfo = future.actionGet(); - assertThat(restoreInfo.failedShards(), greaterThan(0)); - assertThat(restoreInfo.successfulShards(), lessThan(restoreInfo.totalShards())); - assertEquals(numberOfPrimaryShards, restoreInfo.totalShards()); - } catch (Exception e) { - assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchTimeoutException.class)); - } - + final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + + // Depending on when the timeout occurs this can fail in two ways. If it times-out when fetching + // metadata this will throw an exception. If it times-out when restoring a shard, the shard will + // be marked as failed. Either one is a success for the purpose of this test. + try { + RestoreInfo restoreInfo = future.actionGet(); + assertThat(restoreInfo.failedShards(), greaterThan(0)); + assertThat(restoreInfo.successfulShards(), lessThan(restoreInfo.totalShards())); + assertEquals(numberOfPrimaryShards, restoreInfo.totalShards()); + } catch (Exception e) { + assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ElasticsearchTimeoutException.class)); + } + } finally { + for (MockTransportService transportService : transportServices) { + transportService.clearAllRules(); + } - for (MockTransportService transportService : transportServices) { - transportService.clearAllRules(); + settingsRequest = new ClusterUpdateSettingsRequest(); + TimeValue defaultValue = CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getDefault(Settings.EMPTY); + settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(), + defaultValue)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); } - - settingsRequest = new ClusterUpdateSettingsRequest(); - TimeValue defaultValue = CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getDefault(Settings.EMPTY); - settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.getKey(), - defaultValue)); - assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); } public void testFollowerMappingIsUpdated() throws IOException { From 32c040f2feb484eebc09c7f60b7cc5828d1b92a3 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 11 Feb 2019 17:30:05 -0700 Subject: [PATCH 2/4] Add docs --- .../action/support/ListenerTimeouts.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java b/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java index d8fc32b860a4a..44a620341a9b1 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java +++ b/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java @@ -29,6 +29,17 @@ public class ListenerTimeouts { + /** + * Wraps a listener with a listener that can timeout. After the timeout period the + * {@link ActionListener#onFailure(Exception)} will be called with a + * {@link ElasticsearchTimeoutException} if the listener has not already been completed. + * + * @param threadPool used to schedule the timeout + * @param listener to that can timeout + * @param timeout period before listener failed + * @param listenerName name of the listener for timeout exception + * @return the wrapped listener that will timeout + */ public static ActionListener wrapWithTimeout(ThreadPool threadPool, ActionListener listener, TimeValue timeout, String listenerName) { TimeoutableListener wrappedListener = new TimeoutableListener<>(listener, timeout, listenerName); From 63714fe8a1d99061df0c966a675da2c1d5171ec0 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 12 Feb 2019 10:05:26 -0700 Subject: [PATCH 3/4] Changes --- .../action/support/ListenerTimeouts.java | 7 +++-- .../action/support/ListenerTimeoutsTests.java | 30 +++++++++++++++++-- .../xpack/ccr/repository/CcrRepository.java | 2 +- 3 files changed, 32 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java b/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java index 44a620341a9b1..df9afd32ca21c 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java +++ b/server/src/main/java/org/elasticsearch/action/support/ListenerTimeouts.java @@ -37,13 +37,14 @@ public class ListenerTimeouts { * @param threadPool used to schedule the timeout * @param listener to that can timeout * @param timeout period before listener failed + * @param executor to use for scheduling timeout * @param listenerName name of the listener for timeout exception * @return the wrapped listener that will timeout */ - public static ActionListener wrapWithTimeout(ThreadPool threadPool, ActionListener listener, - TimeValue timeout, String listenerName) { + public static ActionListener wrapWithTimeout(ThreadPool threadPool, ActionListener listener, + TimeValue timeout, String executor, String listenerName) { TimeoutableListener wrappedListener = new TimeoutableListener<>(listener, timeout, listenerName); - wrappedListener.cancellable = threadPool.schedule(wrappedListener, timeout, ThreadPool.Names.GENERIC); + wrappedListener.cancellable = threadPool.schedule(wrappedListener, timeout, executor); return wrappedListener; } diff --git a/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java b/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java index 50a7eec18136b..e78221a9eda75 100644 --- a/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java @@ -25,8 +25,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -36,6 +38,7 @@ public class ListenerTimeoutsTests extends ESTestCase { private final TimeValue timeout = TimeValue.timeValueMillis(10); + private final String generic = ThreadPool.Names.GENERIC; private DeterministicTaskQueue taskQueue; @Before @@ -50,23 +53,26 @@ public void testListenerTimeout() { AtomicReference exception = new AtomicReference<>(); ActionListener listener = wrap(completed, exception); - ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, "test"); + ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test"); assertTrue(taskQueue.hasDeferredTasks()); taskQueue.advanceTime(); taskQueue.runAllRunnableTasks(); wrapped.onResponse(null); + wrapped.onFailure(new IOException("incorrect exception")); assertFalse(completed.get()); assertThat(exception.get(), instanceOf(ElasticsearchTimeoutException.class)); } - public void testFinishBeforeTimeout() { + public void testFinishNormallyBeforeTimeout() { AtomicBoolean completed = new AtomicBoolean(false); AtomicReference exception = new AtomicReference<>(); ActionListener listener = wrap(completed, exception); - ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, "test"); + ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test"); + wrapped.onResponse(null); + wrapped.onFailure(new IOException("boom")); wrapped.onResponse(null); assertTrue(taskQueue.hasDeferredTasks()); @@ -77,15 +83,33 @@ public void testFinishBeforeTimeout() { assertNull(exception.get()); } + public void testFinishExceptionallyBeforeTimeout() { + AtomicBoolean completed = new AtomicBoolean(false); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = wrap(completed, exception); + + ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test"); + wrapped.onFailure(new IOException("boom")); + + assertTrue(taskQueue.hasDeferredTasks()); + taskQueue.advanceTime(); + taskQueue.runAllRunnableTasks(); + + assertFalse(completed.get()); + assertThat(exception.get(), instanceOf(IOException.class)); + } + private ActionListener wrap(AtomicBoolean completed, AtomicReference exception) { return new ActionListener() { @Override public void onResponse(Void aVoid) { + assert completed.get() == false : "Should not be called twice"; completed.set(true); } @Override public void onFailure(Exception e) { + assert exception.get() == null : "Should not be called twice"; exception.set(e); } }; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 98ae6a8c05631..e72ad501d43db 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -494,7 +494,7 @@ protected void doRun() throws Exception { error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e)); requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); } - ), timeout, GetCcrRestoreFileChunkAction.NAME); + ), timeout, ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME); remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener); } catch (Exception e) { error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e)); From 82f00ca8b337db4bf79bfd1c1d9d8a2e9465308b Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 13 Feb 2019 08:03:39 -0700 Subject: [PATCH 4/4] Changes --- .../action/support/ListenerTimeoutsTests.java | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java b/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java index e78221a9eda75..d5e3f0031c72f 100644 --- a/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/ListenerTimeoutsTests.java @@ -49,9 +49,9 @@ public void setUp() throws Exception { } public void testListenerTimeout() { - AtomicBoolean completed = new AtomicBoolean(false); + AtomicBoolean success = new AtomicBoolean(false); AtomicReference exception = new AtomicReference<>(); - ActionListener listener = wrap(completed, exception); + ActionListener listener = wrap(success, exception); ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test"); assertTrue(taskQueue.hasDeferredTasks()); @@ -61,14 +61,14 @@ public void testListenerTimeout() { wrapped.onResponse(null); wrapped.onFailure(new IOException("incorrect exception")); - assertFalse(completed.get()); + assertFalse(success.get()); assertThat(exception.get(), instanceOf(ElasticsearchTimeoutException.class)); } public void testFinishNormallyBeforeTimeout() { - AtomicBoolean completed = new AtomicBoolean(false); + AtomicBoolean success = new AtomicBoolean(false); AtomicReference exception = new AtomicReference<>(); - ActionListener listener = wrap(completed, exception); + ActionListener listener = wrap(success, exception); ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test"); wrapped.onResponse(null); @@ -79,14 +79,14 @@ public void testFinishNormallyBeforeTimeout() { taskQueue.advanceTime(); taskQueue.runAllRunnableTasks(); - assertTrue(completed.get()); + assertTrue(success.get()); assertNull(exception.get()); } public void testFinishExceptionallyBeforeTimeout() { - AtomicBoolean completed = new AtomicBoolean(false); + AtomicBoolean success = new AtomicBoolean(false); AtomicReference exception = new AtomicReference<>(); - ActionListener listener = wrap(completed, exception); + ActionListener listener = wrap(success, exception); ActionListener wrapped = ListenerTimeouts.wrapWithTimeout(taskQueue.getThreadPool(), listener, timeout, generic, "test"); wrapped.onFailure(new IOException("boom")); @@ -95,22 +95,25 @@ public void testFinishExceptionallyBeforeTimeout() { taskQueue.advanceTime(); taskQueue.runAllRunnableTasks(); - assertFalse(completed.get()); + assertFalse(success.get()); assertThat(exception.get(), instanceOf(IOException.class)); } - private ActionListener wrap(AtomicBoolean completed, AtomicReference exception) { + private ActionListener wrap(AtomicBoolean success, AtomicReference exception) { return new ActionListener() { + + private final AtomicBoolean completed = new AtomicBoolean(); + @Override public void onResponse(Void aVoid) { - assert completed.get() == false : "Should not be called twice"; - completed.set(true); + assertTrue(completed.compareAndSet(false, true)); + assertTrue(success.compareAndSet(false, true)); } @Override public void onFailure(Exception e) { - assert exception.get() == null : "Should not be called twice"; - exception.set(e); + assertTrue(completed.compareAndSet(false, true)); + assertTrue(exception.compareAndSet(null, e)); } }; }