@@ -77,26 +77,7 @@ public void testFullPolicy() throws Exception {
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"));

// create policy
Map<String, LifecycleAction> warmActions = new HashMap<>();
warmActions.put(ForceMergeAction.NAME, new ForceMergeAction(1));
warmActions.put(AllocateAction.NAME, new AllocateAction(1, singletonMap("_name", "node-1,node-2"), null, null));
warmActions.put(ShrinkAction.NAME, new ShrinkAction(1));
Map<String, Phase> phases = new HashMap<>();
phases.put("hot", new Phase("hot", TimeValue.ZERO, singletonMap(RolloverAction.NAME,
new RolloverAction(null, null, 1L))));
phases.put("warm", new Phase("warm", TimeValue.ZERO, warmActions));
phases.put("cold", new Phase("cold", TimeValue.ZERO, singletonMap(AllocateAction.NAME,
new AllocateAction(0, singletonMap("_name", "node-3"), null, null))));
phases.put("delete", new Phase("delete", TimeValue.ZERO, singletonMap(DeleteAction.NAME, new DeleteAction())));
LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, phases);
// PUT policy
XContentBuilder builder = jsonBuilder();
lifecyclePolicy.toXContent(builder, null);
final StringEntity entity = new StringEntity(
"{ \"policy\":" + Strings.toString(builder) + "}", ContentType.APPLICATION_JSON);
Request request = new Request("PUT", "_ilm/" + policy);
request.setEntity(entity);
assertOK(client().performRequest(request));
createFullPolicy(TimeValue.ZERO);
// update policy on index
updatePolicy(originalIndex, policy);
// index document {"foo": "bar"} to trigger rollover
@@ -106,6 +87,74 @@ public void testFullPolicy() throws Exception {
assertBusy(() -> assertFalse(indexExists(originalIndex)));
}

public void testMoveToAllocateStep() throws Exception {
String originalIndex = index + "-000001";
createIndexWithSettings(originalIndex, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 4)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.routing.allocation.include._name", "node-0")
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"));

// create policy
createFullPolicy(TimeValue.timeValueHours(10));
// update policy on index
updatePolicy(originalIndex, policy);

// move to a step
Request moveToStepRequest = new Request("POST", "_ilm/move/" + originalIndex);
assertBusy(() -> assertTrue(getStepKeyForIndex(originalIndex).equals(new StepKey("new", "complete", "complete"))));
moveToStepRequest.setJsonEntity("{\n" +
" \"current_step\": {\n" +
" \"phase\": \"new\",\n" +
" \"action\": \"complete\",\n" +
" \"name\": \"complete\"\n" +
" },\n" +
" \"next_step\": {\n" +
" \"phase\": \"cold\",\n" +
" \"action\": \"allocate\",\n" +
" \"name\": \"allocate\"\n" +
" }\n" +
"}");
client().performRequest(moveToStepRequest);
assertBusy(() -> assertFalse(indexExists(originalIndex)));
}


public void testMoveToRolloverStep() throws Exception {
String originalIndex = index + "-000001";
String shrunkenOriginalIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + originalIndex;
String secondIndex = index + "-000002";
createIndexWithSettings(originalIndex, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 4)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.routing.allocation.include._name", "node-0")
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"));

createFullPolicy(TimeValue.timeValueHours(10));
// update policy on index
updatePolicy(originalIndex, policy);

// move to a step
Request moveToStepRequest = new Request("POST", "_ilm/move/" + originalIndex);
// index document to trigger rollover
index(client(), originalIndex, "_id", "foo", "bar");
logger.info(getStepKeyForIndex(originalIndex));
moveToStepRequest.setJsonEntity("{\n" +
" \"current_step\": {\n" +
" \"phase\": \"new\",\n" +
" \"action\": \"complete\",\n" +
" \"name\": \"complete\"\n" +
" },\n" +
" \"next_step\": {\n" +
" \"phase\": \"hot\",\n" +
" \"action\": \"rollover\",\n" +
" \"name\": \"attempt_rollover\"\n" +
" }\n" +
"}");
client().performRequest(moveToStepRequest);
assertBusy(() -> assertTrue(indexExists(secondIndex)));
assertBusy(() -> assertFalse(indexExists(shrunkenOriginalIndex)));
assertBusy(() -> assertFalse(indexExists(originalIndex)));
}

public void testRolloverAction() throws Exception {
String originalIndex = index + "-000001";
String secondIndex = index + "-000002";
@@ -296,6 +345,29 @@ public void testNonexistentPolicy() throws Exception {

}

private void createFullPolicy(TimeValue hotTime) throws IOException {
Map<String, LifecycleAction> warmActions = new HashMap<>();
warmActions.put(ForceMergeAction.NAME, new ForceMergeAction(1));
warmActions.put(AllocateAction.NAME, new AllocateAction(1, singletonMap("_name", "node-1,node-2"), null, null));
warmActions.put(ShrinkAction.NAME, new ShrinkAction(1));
Map<String, Phase> phases = new HashMap<>();
phases.put("hot", new Phase("hot", hotTime, singletonMap(RolloverAction.NAME,
new RolloverAction(null, null, 1L))));
phases.put("warm", new Phase("warm", TimeValue.ZERO, warmActions));
phases.put("cold", new Phase("cold", TimeValue.ZERO, singletonMap(AllocateAction.NAME,
new AllocateAction(0, singletonMap("_name", "node-3"), null, null))));
phases.put("delete", new Phase("delete", TimeValue.ZERO, singletonMap(DeleteAction.NAME, new DeleteAction())));
LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, phases);
// PUT policy
XContentBuilder builder = jsonBuilder();
lifecyclePolicy.toXContent(builder, null);
final StringEntity entity = new StringEntity(
"{ \"policy\":" + Strings.toString(builder) + "}", ContentType.APPLICATION_JSON);
Request request = new Request("PUT", "_ilm/" + policy);
request.setEntity(entity);
assertOK(client().performRequest(request));
}

private void createNewSingletonPolicy(String phaseName, LifecycleAction action) throws IOException {
createNewSingletonPolicy(phaseName, action, TimeValue.ZERO);
}
@@ -24,8 +24,8 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleExecutionState;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.indexlifecycle.OperationMode;
@@ -76,6 +76,11 @@ public IndexLifecycleService(Settings settings, Client client, ClusterService cl
this::updatePollInterval);
}

public void maybeRunAsyncAction(ClusterState clusterState, IndexMetaData indexMetaData, StepKey nextStepKey) {
String policyName = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetaData.getSettings());
lifecycleRunner.maybeRunAsyncAction(clusterState, indexMetaData, policyName, nextStepKey);
}

public ClusterState moveClusterStateToStep(ClusterState currentState, String indexName, StepKey currentStepKey, StepKey nextStepKey) {
return IndexLifecycleRunner.moveClusterStateToStep(indexName, currentState, currentStepKey, nextStepKey,
nowSupplier, policyRegistry);
@@ -62,6 +62,18 @@ public ClusterState execute(ClusterState currentState) {
request.getNextStepKey());
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
IndexMetaData newIndexMetaData = newState.metaData().index(indexMetaData.getIndex());
if (newIndexMetaData == null) {
Contributor: can this occur due to batching of updates?

Contributor: After talking with @DaveCTurner it seems like we won't have any batching here because batching occurs within the same instance of ClusterStateTaskExecutor and we don't implement batching ourselves here. However I think this check is still nice to have in case there are other factors at play.

Contributor: Can this be assert newIndexMetaData != null?

Contributor: I like David's suggestion here, as it suggests this should never happen more strongly than an if does.

Contributor: An assert is less strong though, because the check will not be done in production code, only in tests.

If we can envisage any scenario where the newState passed to this method can be different to the state we returned in execute(), then I think this should stay as an if statement so we don't end up in a situation where an NPE is thrown here because the index was deleted. If we are confident that this kind of scenario should never occur, an assert is fine.

Contributor: Right. I guess I don't see this hurting, so I won't block it, but it may be misleading to people new to the code, who may try to walk through the state in which this could be possible.

Contributor Author: We could add something like assert false : "there should be no opportunity for the index to be deleted" inside the if - that way we can catch it in testing while still handling it in production if there's a case we missed. Does that sound reasonable, or is it too messy?

Contributor: I think we should leave this with the if statement so we are protected against NPEs. If we also want to add an assert to catch things in tests, then that's fine, but I think the protection against an NPE in production should remain.
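
For reference, a minimal sketch of the combined approach floated above (hypothetical, not part of this diff): keep the if so production is protected against an NPE, and add an assert false inside it so the unexpected case still fails loudly in tests.

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
    IndexMetaData newIndexMetaData = newState.metaData().index(indexMetaData.getIndex());
    if (newIndexMetaData == null) {
        // Assumed addition: assertions are enabled in tests, so this trips there,
        // while production still falls through to the debug log and early return.
        assert false : "there should be no opportunity for the index to be deleted";
        logger.debug("index [" + indexMetaData.getIndex() + "] has been deleted after moving to step [" +
            request.getNextStepKey() + "], skipping async action check");
        return;
    }
    indexLifecycleService.maybeRunAsyncAction(newState, newIndexMetaData, request.getNextStepKey());
}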

// The index has somehow been deleted - there shouldn't be any opportunity for this to happen, but just in case.
logger.debug("index [" + indexMetaData.getIndex() + "] has been deleted after moving to step [" +
request.getNextStepKey() + "], skipping async action check");
return;
}
indexLifecycleService.maybeRunAsyncAction(newState, newIndexMetaData, request.getNextStepKey());
}

@Override
protected Response newResponse(boolean acknowledged) {
return new Response(acknowledged);