From 1666cced8a119d002d15df31db10e46b97aed860 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 24 Sep 2018 10:33:04 -0600 Subject: [PATCH 1/6] Change step execution flow to be deliberate about type This commit changes the way that step execution flows. Rather than have any step run when the cluster state changes or the periodic scheduler fires, this now runs the different types of steps at different times. `AsyncWaitStep` is run at a periodic manner, ie, every 10 minutes by default `ClusterStateActionStep` and `ClusterStateWaitStep` are run every time the cluster state changes. `AsyncActionStep` is now run only after the cluster state has been transitioned into a new step. This prevents these non-idempotent steps from running at the same time. It addition to being run when transitioned into, this is also run when a node is newly elected master (only if set as the current step) so that master failover does not fail to run the step. This also changes the `RolloverStep` from an `AsyncActionStep` to an `AsyncWaitStep` so that it can run periodically. Relates to #29823 --- .../core/indexlifecycle/AsyncWaitStep.java | 4 +- .../core/indexlifecycle/LifecyclePolicy.java | 4 +- .../core/indexlifecycle/RolloverStep.java | 19 +- .../core/indexlifecycle/SegmentCountStep.java | 16 +- .../indexlifecycle/RolloverStepTests.java | 18 +- .../indexlifecycle/SegmentCountStepTests.java | 18 +- .../ExecuteStepsUpdateTask.java | 21 +- .../indexlifecycle/IndexLifecycleRunner.java | 183 ++++--- .../indexlifecycle/IndexLifecycleService.java | 26 +- .../MoveToNextStepUpdateTask.java | 17 +- .../indexlifecycle/PolicyStepsRegistry.java | 8 +- .../ExecuteStepsUpdateTaskTests.java | 13 +- .../IndexLifecycleRunnerTests.java | 505 ++++++++++++------ .../IndexLifecycleServiceTests.java | 2 +- .../MoveToNextStepUpdateTaskTests.java | 18 +- 15 files changed, 594 insertions(+), 278 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncWaitStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncWaitStep.java index 5f2e2793e0580..f6c968cfae41a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncWaitStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncWaitStep.java @@ -6,8 +6,8 @@ package org.elasticsearch.xpack.core.indexlifecycle; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.index.Index; public abstract class AsyncWaitStep extends Step { @@ -22,7 +22,7 @@ protected Client getClient() { return client; } - public abstract void evaluateCondition(Index index, Listener listener); + public abstract void evaluateCondition(IndexMetaData indexMetaData, Listener listener); public interface Listener { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java index 1b55e82ea0057..8fd9098c3e3e0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java @@ -219,9 +219,9 @@ public List toSteps(Client client) { steps.add(new InitializePolicyContextStep(InitializePolicyContextStep.KEY, lastStepKey)); Collections.reverse(steps); - logger.debug("STEP COUNT: " + steps.size()); + logger.trace("STEP COUNT: " + steps.size()); for (Step step : steps) { - logger.debug(step.getKey() + " -> " + step.getNextStepKey()); + logger.trace(step.getKey() + " -> " + step.getNextStepKey()); } return steps; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java index c4aa7d079a7fe..17f9223c4bc5b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java @@ -8,16 +8,18 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import java.io.IOException; import java.util.Locale; import java.util.Objects; -public class RolloverStep extends AsyncActionStep { +public class RolloverStep extends AsyncWaitStep { public static final String NAME = "attempt_rollover"; private ByteSizeValue maxSize; @@ -33,7 +35,7 @@ public RolloverStep(StepKey key, StepKey nextStepKey, Client client, ByteSizeVal } @Override - public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) { + public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) { String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetaData.getSettings()); if (Strings.isNullOrEmpty(rolloverAlias)) { @@ -54,7 +56,7 @@ public void performAction(IndexMetaData indexMetaData, ClusterState currentState rolloverRequest.addMaxIndexDocsCondition(maxDocs); } getClient().admin().indices().rolloverIndex(rolloverRequest, - ActionListener.wrap(response -> listener.onResponse(response.isRolledOver()), listener::onFailure)); + ActionListener.wrap(response -> listener.onResponse(response.isRolledOver(), new EmptyInfo()), listener::onFailure)); } ByteSizeValue getMaxSize() { @@ -89,4 +91,13 @@ public boolean equals(Object obj) { Objects.equals(maxDocs, other.maxDocs); } + // TODO: expand the information we provide? + private class EmptyInfo implements ToXContentObject { + private EmptyInfo() {} + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder; + } + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStep.java index f775022cf85f0..0d706dca10445 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStep.java @@ -8,12 +8,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.Index; import java.io.IOException; import java.util.Arrays; @@ -38,12 +38,14 @@ public int getMaxNumSegments() { } @Override - public void evaluateCondition(Index index, Listener listener) { - getClient().admin().indices().segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> { - long numberShardsLeftToMerge = StreamSupport.stream(response.getIndices().get(index.getName()).spliterator(), false) - .filter(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> p.getSegments().size() > maxNumSegments)).count(); - listener.onResponse(numberShardsLeftToMerge == 0, new Info(numberShardsLeftToMerge)); - }, listener::onFailure)); + public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) { + getClient().admin().indices().segments(new IndicesSegmentsRequest(indexMetaData.getIndex().getName()), + ActionListener.wrap(response -> { + long numberShardsLeftToMerge = + StreamSupport.stream(response.getIndices().get(indexMetaData.getIndex().getName()).spliterator(), false) + .filter(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> p.getSegments().size() > maxNumSegments)).count(); + listener.onResponse(numberShardsLeftToMerge == 0, new Info(numberShardsLeftToMerge)); + }, listener::onFailure)); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStepTests.java index c2445c57db24b..a35daa1a01d7c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStepTests.java @@ -21,7 +21,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep.Listener; +import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey; import org.junit.Before; import org.mockito.Mockito; @@ -148,10 +148,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable { }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any()); SetOnce actionCompleted = new SetOnce<>(); - step.performAction(indexMetaData, null, new Listener() { + step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() { @Override - public void onResponse(boolean complete) { + public void onResponse(boolean complete, ToXContentObject obj) { actionCompleted.set(complete); } @@ -205,10 +205,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable { }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any()); SetOnce actionCompleted = new SetOnce<>(); - step.performAction(indexMetaData, null, new Listener() { + step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() { @Override - public void onResponse(boolean complete) { + public void onResponse(boolean complete, ToXContentObject obj) { actionCompleted.set(complete); } @@ -263,10 +263,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable { }).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any()); SetOnce exceptionThrown = new SetOnce<>(); - step.performAction(indexMetaData, null, new Listener() { + step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() { @Override - public void onResponse(boolean complete) { + public void onResponse(boolean complete, ToXContentObject obj) { throw new AssertionError("Unexpected method call"); } @@ -292,9 +292,9 @@ public void testPerformActionInvalidNullOrEmptyAlias() { RolloverStep step = createRandomInstance(); SetOnce exceptionThrown = new SetOnce<>(); - step.performAction(indexMetaData, null, new Listener() { + step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() { @Override - public void onResponse(boolean complete) { + public void onResponse(boolean complete, ToXContentObject obj) { throw new AssertionError("Unexpected method call"); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStepTests.java index faa63feeed105..ae0551020fbd1 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/SegmentCountStepTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.indexlifecycle; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.segments.IndexSegments; import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; @@ -14,6 +15,8 @@ import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.Segment; @@ -41,6 +44,15 @@ public SegmentCountStep createRandomInstance() { return new SegmentCountStep(stepKey, nextStepKey, null, maxNumSegments); } + private IndexMetaData makeMeta(Index index) { + return IndexMetaData.builder(index.getName()) + .settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .build(); + } + @Override public SegmentCountStep mutateInstance(SegmentCountStep instance) { StepKey key = instance.getKey(); @@ -109,7 +121,7 @@ public void testIsConditionMet() { SetOnce conditionInfo = new SetOnce<>(); SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments); - step.evaluateCondition(index, new AsyncWaitStep.Listener() { + step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject info) { conditionMetResult.set(conditionMet); @@ -166,7 +178,7 @@ public void testIsConditionFails() { SetOnce conditionInfo = new SetOnce<>(); SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments); - step.evaluateCondition(index, new AsyncWaitStep.Listener() { + step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject info) { conditionMetResult.set(conditionMet); @@ -206,7 +218,7 @@ public void testThrowsException() { SetOnce exceptionThrown = new SetOnce<>(); SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments); - step.evaluateCondition(index, new AsyncWaitStep.Listener() { + step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() { @Override public void onResponse(boolean conditionMet, ToXContentObject info) { throw new AssertionError("unexpected method call"); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java index a3ca355135e2c..ec5f6ea20c13b 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState; import org.elasticsearch.xpack.core.indexlifecycle.Step; +import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep; import java.io.IOException; import java.util.function.LongSupplier; @@ -28,15 +29,18 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask { private final Index index; private final Step startStep; private final PolicyStepsRegistry policyStepsRegistry; + private final IndexLifecycleRunner lifecycleRunner; private LongSupplier nowSupplier; + private Step.StepKey nextStepKey; public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry, - LongSupplier nowSupplier) { + IndexLifecycleRunner lifecycleRunner, LongSupplier nowSupplier) { this.policy = policy; this.index = index; this.startStep = startStep; this.policyStepsRegistry = policyStepsRegistry; this.nowSupplier = nowSupplier; + this.lifecycleRunner = lifecycleRunner; } String getPolicy() { @@ -88,6 +92,7 @@ public ClusterState execute(ClusterState currentState) throws IOException { if (currentStep.getNextStepKey() == null) { return currentState; } + nextStepKey = currentStep.getNextStepKey(); currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getKey(), currentStep.getNextStepKey(), nowSupplier); } else { @@ -104,6 +109,7 @@ public ClusterState execute(ClusterState currentState) throws IOException { if (currentStep.getNextStepKey() == null) { return currentState; } + nextStepKey = currentStep.getNextStepKey(); currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getKey(), currentStep.getNextStepKey(), nowSupplier); } else { @@ -130,6 +136,19 @@ public ClusterState execute(ClusterState currentState) throws IOException { } } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (oldState.equals(newState) == false) { + IndexMetaData indexMetaData = newState.metaData().index(index); + if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY && indexMetaData != null) { + // After the cluster state has been processed and we have moved + // to a new step, we need to conditionally execute the step iff + // it is an `AsyncAction` so that it is executed exactly once. + lifecycleRunner.maybeRunAsyncAction(newState, indexMetaData, policy, nextStepKey); + } + } + } + @Override public void onFailure(String source, Exception e) { throw new ElasticsearchException( diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java index 3c868332f5f95..694e42cdba4f7 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java @@ -82,99 +82,146 @@ boolean isReadyToTransitionToThisPhase(final String policy, final IndexMetaData return now >= lifecycleDate + after.getMillis(); } - public void runPolicy(String policy, IndexMetaData indexMetaData, ClusterState currentState, - boolean fromClusterStateChange) { + /** + * Run the current step, only if it is an asynchronous wait step. These + * wait criteria are checked periodically from the ILM scheduler + */ + public void runPeriodicStep(String policy, IndexMetaData indexMetaData) { Settings indexSettings = indexMetaData.getSettings(); + String index = indexMetaData.getIndex().getName(); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData); if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexSettings)) { - logger.info("skipping policy [" + policy + "] for index [" + indexMetaData.getIndex().getName() + "]." - + LifecycleSettings.LIFECYCLE_SKIP + "== true"); + logger.info("skipping policy [{}] for index [{}]: {} == true", policy, index, LifecycleSettings.LIFECYCLE_SKIP); return; } Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, lifecycleState); if (currentStep == null) { - // This may happen in the case that there is invalid ilm-step index settings or the stepRegistry is out of - // sync with the current cluster state - logger.warn("current step [" + getCurrentStepKey(lifecycleState) + "] for index [" + indexMetaData.getIndex().getName() - + "] with policy [" + policy + "] is not recognized"); + logger.warn("current step [{}] for index [{}] with policy [{}] is not recognized", + getCurrentStepKey(lifecycleState), index, policy); return; } - logger.debug("running policy with current-step [" + currentStep.getKey() + "]"); + if (currentStep instanceof TerminalPolicyStep) { - logger.debug("policy [" + policy + "] for index [" + indexMetaData.getIndex().getName() + "] complete, skipping execution"); + logger.debug("policy [{}] for index [{}] complete, skipping execution", policy, index); return; } else if (currentStep instanceof ErrorStep) { - logger.debug( - "policy [" + policy + "] for index [" + indexMetaData.getIndex().getName() + "] on an error step, skipping execution"); + logger.debug("policy [{}] for index [{}] on an error step, skipping execution", policy, index); return; - } else if (currentStep instanceof PhaseCompleteStep) { + } + + // Only phase changing and async wait steps should be run through periodic polling + if (currentStep instanceof PhaseCompleteStep) { // Only proceed to the next step if enough time has elapsed to go into the next phase if (isReadyToTransitionToThisPhase(policy, indexMetaData, currentStep.getNextStepKey().getPhase())) { moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); } - return; - } - - if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { - executeClusterStateSteps(indexMetaData.getIndex(), policy, currentStep); } else if (currentStep instanceof AsyncWaitStep) { - if (fromClusterStateChange == false) { - ((AsyncWaitStep) currentStep).evaluateCondition(indexMetaData.getIndex(), new AsyncWaitStep.Listener() { - - @Override - public void onResponse(boolean conditionMet, ToXContentObject stepInfo) { - logger.debug("cs-change-async-wait-callback. current-step:" + currentStep.getKey()); - if (conditionMet) { - moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); - } else if (stepInfo != null) { - setStepInfo(indexMetaData.getIndex(), policy, currentStep.getKey(), stepInfo); - } + logger.debug("running periodic policy with current-step [{}]", currentStep.getKey()); + ((AsyncWaitStep) currentStep).evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() { + + @Override + public void onResponse(boolean conditionMet, ToXContentObject stepInfo) { + logger.debug("cs-change-async-wait-callback, current-step: " + currentStep.getKey()); + if (conditionMet) { + moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); + } else if (stepInfo != null) { + setStepInfo(indexMetaData.getIndex(), policy, currentStep.getKey(), stepInfo); } + } - @Override - public void onFailure(Exception e) { - moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e); - } + @Override + public void onFailure(Exception e) { + moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e); + } + }); + } else { + logger.trace("ignoring non periodic step execution from step transition [{}]", currentStep.getKey()); + } + } - }); - } - } else if (currentStep instanceof AsyncActionStep) { - if (fromClusterStateChange == false) { - ((AsyncActionStep) currentStep).performAction(indexMetaData, currentState, new AsyncActionStep.Listener() { - - @Override - public void onResponse(boolean complete) { - logger.debug("cs-change-async-action-callback. current-step:" + currentStep.getKey()); - if (complete && ((AsyncActionStep) currentStep).indexSurvives()) { - moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); - } - } + /** + * If the current step (matching the expected step key) is an asynchronous action step, run it + */ + public void maybeRunAsyncAction(ClusterState currentState, IndexMetaData indexMetaData, String policy, StepKey expectedStepKey) { + Settings indexSettings = indexMetaData.getSettings(); + String index = indexMetaData.getIndex().getName(); + LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData); + if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexSettings)) { + logger.info("skipping policy [{}] for index [{}]: {} == true", policy, index, LifecycleSettings.LIFECYCLE_SKIP); + return; + } + Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, lifecycleState); + if (currentStep == null) { + logger.warn("current step [{}] for index [{}] with policy [{}] is not recognized", + getCurrentStepKey(lifecycleState), index, policy); + return; + } - @Override - public void onFailure(Exception e) { - moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e); + if (currentStep.getKey().equals(expectedStepKey) == false) { + throw new IllegalStateException("expected index [" + indexMetaData.getIndex().getName() + "] with policy [" + policy + + "] to have current step consistent with provided step key (" + expectedStepKey + ") but it was " + currentStep.getKey()); + } + if (currentStep instanceof AsyncActionStep) { + logger.debug("running policy with async action step [{}]", currentStep.getKey()); + ((AsyncActionStep) currentStep).performAction(indexMetaData, currentState, new AsyncActionStep.Listener() { + + @Override + public void onResponse(boolean complete) { + logger.debug("cs-change-async-action-callback, current-step: [{}]", currentStep.getKey()); + if (complete && ((AsyncActionStep) currentStep).indexSurvives()) { + moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); } - }); - } + } + + @Override + public void onFailure(Exception e) { + moveToErrorStep(indexMetaData.getIndex(), policy, currentStep.getKey(), e); + } + }); } else { - throw new IllegalStateException( - "Step with key [" + currentStep.getKey() + "] is not a recognised type: [" + currentStep.getClass().getName() + "]"); + logger.trace("ignoring non async action step execution from step transition [{}]", currentStep.getKey()); } } - private void runPolicy(IndexMetaData indexMetaData, ClusterState currentState) { - if (indexMetaData == null) { - // This index doesn't exist any more, there's nothing to execute + /** + * Run the current step that either waits for index age, or updates/waits-on cluster state. + * Invoked after the cluster state has been changed + */ + public void runPolicyAfterStateChange(String policy, IndexMetaData indexMetaData) { + Settings indexSettings = indexMetaData.getSettings(); + String index = indexMetaData.getIndex().getName(); + LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(indexMetaData); + if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexSettings)) { + logger.info("skipping policy [{}] for index [{}]: {} == true", policy, index, LifecycleSettings.LIFECYCLE_SKIP); + return; + } + Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, lifecycleState); + if (currentStep == null) { + logger.warn("current step [{}] for index [{}] with policy [{}] is not recognized", + getCurrentStepKey(lifecycleState), index, policy); + return; + } + + if (currentStep instanceof TerminalPolicyStep) { + logger.debug("policy [{}] for index [{}] complete, skipping execution", policy, index); + return; + } else if (currentStep instanceof ErrorStep) { + logger.debug("policy [{}] for index [{}] on an error step, skipping execution", policy, index); return; } - Settings indexSettings = indexMetaData.getSettings(); - String policy = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings); - runPolicy(policy, indexMetaData, currentState, false); - } - private void executeClusterStateSteps(Index index, String policy, Step step) { - assert step instanceof ClusterStateActionStep || step instanceof ClusterStateWaitStep; - clusterService.submitStateUpdateTask("ILM", new ExecuteStepsUpdateTask(policy, index, step, stepRegistry, nowSupplier)); + if (currentStep instanceof PhaseCompleteStep) { + // Only proceed to the next step if enough time has elapsed to go into the next phase + if (isReadyToTransitionToThisPhase(policy, indexMetaData, currentStep.getNextStepKey().getPhase())) { + moveToStep(indexMetaData.getIndex(), policy, currentStep.getKey(), currentStep.getNextStepKey()); + } + } else if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { + logger.debug("[{}] running policy with current-step [{}]", indexMetaData.getIndex().getName(), currentStep.getKey()); + clusterService.submitStateUpdateTask("ILM", + new ExecuteStepsUpdateTask(policy, indexMetaData.getIndex(), currentStep, stepRegistry, this, nowSupplier)); + } else { + logger.trace("ignoring step execution from cluster state change event [{}]", currentStep.getKey()); + } } /** @@ -383,8 +430,14 @@ static ClusterState addStepInfoToClusterState(Index index, ClusterState clusterS private void moveToStep(Index index, String policy, StepKey currentStepKey, StepKey nextStepKey) { logger.debug("moveToStep[" + policy + "] [" + index.getName() + "]" + currentStepKey + " -> " + nextStepKey); - clusterService.submitStateUpdateTask("ILM", new MoveToNextStepUpdateTask(index, policy, currentStepKey, - nextStepKey, nowSupplier)); + clusterService.submitStateUpdateTask("ILM", + new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, nowSupplier, clusterState -> + { + IndexMetaData indexMetaData = clusterState.metaData().index(index); + if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY && indexMetaData != null) { + maybeRunAsyncAction(clusterState, indexMetaData, policy, nextStepKey); + } + })); } private void moveToErrorStep(Index index, String policy, StepKey currentStepKey, Exception e) { diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index a96020b99584d..21cd6ad120bcd 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -89,6 +89,26 @@ public ClusterState moveClusterStateToFailedStep(ClusterState currentState, Stri public void onMaster() { this.isMaster = true; maybeScheduleJob(); + + ClusterState clusterState = clusterService.state(); + IndexLifecycleMetadata currentMetadata = clusterState.metaData().custom(IndexLifecycleMetadata.TYPE); + if (currentMetadata != null) { + OperationMode currentMode = currentMetadata.getOperationMode(); + if (OperationMode.STOPPED.equals(currentMode) || OperationMode.STOPPING.equals(currentMode)) { + return; + } + + // If we just became master, we need to kick off any async actions that + // may have not been run due to master rollover + for (ObjectCursor cursor : clusterState.metaData().indices().values()) { + IndexMetaData idxMeta = cursor.value; + String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()); + if (Strings.isNullOrEmpty(policyName) == false) { + StepKey stepKey = IndexLifecycleRunner.getCurrentStepKey(LifecycleExecutionState.fromIndexMetadata(idxMeta)); + lifecycleRunner.maybeRunAsyncAction(clusterState, idxMeta, policyName, stepKey); + } + } + } } @Override @@ -199,7 +219,11 @@ void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) + "]. stopping Index Lifecycle execution"); continue; } - lifecycleRunner.runPolicy(policyName, idxMeta, clusterState, fromClusterStateChange); + if (fromClusterStateChange) { + lifecycleRunner.runPolicyAfterStateChange(policyName, idxMeta); + } else { + lifecycleRunner.runPeriodicStep(policyName, idxMeta); + } safeToStop = false; // proven false! } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTask.java index f7aabce777810..750fd1af5da42 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTask.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.indexlifecycle; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -15,22 +17,27 @@ import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; import org.elasticsearch.xpack.core.indexlifecycle.Step; +import java.util.function.Consumer; import java.util.function.LongSupplier; public class MoveToNextStepUpdateTask extends ClusterStateUpdateTask { + private static final Logger logger = LogManager.getLogger(MoveToNextStepUpdateTask.class); + private final Index index; private final String policy; private final Step.StepKey currentStepKey; private final Step.StepKey nextStepKey; private final LongSupplier nowSupplier; + private final Consumer stateChangeConsumer; public MoveToNextStepUpdateTask(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey nextStepKey, - LongSupplier nowSupplier) { + LongSupplier nowSupplier, Consumer stateChangeConsumer) { this.index = index; this.policy = policy; this.currentStepKey = currentStepKey; this.nextStepKey = nextStepKey; this.nowSupplier = nowSupplier; + this.stateChangeConsumer = stateChangeConsumer; } Index getIndex() { @@ -60,6 +67,7 @@ public ClusterState execute(ClusterState currentState) { LifecycleExecutionState indexILMData = LifecycleExecutionState.fromIndexMetadata(currentState.getMetaData().index(index)); if (policy.equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexSettings)) && currentStepKey.equals(IndexLifecycleRunner.getCurrentStepKey(indexILMData))) { + logger.trace("moving [{}] to next step ({})", index.getName(), nextStepKey); return IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStepKey, nextStepKey, nowSupplier); } else { // either the policy has changed or the step is now @@ -69,6 +77,13 @@ public ClusterState execute(ClusterState currentState) { } } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (oldState.equals(newState) == false) { + stateChangeConsumer.accept(newState); + } + } + @Override public void onFailure(String source, Exception e) { throw new ElasticsearchException("policy [" + policy + "] for index [" + index.getName() + "] failed trying to move from step [" diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java index 89b8c9b431990..4baf7022e8d7e 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java @@ -140,7 +140,11 @@ public LifecyclePolicyMetadata read(StreamInput in, String key) { private List parseStepsFromPhase(String policy, String currentPhase, String phaseDef) throws IOException { final PhaseExecutionInfo phaseExecutionInfo; - LifecyclePolicy currentPolicy = lifecyclePolicyMap.get(policy).getPolicy(); + LifecyclePolicyMetadata policyMetadata = lifecyclePolicyMap.get(policy); + if (policyMetadata == null) { + throw new IllegalStateException("unable to parse steps for policy [" + policy + "] as it doesn't exist"); + } + LifecyclePolicy currentPolicy = policyMetadata.getPolicy(); final LifecyclePolicy policyToExecute; if (InitializePolicyContextStep.INITIALIZATION_PHASE.equals(phaseDef) || TerminalPolicyStep.COMPLETED_PHASE.equals(phaseDef)) { @@ -200,7 +204,7 @@ public Step getStep(final IndexMetaData indexMetaData, final Step.StepKey stepKe throw new ElasticsearchException("failed to load cached steps for " + stepKey, e); } catch (XContentParseException parseErr) { throw new XContentParseException(parseErr.getLocation(), - "failed to load cached steps for " + stepKey + " from [" + phaseJson + "]", parseErr); + "failed to load steps for " + stepKey + " from [" + phaseJson + "]", parseErr); } assert phaseSteps.stream().allMatch(step -> step.getKey().getPhase().equals(phase)) : diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTaskTests.java index a10afb631ebbb..42900236a520a 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTaskTests.java @@ -144,7 +144,7 @@ public void testNeverExecuteNonClusterStateStep() throws IOException { setStateToKey(thirdStepKey); Step startStep = policyStepsRegistry.getStep(indexMetaData, thirdStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); assertThat(task.execute(clusterState), sameInstance(clusterState)); } @@ -152,7 +152,7 @@ public void testExecuteUntilFirstNonClusterStateStep() throws IOException { setStateToKey(secondStepKey); Step startStep = policyStepsRegistry.getStep(indexMetaData, secondStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); ClusterState newState = task.execute(clusterState); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index)); StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState); @@ -180,7 +180,8 @@ public void testExecuteInvalidStartStep() throws IOException { Step invalidStep = new MockClusterStateActionStep(firstStepKey, secondStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(invalidPolicyName, index, invalidStep, policyStepsRegistry, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(invalidPolicyName, index, + invalidStep, policyStepsRegistry, null, () -> now); ClusterState newState = task.execute(clusterState); assertSame(newState, clusterState); } @@ -190,7 +191,7 @@ public void testExecuteIncompleteWaitStepNoInfo() throws IOException { setStateToKey(secondStepKey); Step startStep = policyStepsRegistry.getStep(indexMetaData, secondStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); ClusterState newState = task.execute(clusterState); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index)); StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState); @@ -209,7 +210,7 @@ public void testExecuteIncompleteWaitStepWithInfo() throws IOException { setStateToKey(secondStepKey); Step startStep = policyStepsRegistry.getStep(indexMetaData, secondStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); ClusterState newState = task.execute(clusterState); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index)); StepKey currentStepKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState); @@ -225,7 +226,7 @@ public void testOnFailure() throws IOException { setStateToKey(secondStepKey); Step startStep = policyStepsRegistry.getStep(indexMetaData, secondStepKey); long now = randomNonNegativeLong(); - ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, () -> now); + ExecuteStepsUpdateTask task = new ExecuteStepsUpdateTask(mixedPolicyName, index, startStep, policyStepsRegistry, null, () -> now); Exception expectedException = new RuntimeException(); ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> task.onFailure(randomAlphaOfLength(10), expectedException)); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java index 9994b56612ce8..828d1aecab96d 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunnerTests.java @@ -12,6 +12,8 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; @@ -24,7 +26,10 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.Index; +import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexlifecycle.AbstractStepTestCase; import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep; import org.elasticsearch.xpack.core.indexlifecycle.AsyncWaitStep; @@ -59,7 +64,10 @@ import java.util.Objects; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; @@ -69,14 +77,46 @@ import static org.mockito.Mockito.mock; public class IndexLifecycleRunnerTests extends ESTestCase { - private static final NamedXContentRegistry REGISTRY = new NamedXContentRegistry(new IndexLifecycle(Settings.EMPTY).getNamedXContent()); + private static final NamedXContentRegistry REGISTRY; - private PolicyStepsRegistry createOneStepPolicyStepRegistry(String policyName, Step step) { + static { + List entries = new ArrayList<>(new IndexLifecycle(Settings.EMPTY).getNamedXContent()); + REGISTRY = new NamedXContentRegistry(entries); + } + + /** A real policy steps registry where getStep can be overridden so that JSON doesn't have to be parsed */ + private class MockPolicyStepsRegistry extends PolicyStepsRegistry { + private BiFunction fn = null; + + MockPolicyStepsRegistry(SortedMap lifecyclePolicyMap, Map firstStepMap, + Map> stepMap, NamedXContentRegistry xContentRegistry, Client client) { + super(lifecyclePolicyMap, firstStepMap, stepMap, xContentRegistry, client); + } + + public void setResolver(BiFunction fn) { + this.fn = fn; + } + + @Override + public Step getStep(IndexMetaData indexMetaData, StepKey stepKey) { + if (fn == null) { + logger.info("--> retrieving step {}", stepKey); + return super.getStep(indexMetaData, stepKey); + } else { + logger.info("--> returning mock step"); + return fn.apply(indexMetaData, stepKey); + } + } + } + + private MockPolicyStepsRegistry createOneStepPolicyStepRegistry(String policyName, Step step) { return createOneStepPolicyStepRegistry(policyName, step, "test"); } - private PolicyStepsRegistry createOneStepPolicyStepRegistry(String policyName, Step step, String indexName) { - SortedMap lifecyclePolicyMap = null; // Not used in this test + private MockPolicyStepsRegistry createOneStepPolicyStepRegistry(String policyName, Step step, String indexName) { + LifecyclePolicy policy = new LifecyclePolicy(policyName, new HashMap<>()); + SortedMap lifecyclePolicyMap = new TreeMap<>(); + lifecyclePolicyMap.put(policyName, new LifecyclePolicyMetadata(policy, new HashMap<>(), 1, 1)); Map firstStepMap = new HashMap<>(); firstStepMap.put(policyName, step); Map> stepMap = new HashMap<>(); @@ -88,7 +128,9 @@ private PolicyStepsRegistry createOneStepPolicyStepRegistry(String policyName, S steps.add(step); Index index = new Index(indexName, indexName + "uuid"); indexSteps.put(index, steps); - return new PolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, NamedXContentRegistry.EMPTY, null); + Client client = mock(Client.class); + Mockito.when(client.settings()).thenReturn(Settings.EMPTY); + return new MockPolicyStepsRegistry(lifecyclePolicyMap, firstStepMap, stepMap, REGISTRY, client); } public void testRunPolicyTerminalPolicyStep() { @@ -100,7 +142,7 @@ public void testRunPolicyTerminalPolicyStep() { IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, false); + runner.runPolicyAfterStateChange(policyName, indexMetaData); Mockito.verifyZeroInteractions(clusterService); } @@ -120,201 +162,323 @@ public void testRunPolicyErrorStep() { .putCustom(ILM_CUSTOM_METADATA_KEY, newState.build().asMap()) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, false); + runner.runPolicyAfterStateChange(policyName, indexMetaData); Mockito.verifyZeroInteractions(clusterService); } - public void testRunPolicyClusterStateActionStep() { - String policyName = "cluster_state_action_policy"; + public void testRunStateChangePolicyWithNoNextStep() throws Exception { + String policyName = "foo"; StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, null); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); + ThreadPool threadPool = new TestThreadPool("name"); + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .build(); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + DiscoveryNode node = clusterService.localNode(); + IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING); + ClusterState state = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder() + .put(indexMetaData, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm)) + .nodes(DiscoveryNodes.builder() + .add(node) + .masterNodeId(node.getId()) + .localNodeId(node.getId())) + .build(); + ClusterServiceUtils.setState(clusterService, state); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, randomBoolean()); + ClusterState before = clusterService.state(); + CountDownLatch latch = new CountDownLatch(1); + step.setLatch(latch); + runner.runPolicyAfterStateChange(policyName, indexMetaData); - Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), - Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetaData.getIndex(), policyName, step))); - Mockito.verifyNoMoreInteractions(clusterService); + latch.await(5, TimeUnit.SECONDS); + ClusterState after = clusterService.state(); + + assertEquals(before, after); + assertThat(step.getExecuteCount(), equalTo(1L)); + clusterService.close(); + threadPool.shutdownNow(); } - public void testRunPolicyClusterStateWaitStep() { - String policyName = "cluster_state_action_policy"; + public void testRunStateChangePolicyWithNextStep() throws Exception { + String policyName = "foo"; StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); - MockClusterStateWaitStep step = new MockClusterStateWaitStep(stepKey, null); - step.setWillComplete(true); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); + StepKey nextStepKey = new StepKey("phase", "action", "next_cluster_state_action_step"); + MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, nextStepKey); + MockClusterStateActionStep nextStep = new MockClusterStateActionStep(nextStepKey, null); + MockPolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); + stepRegistry.setResolver((i, k) -> { + if (stepKey.equals(k)) { + return step; + } else if (nextStepKey.equals(k)) { + return nextStep; + } else { + fail("should not try to retrieve different step"); + return null; + } + }); + ThreadPool threadPool = new TestThreadPool("name"); + LifecycleExecutionState les = LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("cluster_state_action_step") + .build(); + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, les.asMap()) + .build(); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + DiscoveryNode node = clusterService.localNode(); + IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING); + ClusterState state = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder() + .put(indexMetaData, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm)) + .nodes(DiscoveryNodes.builder() + .add(node) + .masterNodeId(node.getId()) + .localNodeId(node.getId())) + .build(); + ClusterServiceUtils.setState(clusterService, state); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - - runner.runPolicy(policyName, indexMetaData, null, randomBoolean()); - - Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), - Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetaData.getIndex(), policyName, step))); - Mockito.verifyNoMoreInteractions(clusterService); - } - public void testRunPolicyAsyncActionStepCompletes() { - String policyName = "async_action_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_action_step"); - MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null); - step.setWillComplete(true); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + ClusterState before = clusterService.state(); + CountDownLatch latch = new CountDownLatch(1); + step.setLatch(latch); + runner.runPolicyAfterStateChange(policyName, indexMetaData); - runner.runPolicy(policyName, indexMetaData, null, false); + latch.await(5, TimeUnit.SECONDS); + ClusterState after = clusterService.state(); - assertEquals(1, step.getExecuteCount()); - Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), - Mockito.argThat(new MoveToNextStepUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey, null))); - Mockito.verifyNoMoreInteractions(clusterService); + assertEquals(before, after); + assertThat(step.getExecuteCount(), equalTo(1L)); + clusterService.close(); + threadPool.shutdownNow(); } - public void testRunPolicyAsyncActionStepCompletesIndexDestroyed() { - String policyName = "async_action_policy"; + public void testRunAsyncActionDoesNotRun() { + String policyName = "foo"; StepKey stepKey = new StepKey("phase", "action", "async_action_step"); MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null); - step.setWillComplete(true); - step.setIndexSurvives(false); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); + ThreadPool threadPool = new TestThreadPool("name"); + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .build(); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + DiscoveryNode node = clusterService.localNode(); + IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING); + ClusterState state = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder() + .put(indexMetaData, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm)) + .nodes(DiscoveryNodes.builder() + .add(node) + .masterNodeId(node.getId()) + .localNodeId(node.getId())) + .build(); + ClusterServiceUtils.setState(clusterService, state); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, false); + ClusterState before = clusterService.state(); + // State changes should not run AsyncAction steps + runner.runPolicyAfterStateChange(policyName, indexMetaData); - assertEquals(1, step.getExecuteCount()); - Mockito.verifyZeroInteractions(clusterService); + ClusterState after = clusterService.state(); + + assertEquals(before, after); + assertThat(step.getExecuteCount(), equalTo(0L)); + clusterService.close(); + threadPool.shutdownNow(); } - public void testRunPolicyAsyncActionStepNotComplete() { - String policyName = "async_action_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_action_step"); - MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null); - step.setWillComplete(false); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); + public void testRunStateChangePolicyWithAsyncActionNextStep() throws Exception { + String policyName = "foo"; + StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); + StepKey nextStepKey = new StepKey("phase", "action", "async_action_step"); + MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, nextStepKey); + MockAsyncActionStep nextStep = new MockAsyncActionStep(nextStepKey, null); + MockPolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); + stepRegistry.setResolver((i, k) -> { + if (stepKey.equals(k)) { + return step; + } else if (nextStepKey.equals(k)) { + return nextStep; + } else { + fail("should not try to retrieve different step"); + return null; + } + }); + ThreadPool threadPool = new TestThreadPool("name"); + LifecycleExecutionState les = LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("cluster_state_action_step") + .build(); + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, les.asMap()) + .build(); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + DiscoveryNode node = clusterService.localNode(); + IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING); + ClusterState state = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder() + .put(indexMetaData, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm)) + .nodes(DiscoveryNodes.builder() + .add(node) + .masterNodeId(node.getId()) + .localNodeId(node.getId())) + .build(); + logger.info("--> state: {}", state); + ClusterServiceUtils.setState(clusterService, state); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, false); + ClusterState before = clusterService.state(); + CountDownLatch latch = new CountDownLatch(1); + step.setLatch(latch); + runner.runPolicyAfterStateChange(policyName, indexMetaData); - assertEquals(1, step.getExecuteCount()); - Mockito.verifyZeroInteractions(clusterService); - } + // Wait for the cluster state action step + latch.await(5, TimeUnit.SECONDS); - public void testRunPolicyAsyncActionStepFails() { - String policyName = "async_action_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_action_step"); - MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null); - Exception expectedException = new RuntimeException(); - step.setException(expectedException); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + CountDownLatch asyncLatch = new CountDownLatch(1); + nextStep.setLatch(asyncLatch); - runner.runPolicy(policyName, indexMetaData, null, false); + // Wait for the async action step + asyncLatch.await(5, TimeUnit.SECONDS); + ClusterState after = clusterService.state(); - assertEquals(1, step.getExecuteCount()); - Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), - Mockito.argThat(new MoveToErrorStepUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey, expectedException))); - Mockito.verifyNoMoreInteractions(clusterService); + assertNotEquals(before, after); + assertThat(step.getExecuteCount(), equalTo(1L)); + assertThat(nextStep.getExecuteCount(), equalTo(1L)); + clusterService.close(); + threadPool.shutdownNow(); } - public void testRunPolicyAsyncActionStepClusterStateChangeIgnored() { - String policyName = "async_action_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_action_step"); - MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null); - Exception expectedException = new RuntimeException(); - step.setException(expectedException); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); + public void testRunPeriodicStep() throws Exception { + String policyName = "foo"; + StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); + StepKey nextStepKey = new StepKey("phase", "action", "async_action_step"); + MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, nextStepKey); + MockAsyncWaitStep nextStep = new MockAsyncWaitStep(nextStepKey, null); + MockPolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); + stepRegistry.setResolver((i, k) -> { + if (stepKey.equals(k)) { + return step; + } else if (nextStepKey.equals(k)) { + return nextStep; + } else { + fail("should not try to retrieve different step"); + return null; + } + }); + ThreadPool threadPool = new TestThreadPool("name"); + LifecycleExecutionState les = LifecycleExecutionState.builder() + .setPhase("phase") + .setAction("action") + .setStep("cluster_state_action_step") + .build(); + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(LifecycleSettings.LIFECYCLE_NAME, policyName)) + .putCustom(LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY, les.asMap()) + .build(); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + DiscoveryNode node = clusterService.localNode(); + IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING); + ClusterState state = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder() + .put(indexMetaData, true) + .putCustom(IndexLifecycleMetadata.TYPE, ilm)) + .nodes(DiscoveryNodes.builder() + .add(node) + .masterNodeId(node.getId()) + .localNodeId(node.getId())) + .build(); + logger.info("--> state: {}", state); + ClusterServiceUtils.setState(clusterService, state); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, true); + ClusterState before = clusterService.state(); + CountDownLatch latch = new CountDownLatch(1); + step.setLatch(latch); + runner.runPeriodicStep(policyName, indexMetaData); + latch.await(5, TimeUnit.SECONDS); - assertEquals(0, step.getExecuteCount()); - Mockito.verifyZeroInteractions(clusterService); + ClusterState after = clusterService.state(); + + assertEquals(before, after); + assertThat(step.getExecuteCount(), equalTo(1L)); + assertThat(nextStep.getExecuteCount(), equalTo(0L)); + clusterService.close(); + threadPool.shutdownNow(); } - public void testRunPolicyAsyncWaitStepCompletes() { - String policyName = "async_wait_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_wait_step"); - MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null); - step.setWillComplete(true); + public void testRunPolicyClusterStateActionStep() { + String policyName = "cluster_state_action_policy"; + StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); + MockClusterStateActionStep step = new MockClusterStateActionStep(stepKey, null); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, false); + runner.runPolicyAfterStateChange(policyName, indexMetaData); - assertEquals(1, step.getExecuteCount()); Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), - Mockito.argThat(new MoveToNextStepUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey, null))); + Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetaData.getIndex(), policyName, step))); Mockito.verifyNoMoreInteractions(clusterService); } - public void testRunPolicyAsyncWaitStepNotComplete() { - String policyName = "async_wait_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_wait_step"); - MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null); - RandomStepInfo stepInfo = new RandomStepInfo(() -> randomAlphaOfLength(10)); - step.expectedInfo(stepInfo); - step.setWillComplete(false); + public void testRunPolicyClusterStateWaitStep() { + String policyName = "cluster_state_action_policy"; + StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); + MockClusterStateWaitStep step = new MockClusterStateWaitStep(stepKey, null); + step.setWillComplete(true); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); ClusterService clusterService = mock(ClusterService.class); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, false); + runner.runPolicyAfterStateChange(policyName, indexMetaData); - assertEquals(1, step.getExecuteCount()); Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), - Mockito.argThat(new SetStepInfoUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey, stepInfo))); + Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetaData.getIndex(), policyName, step))); Mockito.verifyNoMoreInteractions(clusterService); } - public void testRunPolicyAsyncWaitStepNotCompleteNoStepInfo() { - String policyName = "async_wait_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_wait_step"); - MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null); - RandomStepInfo stepInfo = null; - step.expectedInfo(stepInfo); - step.setWillComplete(false); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - - runner.runPolicy(policyName, indexMetaData, null, false); - - assertEquals(1, step.getExecuteCount()); - Mockito.verifyZeroInteractions(clusterService); - } - - public void testRunPolicyAsyncWaitStepFails() { - String policyName = "async_wait_policy"; - StepKey stepKey = new StepKey("phase", "action", "async_wait_step"); - MockAsyncWaitStep step = new MockAsyncWaitStep(stepKey, null); + public void testRunPolicyAsyncActionStepClusterStateChangeIgnored() { + String policyName = "async_action_policy"; + StepKey stepKey = new StepKey("phase", "action", "async_action_step"); + MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null); Exception expectedException = new RuntimeException(); step.setException(expectedException); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); @@ -323,12 +487,10 @@ public void testRunPolicyAsyncWaitStepFails() { IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, false); + runner.runPolicyAfterStateChange(policyName, indexMetaData); - assertEquals(1, step.getExecuteCount()); - Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ILM"), - Mockito.argThat(new MoveToErrorStepUpdateTaskMatcher(indexMetaData.getIndex(), policyName, stepKey, expectedException))); - Mockito.verifyNoMoreInteractions(clusterService); + assertEquals(0, step.getExecuteCount()); + Mockito.verifyZeroInteractions(clusterService); } public void testRunPolicyAsyncWaitStepClusterStateChangeIgnored() { @@ -343,7 +505,7 @@ public void testRunPolicyAsyncWaitStepClusterStateChangeIgnored() { IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - runner.runPolicy(policyName, indexMetaData, null, true); + runner.runPolicyAfterStateChange(policyName, indexMetaData); assertEquals(0, step.getExecuteCount()); Mockito.verifyZeroInteractions(clusterService); @@ -357,24 +519,7 @@ public void testRunPolicyWithNoStepsInRegistry() { IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); // verify that no exception is thrown - runner.runPolicy(policyName, indexMetaData, null, randomBoolean()); - Mockito.verifyZeroInteractions(clusterService); - } - - public void testRunPolicyUnknownStepType() { - String policyName = "cluster_state_action_policy"; - StepKey stepKey = new StepKey("phase", "action", "cluster_state_action_step"); - MockStep step = new MockStep(stepKey, null); - PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - ClusterService clusterService = mock(ClusterService.class); - IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - IndexMetaData indexMetaData = IndexMetaData.builder("my_index").settings(settings(Version.CURRENT)) - .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); - - IllegalStateException exception = expectThrows(IllegalStateException.class, - () -> runner.runPolicy(policyName, indexMetaData, null, randomBoolean())); - assertEquals("Step with key [" + stepKey + "] is not a recognised type: [" + step.getClass().getName() + "]", - exception.getMessage()); + runner.runPolicyAfterStateChange(policyName, indexMetaData); Mockito.verifyZeroInteractions(clusterService); } @@ -606,14 +751,14 @@ public void testSuccessfulValidatedMoveClusterStateToNextStep() { public void testValidatedMoveClusterStateToNextStepWithoutPolicy() { String indexName = "my_index"; - String policyName = randomBoolean() ? null : ""; + String policyName = "policy"; StepKey currentStepKey = new StepKey("current_phase", "current_action", "current_step"); StepKey nextStepKey = new StepKey("next_phase", "next_action", "next_step"); long now = randomNonNegativeLong(); Step step = new MockStep(nextStepKey, nextStepKey); PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step); - Builder indexSettingsBuilder = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policyName); + Builder indexSettingsBuilder = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, randomBoolean() ? "" : null); LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder(); lifecycleState.setPhase(currentStepKey.getPhase()); lifecycleState.setAction(currentStepKey.getAction()); @@ -731,7 +876,7 @@ public void testMoveClusterStateToFailedStepIndexNotFound() { () -> runner.moveClusterStateToFailedStep(clusterState, new String[] { invalidIndexName })); assertThat(exception.getMessage(), equalTo("index [" + invalidIndexName + "] does not exist")); } -// + public void testMoveClusterStateToFailedStepInvalidPolicySetting() { String indexName = "my_index"; String[] indices = new String[] { indexName }; @@ -811,7 +956,7 @@ public void testSkipped() { PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policy, step); ClusterService clusterService = mock(ClusterService.class); IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, clusterService, () -> 0L); - runner.runPolicy(policy, clusterState.metaData().index(index), clusterState, randomBoolean()); + runner.runPolicyAfterStateChange(policy, clusterState.metaData().index(index)); Mockito.verifyZeroInteractions(clusterService); } @@ -1211,6 +1356,7 @@ private static class MockAsyncActionStep extends AsyncActionStep { private boolean willComplete; private boolean indexSurvives = true; private long executeCount = 0; + private CountDownLatch latch; MockAsyncActionStep(StepKey key, StepKey nextStepKey) { super(key, nextStepKey, null); @@ -1237,9 +1383,16 @@ long getExecuteCount() { return executeCount; } + public void setLatch(CountDownLatch latch) { + this.latch = latch; + } + @Override public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) { executeCount++; + if (latch != null) { + latch.countDown(); + } if (exception == null) { listener.onResponse(willComplete); } else { @@ -1255,6 +1408,7 @@ private static class MockAsyncWaitStep extends AsyncWaitStep { private boolean willComplete; private long executeCount = 0; private ToXContentObject expectedInfo = null; + private CountDownLatch latch; MockAsyncWaitStep(StepKey key, StepKey nextStepKey) { super(key, nextStepKey, null); @@ -1276,9 +1430,16 @@ long getExecuteCount() { return executeCount; } + public void setLatch(CountDownLatch latch) { + this.latch = latch; + } + @Override - public void evaluateCondition(Index index, Listener listener) { + public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) { executeCount++; + if (latch != null) { + latch.countDown(); + } if (exception == null) { listener.onResponse(willComplete, expectedInfo); } else { @@ -1292,6 +1453,7 @@ static class MockClusterStateActionStep extends ClusterStateActionStep { private RuntimeException exception; private long executeCount = 0; + private CountDownLatch latch; MockClusterStateActionStep(StepKey key, StepKey nextStepKey) { super(key, nextStepKey); @@ -1301,6 +1463,10 @@ public void setException(RuntimeException exception) { this.exception = exception; } + public void setLatch(CountDownLatch latch) { + this.latch = latch; + } + public long getExecuteCount() { return executeCount; } @@ -1308,6 +1474,9 @@ public long getExecuteCount() { @Override public ClusterState performAction(Index index, ClusterState clusterState) { executeCount++; + if (latch != null) { + latch.countDown(); + } if (exception != null) { throw exception; } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java index a1ea3c2cd7f13..13fe9c1c69002 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java @@ -177,7 +177,7 @@ public void testRequestedStopOnShrink() { return null; }).when(clusterService).submitStateUpdateTask(anyString(), any(ExecuteStepsUpdateTask.class)); indexLifecycleService.applyClusterState(event); - indexLifecycleService.triggerPolicies(currentState, randomBoolean()); + indexLifecycleService.triggerPolicies(currentState, true); assertTrue(executedShrink.get()); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTaskTests.java index 16c4e332177be..f166bba25c986 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/MoveToNextStepUpdateTaskTests.java @@ -15,8 +15,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState; import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyMetadata; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyTests; @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY; import static org.hamcrest.Matchers.equalTo; @@ -68,7 +69,9 @@ public void testExecuteSuccessfullyMoved() { setStateToKey(currentStepKey, now); - MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now); + AtomicBoolean changed = new AtomicBoolean(false); + MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, + () -> now, state -> changed.set(true)); ClusterState newState = task.execute(clusterState); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index)); StepKey actualKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState); @@ -77,6 +80,7 @@ public void testExecuteSuccessfullyMoved() { assertThat(lifecycleState.getActionTime(), equalTo(now)); assertThat(lifecycleState.getStepTime(), equalTo(now)); task.clusterStateProcessed("source", clusterState, newState); + assertTrue(changed.get()); } public void testExecuteDifferentCurrentStep() { @@ -84,7 +88,7 @@ public void testExecuteDifferentCurrentStep() { StepKey notCurrentStepKey = new StepKey("not-current", "not-current", "not-current"); long now = randomNonNegativeLong(); setStateToKey(notCurrentStepKey, now); - MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now); + MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now, null); ClusterState newState = task.execute(clusterState); assertSame(newState, clusterState); } @@ -94,7 +98,7 @@ public void testExecuteDifferentPolicy() { long now = randomNonNegativeLong(); setStateToKey(currentStepKey, now); setStatePolicy("not-" + policy); - MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now); + MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, null, () -> now, null); ClusterState newState = task.execute(clusterState); assertSame(newState, clusterState); } @@ -108,7 +112,8 @@ public void testExecuteSuccessfulMoveWithInvalidNextStep() { setStateToKey(currentStepKey, now); SetOnce changed = new SetOnce<>(); - MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, invalidNextStep, () -> now); + MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, + invalidNextStep, () -> now, s -> changed.set(true)); ClusterState newState = task.execute(clusterState); LifecycleExecutionState lifecycleState = LifecycleExecutionState.fromIndexMetadata(newState.getMetaData().index(index)); StepKey actualKey = IndexLifecycleRunner.getCurrentStepKey(lifecycleState); @@ -117,6 +122,7 @@ public void testExecuteSuccessfulMoveWithInvalidNextStep() { assertThat(lifecycleState.getActionTime(), equalTo(now)); assertThat(lifecycleState.getStepTime(), equalTo(now)); task.clusterStateProcessed("source", clusterState, newState); + assertTrue(changed.get()); } public void testOnFailure() { @@ -126,7 +132,7 @@ public void testOnFailure() { setStateToKey(currentStepKey, now); - MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now); + MoveToNextStepUpdateTask task = new MoveToNextStepUpdateTask(index, policy, currentStepKey, nextStepKey, () -> now, state -> {}); Exception expectedException = new RuntimeException(); ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> task.onFailure(randomAlphaOfLength(10), expectedException)); From 85fd74da870e4c906a89402d871aa08497b85435 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Fri, 28 Sep 2018 11:01:34 -0600 Subject: [PATCH 2/6] Remove TODO --- .../elasticsearch/xpack/core/indexlifecycle/RolloverStep.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java index 17f9223c4bc5b..f95d4ffbf61fb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java @@ -91,7 +91,7 @@ public boolean equals(Object obj) { Objects.equals(maxDocs, other.maxDocs); } - // TODO: expand the information we provide? + // We currently have no information to provide for this AsyncWaitStep, so this is an empty object private class EmptyInfo implements ToXContentObject { private EmptyInfo() {} From 13a663391d37129007453c95c00511b6eb24d680 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Fri, 28 Sep 2018 11:04:47 -0600 Subject: [PATCH 3/6] Initialize nextStepKey to null --- .../xpack/indexlifecycle/ExecuteStepsUpdateTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java index ec5f6ea20c13b..c60808487f1ce 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java @@ -31,7 +31,7 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask { private final PolicyStepsRegistry policyStepsRegistry; private final IndexLifecycleRunner lifecycleRunner; private LongSupplier nowSupplier; - private Step.StepKey nextStepKey; + private Step.StepKey nextStepKey = null; public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry, IndexLifecycleRunner lifecycleRunner, LongSupplier nowSupplier) { From 771c5db143eee07ea5047fb03d9807aff68c7add Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Fri, 28 Sep 2018 11:09:12 -0600 Subject: [PATCH 4/6] Check for shrink method when stopping so that we can move to stopped --- .../xpack/indexlifecycle/IndexLifecycleService.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index 21cd6ad120bcd..01f078d966bc6 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -94,7 +94,7 @@ public void onMaster() { IndexLifecycleMetadata currentMetadata = clusterState.metaData().custom(IndexLifecycleMetadata.TYPE); if (currentMetadata != null) { OperationMode currentMode = currentMetadata.getOperationMode(); - if (OperationMode.STOPPED.equals(currentMode) || OperationMode.STOPPING.equals(currentMode)) { + if (OperationMode.STOPPED.equals(currentMode)) { return; } @@ -105,6 +105,13 @@ public void onMaster() { String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()); if (Strings.isNullOrEmpty(policyName) == false) { StepKey stepKey = IndexLifecycleRunner.getCurrentStepKey(LifecycleExecutionState.fromIndexMetadata(idxMeta)); + if (OperationMode.STOPPING == currentMode && + stepKey != null && + IGNORE_ACTIONS_MAINTENANCE_REQUESTED.contains(stepKey.getAction()) == false) { + logger.info("skipping policy [{}] for index [{}]. stopping Index Lifecycle execution", + policyName, idxMeta.getIndex().getName()); + continue; + } lifecycleRunner.maybeRunAsyncAction(clusterState, idxMeta, policyName, stepKey); } } From 36ee1499f1e8007c555f63f843b07af009fd2c28 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 1 Oct 2018 11:11:01 -0600 Subject: [PATCH 5/6] re-organize ExecuteStepsUpdateTask --- .../ExecuteStepsUpdateTask.java | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java index c60808487f1ce..bc5317b605796 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTask.java @@ -67,7 +67,7 @@ Step getStartStep() { * @throws IOException if any exceptions occur */ @Override - public ClusterState execute(ClusterState currentState) throws IOException { + public ClusterState execute(final ClusterState currentState) throws IOException { Step currentStep = startStep; IndexMetaData indexMetaData = currentState.metaData().index(index); if (indexMetaData == null) { @@ -78,23 +78,24 @@ public ClusterState execute(ClusterState currentState) throws IOException { Step registeredCurrentStep = IndexLifecycleRunner.getCurrentStep(policyStepsRegistry, policy, indexMetaData, LifecycleExecutionState.fromIndexMetadata(indexMetaData)); if (currentStep.equals(registeredCurrentStep)) { + ClusterState state = currentState; // We can do cluster state steps all together until we // either get to a step that isn't a cluster state step or a // cluster state wait step returns not completed while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) { + nextStepKey = currentStep.getNextStepKey(); if (currentStep instanceof ClusterStateActionStep) { // cluster state action step so do the action and - // move - // the cluster state to the next step + // move the cluster state to the next step logger.trace("[{}] performing cluster state action ({}) [{}], next: [{}]", index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey()); - currentState = ((ClusterStateActionStep) currentStep).performAction(index, currentState); + state = ((ClusterStateActionStep) currentStep).performAction(index, state); if (currentStep.getNextStepKey() == null) { - return currentState; - } - nextStepKey = currentStep.getNextStepKey(); - currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getKey(), + return state; + } else { + state = IndexLifecycleRunner.moveClusterStateToNextStep(index, state, currentStep.getKey(), currentStep.getNextStepKey(), nowSupplier); + } } else { // cluster state wait step so evaluate the // condition, if the condition is met move to the @@ -104,30 +105,34 @@ public ClusterState execute(ClusterState currentState) throws IOException { // condition again logger.trace("[{}] waiting for cluster state step condition ({}) [{}], next: [{}]", index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey()); - ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, currentState); + ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state); if (result.isComplete()) { if (currentStep.getNextStepKey() == null) { - return currentState; - } - nextStepKey = currentStep.getNextStepKey(); - currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getKey(), + return state; + } else { + state = IndexLifecycleRunner.moveClusterStateToNextStep(index, state, currentStep.getKey(), currentStep.getNextStepKey(), nowSupplier); + } } else { - logger.debug("condition not met, returning existing state"); + logger.debug("[{}] condition not met ({}), returning existing state", index.getName(), currentStep.getKey()); ToXContentObject stepInfo = result.getInfomationContext(); if (stepInfo == null) { - return currentState; + return state; } else { - return IndexLifecycleRunner.addStepInfoToClusterState(index, currentState, stepInfo); + return IndexLifecycleRunner.addStepInfoToClusterState(index, state, stepInfo); } } } + // There are actions we need to take in the event a phase + // transition happens, so even if we would continue in the while + // loop, if we are about to go into a new phase, return so that + // other processing can occur if (currentStep.getKey().getPhase().equals(currentStep.getNextStepKey().getPhase()) == false) { - return currentState; + return state; } currentStep = policyStepsRegistry.getStep(indexMetaData, currentStep.getNextStepKey()); } - return currentState; + return state; } else { // either we are no longer the master or the step is now // not the same as when we submitted the update task. In From 80d368bc9023948aa31c64a6cf0346ec9117e11a Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 2 Oct 2018 13:13:04 -0600 Subject: [PATCH 6/6] Add missing "stop" triggering check --- .../xpack/indexlifecycle/IndexLifecycleService.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index 01f078d966bc6..026d1c7aeef19 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -98,6 +98,8 @@ public void onMaster() { return; } + boolean safeToStop = true; // true until proven false by a run policy + // If we just became master, we need to kick off any async actions that // may have not been run due to master rollover for (ObjectCursor cursor : clusterState.metaData().indices().values()) { @@ -113,8 +115,12 @@ public void onMaster() { continue; } lifecycleRunner.maybeRunAsyncAction(clusterState, idxMeta, policyName, stepKey); + safeToStop = false; // proven false! } } + if (safeToStop && OperationMode.STOPPING == currentMode) { + submitOperationModeUpdate(OperationMode.STOPPED); + } } }