Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
package org.elasticsearch.xpack.core.indexlifecycle;

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.index.Index;

public abstract class AsyncWaitStep extends Step {

Expand All @@ -22,7 +22,7 @@ protected Client getClient() {
return client;
}

public abstract void evaluateCondition(Index index, Listener listener);
public abstract void evaluateCondition(IndexMetaData indexMetaData, Listener listener);

public interface Listener {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,9 @@ public List<Step> toSteps(Client client) {
steps.add(new InitializePolicyContextStep(InitializePolicyContextStep.KEY, lastStepKey));

Collections.reverse(steps);
logger.debug("STEP COUNT: " + steps.size());
logger.trace("STEP COUNT: " + steps.size());
for (Step step : steps) {
logger.debug(step.getKey() + " -> " + step.getNextStepKey());
logger.trace(step.getKey() + " -> " + step.getNextStepKey());
}

return steps;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Locale;
import java.util.Objects;

public class RolloverStep extends AsyncActionStep {
public class RolloverStep extends AsyncWaitStep {
public static final String NAME = "attempt_rollover";

private ByteSizeValue maxSize;
Expand All @@ -33,7 +35,7 @@ public RolloverStep(StepKey key, StepKey nextStepKey, Client client, ByteSizeVal
}

@Override
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetaData.getSettings());

if (Strings.isNullOrEmpty(rolloverAlias)) {
Expand All @@ -54,7 +56,7 @@ public void performAction(IndexMetaData indexMetaData, ClusterState currentState
rolloverRequest.addMaxIndexDocsCondition(maxDocs);
}
getClient().admin().indices().rolloverIndex(rolloverRequest,
ActionListener.wrap(response -> listener.onResponse(response.isRolledOver()), listener::onFailure));
ActionListener.wrap(response -> listener.onResponse(response.isRolledOver(), new EmptyInfo()), listener::onFailure));
}

ByteSizeValue getMaxSize() {
Expand Down Expand Up @@ -89,4 +91,13 @@ public boolean equals(Object obj) {
Objects.equals(maxDocs, other.maxDocs);
}

// We currently have no information to provide for this AsyncWaitStep, so this is an empty object
private class EmptyInfo implements ToXContentObject {
private EmptyInfo() {}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -38,12 +38,14 @@ public int getMaxNumSegments() {
}

@Override
public void evaluateCondition(Index index, Listener listener) {
getClient().admin().indices().segments(new IndicesSegmentsRequest(index.getName()), ActionListener.wrap(response -> {
long numberShardsLeftToMerge = StreamSupport.stream(response.getIndices().get(index.getName()).spliterator(), false)
.filter(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> p.getSegments().size() > maxNumSegments)).count();
listener.onResponse(numberShardsLeftToMerge == 0, new Info(numberShardsLeftToMerge));
}, listener::onFailure));
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
getClient().admin().indices().segments(new IndicesSegmentsRequest(indexMetaData.getIndex().getName()),
ActionListener.wrap(response -> {
long numberShardsLeftToMerge =
StreamSupport.stream(response.getIndices().get(indexMetaData.getIndex().getName()).spliterator(), false)
.filter(iss -> Arrays.stream(iss.getShards()).anyMatch(p -> p.getSegments().size() > maxNumSegments)).count();
listener.onResponse(numberShardsLeftToMerge == 0, new Info(numberShardsLeftToMerge));
}, listener::onFailure));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.indexlifecycle.AsyncActionStep.Listener;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
import org.junit.Before;
import org.mockito.Mockito;
Expand Down Expand Up @@ -148,10 +148,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());

SetOnce<Boolean> actionCompleted = new SetOnce<>();
step.performAction(indexMetaData, null, new Listener() {
step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {

@Override
public void onResponse(boolean complete) {
public void onResponse(boolean complete, ToXContentObject obj) {
actionCompleted.set(complete);
}

Expand Down Expand Up @@ -205,10 +205,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());

SetOnce<Boolean> actionCompleted = new SetOnce<>();
step.performAction(indexMetaData, null, new Listener() {
step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {

@Override
public void onResponse(boolean complete) {
public void onResponse(boolean complete, ToXContentObject obj) {
actionCompleted.set(complete);
}

Expand Down Expand Up @@ -263,10 +263,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
}).when(indicesClient).rolloverIndex(Mockito.any(), Mockito.any());

SetOnce<Boolean> exceptionThrown = new SetOnce<>();
step.performAction(indexMetaData, null, new Listener() {
step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {

@Override
public void onResponse(boolean complete) {
public void onResponse(boolean complete, ToXContentObject obj) {
throw new AssertionError("Unexpected method call");
}

Expand All @@ -292,9 +292,9 @@ public void testPerformActionInvalidNullOrEmptyAlias() {
RolloverStep step = createRandomInstance();

SetOnce<Exception> exceptionThrown = new SetOnce<>();
step.performAction(indexMetaData, null, new Listener() {
step.evaluateCondition(indexMetaData, new AsyncWaitStep.Listener() {
@Override
public void onResponse(boolean complete) {
public void onResponse(boolean complete, ToXContentObject obj) {
throw new AssertionError("Unexpected method call");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.indexlifecycle;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
Expand All @@ -14,6 +15,8 @@
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Segment;
Expand Down Expand Up @@ -41,6 +44,15 @@ public SegmentCountStep createRandomInstance() {
return new SegmentCountStep(stepKey, nextStepKey, null, maxNumSegments);
}

private IndexMetaData makeMeta(Index index) {
return IndexMetaData.builder(index.getName())
.settings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.build();
}

@Override
public SegmentCountStep mutateInstance(SegmentCountStep instance) {
StepKey key = instance.getKey();
Expand Down Expand Up @@ -109,7 +121,7 @@ public void testIsConditionMet() {
SetOnce<ToXContentObject> conditionInfo = new SetOnce<>();

SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments);
step.evaluateCondition(index, new AsyncWaitStep.Listener() {
step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() {
@Override
public void onResponse(boolean conditionMet, ToXContentObject info) {
conditionMetResult.set(conditionMet);
Expand Down Expand Up @@ -166,7 +178,7 @@ public void testIsConditionFails() {
SetOnce<ToXContentObject> conditionInfo = new SetOnce<>();

SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments);
step.evaluateCondition(index, new AsyncWaitStep.Listener() {
step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() {
@Override
public void onResponse(boolean conditionMet, ToXContentObject info) {
conditionMetResult.set(conditionMet);
Expand Down Expand Up @@ -206,7 +218,7 @@ public void testThrowsException() {
SetOnce<Boolean> exceptionThrown = new SetOnce<>();

SegmentCountStep step = new SegmentCountStep(stepKey, nextStepKey, client, maxNumSegments);
step.evaluateCondition(index, new AsyncWaitStep.Listener() {
step.evaluateCondition(makeMeta(index), new AsyncWaitStep.Listener() {
@Override
public void onResponse(boolean conditionMet, ToXContentObject info) {
throw new AssertionError("unexpected method call");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;

import java.io.IOException;
import java.util.function.LongSupplier;
Expand All @@ -28,15 +29,18 @@ public class ExecuteStepsUpdateTask extends ClusterStateUpdateTask {
private final Index index;
private final Step startStep;
private final PolicyStepsRegistry policyStepsRegistry;
private final IndexLifecycleRunner lifecycleRunner;
private LongSupplier nowSupplier;
private Step.StepKey nextStepKey = null;

public ExecuteStepsUpdateTask(String policy, Index index, Step startStep, PolicyStepsRegistry policyStepsRegistry,
LongSupplier nowSupplier) {
IndexLifecycleRunner lifecycleRunner, LongSupplier nowSupplier) {
this.policy = policy;
this.index = index;
this.startStep = startStep;
this.policyStepsRegistry = policyStepsRegistry;
this.nowSupplier = nowSupplier;
this.lifecycleRunner = lifecycleRunner;
}

String getPolicy() {
Expand All @@ -63,7 +67,7 @@ Step getStartStep() {
* @throws IOException if any exceptions occur
*/
@Override
public ClusterState execute(ClusterState currentState) throws IOException {
public ClusterState execute(final ClusterState currentState) throws IOException {
Step currentStep = startStep;
IndexMetaData indexMetaData = currentState.metaData().index(index);
if (indexMetaData == null) {
Expand All @@ -74,22 +78,24 @@ public ClusterState execute(ClusterState currentState) throws IOException {
Step registeredCurrentStep = IndexLifecycleRunner.getCurrentStep(policyStepsRegistry, policy, indexMetaData,
LifecycleExecutionState.fromIndexMetadata(indexMetaData));
if (currentStep.equals(registeredCurrentStep)) {
ClusterState state = currentState;
// We can do cluster state steps all together until we
// either get to a step that isn't a cluster state step or a
// cluster state wait step returns not completed
while (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
nextStepKey = currentStep.getNextStepKey();
if (currentStep instanceof ClusterStateActionStep) {
// cluster state action step so do the action and
// move
// the cluster state to the next step
// move the cluster state to the next step
logger.trace("[{}] performing cluster state action ({}) [{}], next: [{}]",
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey());
currentState = ((ClusterStateActionStep) currentStep).performAction(index, currentState);
state = ((ClusterStateActionStep) currentStep).performAction(index, state);
if (currentStep.getNextStepKey() == null) {
return currentState;
}
currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getKey(),
return state;
} else {
state = IndexLifecycleRunner.moveClusterStateToNextStep(index, state, currentStep.getKey(),
currentStep.getNextStepKey(), nowSupplier);
}
} else {
// cluster state wait step so evaluate the
// condition, if the condition is met move to the
Expand All @@ -99,29 +105,34 @@ public ClusterState execute(ClusterState currentState) throws IOException {
// condition again
logger.trace("[{}] waiting for cluster state step condition ({}) [{}], next: [{}]",
index.getName(), currentStep.getClass().getSimpleName(), currentStep.getKey(), currentStep.getNextStepKey());
ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, currentState);
ClusterStateWaitStep.Result result = ((ClusterStateWaitStep) currentStep).isConditionMet(index, state);
if (result.isComplete()) {
if (currentStep.getNextStepKey() == null) {
return currentState;
}
currentState = IndexLifecycleRunner.moveClusterStateToNextStep(index, currentState, currentStep.getKey(),
return state;
} else {
state = IndexLifecycleRunner.moveClusterStateToNextStep(index, state, currentStep.getKey(),
currentStep.getNextStepKey(), nowSupplier);
}
} else {
logger.debug("condition not met, returning existing state");
logger.debug("[{}] condition not met ({}), returning existing state", index.getName(), currentStep.getKey());
ToXContentObject stepInfo = result.getInfomationContext();
if (stepInfo == null) {
return currentState;
return state;
} else {
return IndexLifecycleRunner.addStepInfoToClusterState(index, currentState, stepInfo);
return IndexLifecycleRunner.addStepInfoToClusterState(index, state, stepInfo);
}
}
}
// There are actions we need to take in the event a phase
// transition happens, so even if we would continue in the while
// loop, if we are about to go into a new phase, return so that
// other processing can occur
if (currentStep.getKey().getPhase().equals(currentStep.getNextStepKey().getPhase()) == false) {
return currentState;
return state;
}
currentStep = policyStepsRegistry.getStep(indexMetaData, currentStep.getNextStepKey());
}
return currentState;
return state;
} else {
// either we are no longer the master or the step is now
// not the same as when we submitted the update task. In
Expand All @@ -130,6 +141,19 @@ public ClusterState execute(ClusterState currentState) throws IOException {
}
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState.equals(newState) == false) {
IndexMetaData indexMetaData = newState.metaData().index(index);
if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY && indexMetaData != null) {
// After the cluster state has been processed and we have moved
// to a new step, we need to conditionally execute the step iff
// it is an `AsyncAction` so that it is executed exactly once.
lifecycleRunner.maybeRunAsyncAction(newState, indexMetaData, policy, nextStepKey);
}
}
}

@Override
public void onFailure(String source, Exception e) {
throw new ElasticsearchException(
Expand Down
Loading