Skip to content

Commit 990aa34

Browse files
Prevent Duplicate ILM Cluster State Updates from Being Created (#78390)
Prevent duplicate ILM tasks from being enqueued to fix the most immediate issues around #78246. The ILM logic should be further improved though. I did not include `MoveToErrorStepUpdateTask` in this change yet as I wasn't entirely sure how valid/safe hashing/comparing arbitrary `Exception`s would be. That could be looked into in a follow-up as well. Relates #77466 Closes #78246
1 parent 0a22326 commit 990aa34

File tree

12 files changed

+175
-60
lines changed

12 files changed

+175
-60
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/BranchingStep.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ public class BranchingStep extends ClusterStateActionStep {
2727

2828
private static final Logger logger = LogManager.getLogger(BranchingStep.class);
2929

30-
private StepKey nextStepKeyOnFalse;
31-
private StepKey nextStepKeyOnTrue;
32-
private BiPredicate<Index, ClusterState> predicate;
33-
private SetOnce<Boolean> predicateValue;
30+
private final StepKey nextStepKeyOnFalse;
31+
private final StepKey nextStepKeyOnTrue;
32+
private final BiPredicate<Index, ClusterState> predicate;
33+
private final SetOnce<Boolean> predicateValue;
3434

3535
/**
3636
* {@link BranchingStep} is a step whose next step is based on

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
12-
import org.elasticsearch.ElasticsearchException;
12+
import org.apache.logging.log4j.message.ParameterizedMessage;
1313
import org.elasticsearch.cluster.ClusterState;
1414
import org.elasticsearch.cluster.ClusterStateUpdateTask;
1515
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -24,9 +24,10 @@
2424
import org.elasticsearch.xpack.core.ilm.TerminalPolicyStep;
2525

2626
import java.io.IOException;
27+
import java.util.Objects;
2728
import java.util.function.LongSupplier;
2829

29-
public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
30+
public class ExecuteStepsUpdateTask extends IndexLifecycleClusterStateUpdateTask {
3031
private static final Logger logger = LogManager.getLogger(ExecuteStepsUpdateTask.class);
3132
private final String policy;
3233
private final Index index;
@@ -175,7 +176,7 @@ public ClusterState execute(final ClusterState currentState) throws IOException
175176
}
176177

177178
@Override
178-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
179+
public void onClusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
179180
if (oldState.equals(newState) == false) {
180181
IndexMetadata indexMetadata = newState.metadata().index(index);
181182
if (indexMetadata != null) {
@@ -200,15 +201,28 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
200201
}
201202

202203
@Override
203-
public void onFailure(String source, Exception e) {
204-
throw new ElasticsearchException(
205-
"policy [" + policy + "] for index [" + index.getName() + "] failed on step [" + startStep.getKey() + "].", e);
204+
public void handleFailure(String source, Exception e) {
205+
logger.warn(new ParameterizedMessage("policy [{}] for index [{}] failed on step [{}].", policy, index, startStep.getKey()), e);
206206
}
207207

208208
private ClusterState moveToErrorStep(final ClusterState state, Step.StepKey currentStepKey, Exception cause) throws IOException {
209209
this.failure = cause;
210-
logger.error("policy [{}] for index [{}] failed on cluster state step [{}]. Moving to ERROR step", policy, index.getName(),
210+
logger.warn("policy [{}] for index [{}] failed on cluster state step [{}]. Moving to ERROR step", policy, index.getName(),
211211
currentStepKey);
212212
return IndexLifecycleTransition.moveClusterStateToErrorStep(index, state, cause, nowSupplier, policyStepsRegistry::getStep);
213213
}
214+
215+
@Override
216+
public boolean equals(Object o) {
217+
if (this == o) return true;
218+
if (o == null || getClass() != o.getClass()) return false;
219+
ExecuteStepsUpdateTask that = (ExecuteStepsUpdateTask) o;
220+
return policy.equals(that.policy) && index.equals(that.index)
221+
&& Objects.equals(startStep, that.startStep);
222+
}
223+
224+
@Override
225+
public int hashCode() {
226+
return Objects.hash(policy, index, startStep);
227+
}
214228
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ilm;
9+
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
13+
import org.elasticsearch.common.util.concurrent.ListenableFuture;
14+
15+
/**
16+
* Base class for index lifecycle cluster state update tasks that requires implementing {@code equals} and {@code hashCode} to allow
17+
* for these tasks to be deduplicated by {@link IndexLifecycleRunner}.
18+
*/
19+
public abstract class IndexLifecycleClusterStateUpdateTask extends ClusterStateUpdateTask {
20+
21+
private final ListenableFuture<Void> listener = new ListenableFuture<>();
22+
23+
@Override
24+
public final void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
25+
listener.onResponse(null);
26+
onClusterStateProcessed(source, oldState, newState);
27+
}
28+
29+
@Override
30+
public final void onFailure(String source, Exception e) {
31+
listener.onFailure(e);
32+
handleFailure(source, e);
33+
}
34+
35+
/**
36+
* Add a listener that is resolved once this update has been processed or failed and before either the
37+
* {@link #onClusterStateProcessed(String, ClusterState, ClusterState)} or the {@link #handleFailure(String, Exception)} hooks are
38+
* executed.
39+
*/
40+
public final void addListener(ActionListener<Void> listener) {
41+
this.listener.addListener(listener);
42+
}
43+
44+
/**
45+
* This method is functionally the same as {@link ClusterStateUpdateTask#clusterStateProcessed(String, ClusterState, ClusterState)}
46+
* and implementations can override it as they would override {@code ClusterStateUpdateTask#clusterStateProcessed}.
47+
*/
48+
protected void onClusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
49+
}
50+
51+
@Override
52+
public abstract boolean equals(Object other);
53+
54+
@Override
55+
public abstract int hashCode();
56+
57+
/**
58+
* This method is functionally the same as {@link ClusterStateUpdateTask#onFailure(String, Exception)} and implementations can override
59+
* it as they would override {@code ClusterStateUpdateTask#onFailure}.
60+
*/
61+
protected abstract void handleFailure(String source, Exception e);
62+
}

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.cluster.ClusterState;
1414
import org.elasticsearch.cluster.ClusterStateObserver;
15+
import org.elasticsearch.cluster.ClusterStateTaskConfig;
1516
import org.elasticsearch.cluster.ClusterStateUpdateTask;
1617
import org.elasticsearch.cluster.metadata.IndexMetadata;
1718
import org.elasticsearch.cluster.metadata.Metadata;
@@ -35,7 +36,10 @@
3536
import org.elasticsearch.xpack.ilm.history.ILMHistoryItem;
3637
import org.elasticsearch.xpack.ilm.history.ILMHistoryStore;
3738

