From 513443d6abe1db4783412485bfe072e17c3e1d93 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 28 Oct 2020 14:26:53 +0100 Subject: [PATCH] Simplify ClusterStateUpdateTask Timeout Handling (#64117) It's confusing and slightly error-prone (see #64116) to handle the timeouts via overrides but the priority via a field. This simplifies the code to avoid future issues and save over 100 LOC. Also this fixes a bug in `TransportClearVotingConfigExclusionsAction` where trying to instantiate a time value with a negative time could throw an unexpected exception and, as a result, leak a listener. --- .../cluster/service/ClusterServiceIT.java | 55 ++++--------------- ...portClearVotingConfigExclusionsAction.java | 9 +-- .../health/TransportClusterHealthAction.java | 7 +-- .../cluster/AckedClusterStateUpdateTask.java | 9 +-- .../cluster/ClusterStateUpdateTask.java | 18 +++++- .../metadata/MetadataIndexStateService.java | 15 +---- .../MetadataIndexTemplateService.java | 43 +++------------ .../blobstore/BlobStoreRepository.java | 8 +-- .../snapshots/RestoreService.java | 9 +-- .../snapshots/SnapshotsService.java | 30 ++-------- .../cluster/service/MasterServiceTests.java | 55 ++++++++----------- .../BlockMasterServiceOnMaster.java | 7 +-- .../DeleteDataStreamTransportAction.java | 8 +-- .../xpack/ilm/IndexLifecycleRunner.java | 15 ++--- .../xpack/ilm/IndexLifecycleRunnerTests.java | 1 + 15 files changed, 81 insertions(+), 208 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index 0a5f71427dac1..1d1f58d97ac1c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -46,6 +46,8 @@ @ClusterScope(scope = Scope.TEST, numDataNodes = 0) public class ClusterServiceIT extends 
ESIntegTestCase { + private static final TimeValue TEN_SECONDS = TimeValue.timeValueSeconds(10L); + public void testAckedUpdateTask() throws Exception { internalCluster().startNode(); ClusterService clusterService = internalCluster().getInstance(ClusterService.class); @@ -56,7 +58,8 @@ public void testAckedUpdateTask() throws Exception { final AtomicBoolean executed = new AtomicBoolean(false); final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch processedLatch = new CountDownLatch(1); - clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(null, null) { + clusterService.submitStateUpdateTask("test", + new AckedClusterStateUpdateTask(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) { @Override protected Void newResponse(boolean acknowledged) { return null; @@ -79,16 +82,6 @@ public void onAckTimeout() { latch.countDown(); } - @Override - public TimeValue ackTimeout() { - return TimeValue.timeValueSeconds(10); - } - - @Override - public TimeValue timeout() { - return TimeValue.timeValueSeconds(10); - } - @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { processedLatch.countDown(); @@ -129,7 +122,8 @@ public void testAckedUpdateTaskSameClusterState() throws Exception { final AtomicBoolean executed = new AtomicBoolean(false); final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch processedLatch = new CountDownLatch(1); - clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(null, null) { + clusterService.submitStateUpdateTask("test", + new AckedClusterStateUpdateTask(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) { @Override protected Void newResponse(boolean acknowledged) { return null; @@ -147,16 +141,6 @@ public void onAckTimeout() { latch.countDown(); } - @Override - public TimeValue ackTimeout() { - return TimeValue.timeValueSeconds(10); - } - - @Override - public TimeValue timeout() { - 
return TimeValue.timeValueSeconds(10); - } - @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { processedLatch.countDown(); @@ -196,7 +180,9 @@ public void testAckedUpdateTaskNoAckExpected() throws Exception { final AtomicBoolean onFailure = new AtomicBoolean(false); final AtomicBoolean executed = new AtomicBoolean(false); final CountDownLatch latch = new CountDownLatch(1); - clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(null, null) { + + clusterService.submitStateUpdateTask( + "test", new AckedClusterStateUpdateTask(MasterServiceTests.ackedRequest(TEN_SECONDS, TEN_SECONDS), null) { @Override protected Void newResponse(boolean acknowledged) { return null; @@ -219,16 +205,6 @@ public void onAckTimeout() { latch.countDown(); } - @Override - public TimeValue ackTimeout() { - return TimeValue.timeValueSeconds(10); - } - - @Override - public TimeValue timeout() { - return TimeValue.timeValueSeconds(10); - } - @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { } @@ -266,7 +242,8 @@ public void testAckedUpdateTaskTimeoutZero() throws Exception { final AtomicBoolean executed = new AtomicBoolean(false); final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch processedLatch = new CountDownLatch(1); - clusterService.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(null, null) { + clusterService.submitStateUpdateTask("test", + new AckedClusterStateUpdateTask(MasterServiceTests.ackedRequest(TimeValue.ZERO, TEN_SECONDS), null) { @Override protected Void newResponse(boolean acknowledged) { return null; @@ -289,16 +266,6 @@ public void onAckTimeout() { latch.countDown(); } - @Override - public TimeValue ackTimeout() { - return TimeValue.timeValueSeconds(0); - } - - @Override - public TimeValue timeout() { - return TimeValue.timeValueSeconds(10); - } - @Override public void clusterStateProcessed(String source, 
ClusterState oldState, ClusterState newState) { processedLatch.countDown(); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java index a7de5387560a9..d5d4cd4434a3e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java @@ -104,7 +104,9 @@ public void onTimeout(TimeValue timeout) { private void submitClearVotingConfigExclusionsTask(ClearVotingConfigExclusionsRequest request, long startTimeMillis, ActionListener listener) { - clusterService.submitStateUpdateTask("clear-voting-config-exclusions", new ClusterStateUpdateTask(Priority.URGENT) { + clusterService.submitStateUpdateTask("clear-voting-config-exclusions", new ClusterStateUpdateTask(Priority.URGENT, + TimeValue.timeValueMillis( + Math.max(0, request.getTimeout().millis() + startTimeMillis - threadPool.relativeTimeInMillis()))) { @Override public ClusterState execute(ClusterState currentState) { final CoordinationMetadata newCoordinationMetadata = @@ -119,11 +121,6 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } - @Override - public TimeValue timeout() { - return TimeValue.timeValueMillis(request.getTimeout().millis() + startTimeMillis - threadPool.relativeTimeInMillis()); - } - @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { listener.onResponse(new ClearVotingConfigExclusionsResponse()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index 
94b8e7be81c05..fe2274e7b51a2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -127,17 +127,12 @@ public void onFailure(String source, Exception e) { } else { final TimeValue taskTimeout = TimeValue.timeValueMillis(Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis())); clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", - new ClusterStateUpdateTask(request.waitForEvents()) { + new ClusterStateUpdateTask(request.waitForEvents(), taskTimeout) { @Override public ClusterState execute(ClusterState currentState) { return currentState; } - @Override - public TimeValue timeout() { - return taskTimeout; - } - @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { final long timeoutInMillis = Math.max(0, endTimeRelativeMillis - threadPool.relativeTimeInMillis()); diff --git a/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java b/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java index 8d61fe964265d..cceaf5709a04f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java +++ b/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java @@ -39,7 +39,7 @@ protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener listener) { - super(priority); + super(priority, request.masterNodeTimeout()); this.listener = listener; this.request = request; } @@ -82,12 +82,7 @@ public void onFailure(String source, Exception e) { /** * Acknowledgement timeout, maximum time interval to wait for acknowledgements */ - public TimeValue ackTimeout() { + public final TimeValue ackTimeout() { return request.ackTimeout(); } - - @Override - public TimeValue timeout() { - return 
request.masterNodeTimeout(); - } } diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java index 7a8afcdae38ac..4b2bfdc26cbc9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateUpdateTask.java @@ -33,12 +33,24 @@ public abstract class ClusterStateUpdateTask private final Priority priority; + @Nullable + private final TimeValue timeout; + public ClusterStateUpdateTask() { this(Priority.NORMAL); } public ClusterStateUpdateTask(Priority priority) { + this(priority, null); + } + + public ClusterStateUpdateTask(TimeValue timeout) { + this(Priority.NORMAL, timeout); + } + + public ClusterStateUpdateTask(Priority priority, TimeValue timeout) { this.priority = priority; + this.timeout = timeout; } @Override @@ -75,12 +87,12 @@ public final void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) * {@link ClusterStateTaskListener#onFailure(String, Exception)}. May return null to indicate no timeout is needed (default). 
*/ @Nullable - public TimeValue timeout() { - return null; + public final TimeValue timeout() { + return timeout; } @Override - public Priority priority() { + public final Priority priority() { return priority; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java index 4693a86251cd6..5ab4472b7749c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java @@ -63,7 +63,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.CountDown; @@ -160,7 +159,7 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina } clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices), - new ClusterStateUpdateTask(Priority.URGENT) { + new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) { private final Map blockedIndices = new HashMap<>(); @@ -235,11 +234,6 @@ public void clusterStateProcessed(final String source, public void onFailure(final String source, final Exception e) { listener.onFailure(e); } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } } ); } @@ -411,7 +405,7 @@ public void addIndexBlock(AddIndexBlockClusterStateUpdateRequest request, } clusterService.submitStateUpdateTask("add-index-block-[" + request.getBlock().name + "]-" + Arrays.toString(concreteIndices), - new ClusterStateUpdateTask(Priority.URGENT) { + new ClusterStateUpdateTask(Priority.URGENT, 
request.masterNodeTimeout()) { private Map blockedIndices; @@ -473,11 +467,6 @@ public void clusterStateProcessed(final String source, public void onFailure(final String source, final Exception e) { listener.onFailure(e); } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } } ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java index e29cdd5b3246a..c0fef633387b5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java @@ -120,12 +120,8 @@ public MetadataIndexTemplateService(ClusterService clusterService, } public void removeTemplates(final RemoveRequest request, final RemoveListener listener) { - clusterService.submitStateUpdateTask("remove-index-template [" + request.name + "]", new ClusterStateUpdateTask(Priority.URGENT) { - - @Override - public TimeValue timeout() { - return request.masterTimeout; - } + clusterService.submitStateUpdateTask( + "remove-index-template [" + request.name + "]", new ClusterStateUpdateTask(Priority.URGENT, request.masterTimeout) { @Override public void onFailure(String source, Exception e) { @@ -171,12 +167,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS public void putComponentTemplate(final String cause, final boolean create, final String name, final TimeValue masterTimeout, final ComponentTemplate template, final ActionListener listener) { clusterService.submitStateUpdateTask("create-component-template [" + name + "], cause [" + cause + "]", - new ClusterStateUpdateTask(Priority.URGENT) { - - @Override - public TimeValue timeout() { - return masterTimeout; - } + new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) { @Override public void onFailure(String source, 
Exception e) { @@ -305,12 +296,7 @@ public void removeComponentTemplate(final String name, final TimeValue masterTim final ActionListener listener) { validateNotInUse(clusterService.state().metadata(), name); clusterService.submitStateUpdateTask("remove-component-template [" + name + "]", - new ClusterStateUpdateTask(Priority.URGENT) { - - @Override - public TimeValue timeout() { - return masterTimeout; - } + new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) { @Override public void onFailure(String source, Exception e) { @@ -384,12 +370,7 @@ public void putIndexTemplateV2(final String cause, final boolean create, final S final ComposableIndexTemplate template, final ActionListener listener) { validateV2TemplateRequest(clusterService.state().metadata(), name, template); clusterService.submitStateUpdateTask("create-index-template-v2 [" + name + "], cause [" + cause + "]", - new ClusterStateUpdateTask(Priority.URGENT) { - - @Override - public TimeValue timeout() { - return masterTimeout; - } + new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) { @Override public void onFailure(String source, Exception e) { @@ -639,12 +620,7 @@ static Map> findConflictingV2Templates(final ClusterState s public void removeIndexTemplateV2(final String name, final TimeValue masterTimeout, final ActionListener listener) { clusterService.submitStateUpdateTask("remove-index-template-v2 [" + name + "]", - new ClusterStateUpdateTask(Priority.URGENT) { - - @Override - public TimeValue timeout() { - return masterTimeout; - } + new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) { @Override public void onFailure(String source, Exception e) { @@ -734,12 +710,7 @@ public void putTemplate(final PutRequest request, final PutListener listener) { final IndexTemplateMetadata.Builder templateBuilder = IndexTemplateMetadata.builder(request.name); clusterService.submitStateUpdateTask("create-index-template [" + request.name + "], cause [" + request.cause + "]", - new 
ClusterStateUpdateTask(Priority.URGENT) { - - @Override - public TimeValue timeout() { - return request.masterTimeout; - } + new ClusterStateUpdateTask(Priority.URGENT, request.masterTimeout) { @Override public void onFailure(String source, Exception e) { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index ded0819ac8ea2..3bf748eb1dcea 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -77,7 +77,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -361,7 +360,7 @@ public void executeConsistentStateUpdate(Function { final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData); - clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) { + clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority(), updateTask.timeout()) { private boolean executedTask = false; @@ -397,11 +396,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS executeConsistentStateUpdate(createUpdateTask, source, onFailure); } } - - @Override - public TimeValue timeout() { - return updateTask.timeout(); - } }); }, onFailure)); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index bed5c19e676d0..7e9817f2798dd 100644 --- 
a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -67,7 +67,6 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.IndexShard; @@ -263,7 +262,8 @@ public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionLi // Now we can start the actual restore process by adding shards to be recovered in the cluster state // and updating cluster metadata (global and index) as needed - clusterService.submitStateUpdateTask("restore_snapshot[" + snapshotName + ']', new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask( + "restore_snapshot[" + snapshotName + ']', new ClusterStateUpdateTask(request.masterNodeTimeout()) { final String restoreUUID = UUIDs.randomBase64UUID(); RestoreInfo restoreInfo = null; @@ -597,11 +597,6 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { listener.onResponse(new RestoreCompletionResponse(restoreUUID, snapshot, restoreInfo)); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index a8ca8cba31fe7..e4505c081d33f 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -75,7 +75,6 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; 
-import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -243,7 +242,8 @@ public void createSnapshotLegacy(final CreateSnapshotRequest request, final Acti final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot Repository repository = repositoriesService.repository(request.repository()); final Map userMeta = repository.adaptUserMetadata(request.userMetadata()); - clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', + new ClusterStateUpdateTask(request.masterNodeTimeout()) { private List indices; @@ -324,11 +324,6 @@ public void onFailure(final Exception e) { }); } } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } }); } @@ -368,7 +363,7 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList } final Snapshot snapshot = new Snapshot(repositoryName, snapshotId); final Map userMeta = repository.adaptUserMetadata(request.userMetadata()); - repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask() { + repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) { private SnapshotsInProgress.Entry newEntry; @@ -451,11 +446,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl } } } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } }, "create_snapshot [" + snapshotName + ']', listener::onFailure); } @@ -491,7 +481,7 @@ public void cloneSnapshot(CloneSnapshotRequest request, ActionListener lis final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); final Snapshot snapshot = new 
Snapshot(repositoryName, snapshotId); initializingClones.add(snapshot); - repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask() { + repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) { private SnapshotsInProgress.Entry newEntry; @@ -553,11 +543,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl addListener(snapshot, ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure)); startCloning(repository, newEntry); } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } }, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure); } @@ -1831,7 +1816,7 @@ public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionLis Strings.arrayToCommaDelimitedString(snapshotNames), repoName)); final Repository repository = repositoriesService.repository(repoName); - repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.NORMAL) { + repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) { private Snapshot runningSnapshot; @@ -1960,11 +1945,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } )); } - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } }, "delete snapshot", listener::onFailure); } diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index 5cf09466c30db..f8e73cbcc2adc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.ClusterStateTaskListener; import 
org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.LocalClusterUpdateTask; +import org.elasticsearch.cluster.ack.AckedRequest; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.ClusterStatePublisher; import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; @@ -189,7 +190,7 @@ public void testThreadContext() throws InterruptedException { final TimeValue ackTimeout = randomBoolean() ? TimeValue.ZERO : TimeValue.timeValueMillis(randomInt(10000)); final TimeValue masterTimeout = randomBoolean() ? TimeValue.ZERO : TimeValue.timeValueMillis(randomInt(10000)); - master.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(null, null) { + master.submitStateUpdateTask("test", new AckedClusterStateUpdateTask(ackedRequest(ackTimeout, masterTimeout), null) { @Override public ClusterState execute(ClusterState currentState) { assertTrue(threadPool.getThreadContext().isSystemContext()); @@ -227,15 +228,6 @@ protected Void newResponse(boolean acknowledged) { return null; } - public TimeValue ackTimeout() { - return ackTimeout; - } - - @Override - public TimeValue timeout() { - return masterTimeout; - } - @Override public void onAllNodesAcked(@Nullable Exception e) { assertFalse(threadPool.getThreadContext().isSystemContext()); @@ -927,22 +919,13 @@ public void testAcking() throws InterruptedException { publisherRef.set((clusterChangedEvent, publishListener, ackListener) -> publishListener.onFailure(new FailedToCommitClusterStateException("mock exception"))); - masterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask(null, null) { + masterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask( + ackedRequest(TimeValue.ZERO, null), null) { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).build(); } - @Override - public TimeValue ackTimeout() { - return TimeValue.ZERO; - } - 
- @Override - public TimeValue timeout() { - return null; - } - @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { fail(); @@ -982,22 +965,13 @@ public void onAckTimeout() { ackListener.onNodeAck(node3, null); }); - masterService.submitStateUpdateTask("test2", new AckedClusterStateUpdateTask(null, null) { + masterService.submitStateUpdateTask( + "test2", new AckedClusterStateUpdateTask(ackedRequest(ackTimeout, null), null) { @Override public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).build(); } - @Override - public TimeValue ackTimeout() { - return ackTimeout; - } - - @Override - public TimeValue timeout() { - return null; - } - @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { latch.countDown(); @@ -1032,4 +1006,21 @@ public static ClusterState discoveryState(MasterService masterService) { return masterService.state(); } + /** + * Returns a plain {@link AckedRequest} that does not implement any functionality outside of the timeout getters. 
+ */ + public static AckedRequest ackedRequest(TimeValue ackTimeout, TimeValue masterNodeTimeout) { + return new AckedRequest() { + @Override + public TimeValue ackTimeout() { + return ackTimeout; + } + + @Override + public TimeValue masterNodeTimeout() { + return masterNodeTimeout; + } + }; + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockMasterServiceOnMaster.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockMasterServiceOnMaster.java index 0547ce70f2f91..aaa1b977ebe2b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockMasterServiceOnMaster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockMasterServiceOnMaster.java @@ -55,11 +55,8 @@ public void startDisrupting() { boolean success = disruptionLatch.compareAndSet(null, new CountDownLatch(1)); assert success : "startDisrupting called without waiting on stopDisrupting to complete"; final CountDownLatch started = new CountDownLatch(1); - clusterService.getMasterService().submitStateUpdateTask("service_disruption_block", new ClusterStateUpdateTask() { - @Override - public Priority priority() { - return Priority.IMMEDIATE; - } + clusterService.getMasterService().submitStateUpdateTask( + "service_disruption_block", new ClusterStateUpdateTask(Priority.IMMEDIATE) { @Override public ClusterState execute(ClusterState currentState) throws Exception { diff --git a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/DeleteDataStreamTransportAction.java b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/DeleteDataStreamTransportAction.java index f56021b6960e3..8712dcba193f7 100644 --- a/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/DeleteDataStreamTransportAction.java +++ b/x-pack/plugin/data-streams/src/main/java/org/elasticsearch/xpack/datastreams/action/DeleteDataStreamTransportAction.java @@ -25,7 
+25,6 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.snapshots.SnapshotInProgressException; import org.elasticsearch.snapshots.SnapshotsService; @@ -73,12 +72,7 @@ protected void masterOperation( ) throws Exception { clusterService.submitStateUpdateTask( "remove-data-stream [" + Strings.arrayToCommaDelimitedString(request.getNames()) + "]", - new ClusterStateUpdateTask(Priority.HIGH) { - - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } + new ClusterStateUpdateTask(Priority.HIGH, request.masterNodeTimeout()) { @Override public void onFailure(String source, Exception e) { diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java index f78cf467ed78e..8989bdf783e44 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java @@ -213,18 +213,13 @@ private void onErrorMaybeRetryFailedStep(String policy, IndexMetadata indexMetad int currentRetryAttempt = lifecycleState.getFailedStepRetryCount() == null ? 1 : 1 + lifecycleState.getFailedStepRetryCount(); logger.info("policy [{}] for index [{}] on an error step due to a transient error, moving back to the failed " + "step [{}] for execution. 
retry attempt [{}]", policy, index, lifecycleState.getFailedStep(), currentRetryAttempt); + // we can afford to drop these requests if they timeout as on the next {@link + // IndexLifecycleRunner#runPeriodicStep} run the policy will still be in the ERROR step, as we haven't been able + // to move it back into the failed step, so we'll try again clusterService.submitStateUpdateTask( String.format(Locale.ROOT, "ilm-retry-failed-step {policy [%s], index [%s], failedStep [%s]}", policy, index, - failedStep.getKey()), - new ClusterStateUpdateTask() { - - @Override - public TimeValue timeout() { - // we can afford to drop these requests if they timeout as on the next {@link - // IndexLifecycleRunner#runPeriodicStep} run the policy will still be in the ERROR step, as we haven't been able - // to move it back into the failed step, so we'll try again - return LifecycleSettings.LIFECYCLE_STEP_MASTER_TIMEOUT_SETTING.get(clusterService.state().metadata().settings()); - } + failedStep.getKey()), new ClusterStateUpdateTask( + LifecycleSettings.LIFECYCLE_STEP_MASTER_TIMEOUT_SETTING.get(clusterService.state().metadata().settings())) { @Override public ClusterState execute(ClusterState currentState) { diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java index f0bffcb8cb715..f0514c43ce3d8 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java @@ -218,6 +218,7 @@ public void testRunPolicyErrorStepOnRetryableFailedStep() { PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, waitForRolloverStep); ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); IndexLifecycleRunner runner = new 
IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L); LifecycleExecutionState.Builder newState = LifecycleExecutionState.builder(); newState.setFailedStep(stepKey.getName());