39+
import java.util.Collections;
40+
import java.util.HashSet;
3841
import java.util.Locale;
42+
import java.util.Set;
3943
import java.util.function.LongSupplier;
4044

4145
import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_ORIGINATION_DATE;
@@ -374,7 +378,7 @@ void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) {
374378
}
375379
} else if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
376380
logger.debug("[{}] running policy with current-step [{}]", indexMetadata.getIndex().getName(), currentStep.getKey());
377-
clusterService.submitStateUpdateTask(String.format(Locale.ROOT, "ilm-execute-cluster-state-steps [%s]", currentStep),
381+
submitUnlessAlreadyQueued(String.format(Locale.ROOT, "ilm-execute-cluster-state-steps [%s]", currentStep),
378382
new ExecuteStepsUpdateTask(policy, indexMetadata.getIndex(), currentStep, stepRegistry, this, nowSupplier));
379383
} else {
380384
logger.trace("[{}] ignoring step execution from cluster state change event [{}]", index, currentStep.getKey());
@@ -387,7 +391,7 @@ void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) {
387391
*/
388392
private void moveToStep(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey newStepKey) {
389393
logger.debug("[{}] moving to step [{}] {} -> {}", index.getName(), policy, currentStepKey, newStepKey);
390-
clusterService.submitStateUpdateTask(
394+
submitUnlessAlreadyQueued(
391395
String.format(Locale.ROOT, "ilm-move-to-step {policy [%s], index [%s], currentStep [%s], nextStep [%s]}", policy,
392396
index.getName(), currentStepKey, newStepKey),
393397
new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, clusterState ->
@@ -420,7 +424,7 @@ private void moveToErrorStep(Index index, String policy, Step.StepKey currentSte
420424
* changing other execution state.
421425
*/
422426
private void setStepInfo(Index index, String policy, @Nullable Step.StepKey currentStepKey, ToXContentObject stepInfo) {
423-
clusterService.submitStateUpdateTask(
427+
submitUnlessAlreadyQueued(
424428
String.format(Locale.ROOT, "ilm-set-step-info {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(),
425429
currentStepKey),
426430
new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo));
@@ -504,4 +508,27 @@ void registerFailedOperation(IndexMetadata indexMetadata, Exception failure) {
504508
LifecycleExecutionState.fromIndexMetadata(indexMetadata),
505509
failure));
506510
}
511+
512+
private final Set<IndexLifecycleClusterStateUpdateTask> executingTasks = Collections.synchronizedSet(new HashSet<>());
513+
514+
/**
515+
* Tracks already executing {@link IndexLifecycleClusterStateUpdateTask} tasks in {@link #executingTasks} to prevent queueing up
516+
* duplicate cluster state updates.
517+
* TODO: refactor ILM logic so that this is not required any longer. It is unreasonably expensive to only filter out duplicate tasks at
518+
* this point given how these tasks are mostly set up on the cluster state applier thread.
519+
*
520+
* @param source source string as used in {@link ClusterService#submitStateUpdateTask(String, ClusterStateTaskConfig)}
521+
* @param task task to submit unless already tracked in {@link #executingTasks}.
522+
*/
523+
private void submitUnlessAlreadyQueued(String source, IndexLifecycleClusterStateUpdateTask task) {
524+
if (executingTasks.add(task)) {
525+
task.addListener(ActionListener.wrap(() -> {
526+
final boolean removed = executingTasks.remove(task);
527+
assert removed : "tried to unregister unknown task [" + task + "]";
528+
}));
529+
clusterService.submitStateUpdateTask(source, task);
530+
} else {
531+
logger.trace("skipped redundant execution of [{}]", source);
532+
}
533+
}
507534
}

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ public class IndexLifecycleService
7777
private final PolicyStepsRegistry policyRegistry;
7878
private final IndexLifecycleRunner lifecycleRunner;
7979
private final Settings settings;
80-
private ClusterService clusterService;
81-
private LongSupplier nowSupplier;
80+
private final ClusterService clusterService;
81+
private final LongSupplier nowSupplier;
8282
private SchedulerEngine.Job scheduledJob;
8383

8484
public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool, Clock clock,

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/MoveToNextStepUpdateTask.java

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,20 @@
88

99
import org.apache.logging.log4j.LogManager;
1010
import org.apache.logging.log4j.Logger;
11-
import org.elasticsearch.ElasticsearchException;
11+
import org.apache.logging.log4j.message.ParameterizedMessage;
1212
import org.elasticsearch.cluster.ClusterState;
13-
import org.elasticsearch.cluster.ClusterStateUpdateTask;
1413
import org.elasticsearch.cluster.metadata.IndexMetadata;
1514
import org.elasticsearch.common.settings.Settings;
1615
import org.elasticsearch.index.Index;
1716
import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState;
1817
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
1918
import org.elasticsearch.xpack.core.ilm.Step;
2019

20+
import java.util.Objects;
2121
import java.util.function.Consumer;
2222
import java.util.function.LongSupplier;
2323

24-
public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask {
24+
public class MoveToNextStepUpdateTask extends IndexLifecycleClusterStateUpdateTask {
2525
private static final Logger logger = LogManager.getLogger(MoveToNextStepUpdateTask.class);
2626

2727
private final Index index;
@@ -44,22 +44,6 @@ public MoveToNextStepUpdateTask(Index index, String policy, Step.StepKey current
4444
this.stateChangeConsumer = stateChangeConsumer;
4545
}
4646

47-
Index getIndex() {
48-
return index;
49-
}
50-
51-
String getPolicy() {
52-
return policy;
53-
}
54-
55-
Step.StepKey getCurrentStepKey() {
56-
return currentStepKey;
57-
}
58-
59-
Step.StepKey getNextStepKey() {
60-
return nextStepKey;
61-
}
62-
6347
@Override
6448
public ClusterState execute(ClusterState currentState) {
6549
IndexMetadata indexMetadata = currentState.getMetadata().index(index);
@@ -82,15 +66,36 @@ public ClusterState execute(ClusterState currentState) {
8266
}
8367

8468
@Override
85-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
69+
public void onClusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
8670
if (oldState.equals(newState) == false) {
8771
stateChangeConsumer.accept(newState);
8872
}
8973
}
9074

9175
@Override
92-
public void onFailure(String source, Exception e) {
93-
throw new ElasticsearchException("policy [" + policy + "] for index [" + index.getName() + "] failed trying to move from step ["
94-
+ currentStepKey + "] to step [" + nextStepKey + "].", e);
76+
public boolean equals(Object o) {
77+
if (this == o) return true;
78+
if (o == null || getClass() != o.getClass()) return false;
79+
MoveToNextStepUpdateTask that = (MoveToNextStepUpdateTask) o;
80+
return index.equals(that.index)
81+
&& policy.equals(that.policy)
82+
&& currentStepKey.equals(that.currentStepKey)
83+
&& nextStepKey.equals(that.nextStepKey);
84+
}
85+
86+
@Override
87+
public int hashCode() {
88+
return Objects.hash(index, policy, currentStepKey, nextStepKey);
89+
}
90+
91+
@Override
92+
public void handleFailure(String source, Exception e) {
93+
logger.warn(
94+
new ParameterizedMessage(
95+
"policy [{}] for index [{}] failed trying to move from step [{}] to step [{}].",
96+
policy, index, currentStepKey, nextStepKey
97+
),
98+
e
99+
);
95100
}
96101
}

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/SetStepInfoUpdateTask.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.apache.logging.log4j.message.ParameterizedMessage;
1313
import org.elasticsearch.ElasticsearchException;
1414
import org.elasticsearch.cluster.ClusterState;
15-
import org.elasticsearch.cluster.ClusterStateUpdateTask;
1615
import org.elasticsearch.cluster.metadata.IndexMetadata;
1716
import org.elasticsearch.common.settings.Settings;
1817
import org.elasticsearch.common.xcontent.ToXContentObject;
@@ -25,7 +24,7 @@
2524
import java.io.IOException;
2625
import java.util.Objects;
2726

28-
public class SetStepInfoUpdateTask extends ClusterStateUpdateTask {
27+
public class SetStepInfoUpdateTask extends IndexLifecycleClusterStateUpdateTask {
2928

3029
private static final Logger logger = LogManager.getLogger(SetStepInfoUpdateTask.class);
3130

@@ -78,9 +77,28 @@ public ClusterState execute(ClusterState currentState) throws IOException {
7877
}
7978

8079
@Override
81-
public void onFailure(String source, Exception e) {
82-
logger.warn(new ParameterizedMessage("policy [{}] for index [{}] failed trying to set step info for step [{}].",
83-
policy, index.getName(), currentStepKey), e);
80+
public void handleFailure(String source, Exception e) {
81+
logger.warn(
82+
new ParameterizedMessage(
83+
"policy [{}] for index [{}] failed trying to set step info for step [{}].",
84+
policy, index, currentStepKey
85+
),
86+
e
87+
);
88+
}
89+
90+
@Override
91+
public boolean equals(Object o) {
92+
if (this == o) return true;
93+
if (o == null || getClass() != o.getClass()) return false;
94+
SetStepInfoUpdateTask that = (SetStepInfoUpdateTask) o;
95+
return index.equals(that.index) && policy.equals(that.policy)
96+
&& currentStepKey.equals(that.currentStepKey) && Objects.equals(stepInfo, that.stepInfo);
97+
}
98+
99+
@Override
100+
public int hashCode() {
101+
return Objects.hash(index, policy, currentStepKey, stepInfo);
84102
}
85103

86104
public static class ExceptionWrapper implements ToXContentObject {

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848

4949
public class SnapshotLifecycleTask implements SchedulerEngine.Listener {
5050

51-
private static Logger logger = LogManager.getLogger(SnapshotLifecycleTask.class);
51+
private static final Logger logger = LogManager.getLogger(SnapshotLifecycleTask.class);
5252

5353
private final Client client;
5454
private final ClusterService clusterService;

x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTaskTests.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.ilm;
99

10-
import org.elasticsearch.ElasticsearchException;
1110
import org.elasticsearch.Version;
1211
import org.elasticsearch.client.Client;
1312
import org.elasticsearch.cluster.ClusterName;
@@ -250,11 +249,7 @@ public void testOnFailure() throws IOException {
250249
long now = randomNonNegativeLong();
251250
ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now);
252251
Exception expectedException = new RuntimeException();
253-
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
254-
() -> task.onFailure(randomAlphaOfLength(10), expectedException));
255-
assertEquals("policy [" + mixedPolicyName + "] for index [" + index.getName() + "] failed on step [" + startStep.getKey() + "].",
256-
exception.getMessage());
257-
assertSame(expectedException, exception.getCause());
252+
task.onFailure(randomAlphaOfLength(10), expectedException);
258253
}
259254

260255
public void testClusterActionStepThrowsException() throws IOException {

0 commit comments

Comments
 (0)