Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
import org.elasticsearch.index.Index;

public final class InitializePolicyContextStep extends ClusterStateActionStep {
public static final StepKey KEY = new StepKey("new", "init", "init");
public static final String INITIALIZATION_PHASE = "new";
public static final StepKey KEY = new StepKey(INITIALIZATION_PHASE, "init", "init");

public InitializePolicyContextStep(Step.StepKey key, StepKey nextStepKey) {
super(key, nextStepKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public LifecyclePolicy(StreamInput in) throws IOException {
* a {@link Map} of {@link Phase}s which make up this
* {@link LifecyclePolicy}.
*/
LifecyclePolicy(LifecycleType type, String name, Map<String, Phase> phases) {
public LifecyclePolicy(LifecycleType type, String name, Map<String, Phase> phases) {
this.name = name;
this.phases = phases;
this.type = type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class LifecycleSettings {
public static final String LIFECYCLE_FAILED_STEP = "index.lifecycle.failed_step";
public static final String LIFECYCLE_STEP_INFO = "index.lifecycle.step_info";
public static final String LIFECYCLE_SKIP = "index.lifecycle.skip";
public static final String LIFECYCLE_PHASE_DEFINITION = "index.lifecycle.phase_definition";

public static final Setting<TimeValue> LIFECYCLE_POLL_INTERVAL_SETTING = Setting.positiveTimeSetting(LIFECYCLE_POLL_INTERVAL,
TimeValue.timeValueMinutes(10), Setting.Property.Dynamic, Setting.Property.NodeScope);
Expand All @@ -49,4 +50,6 @@ public class LifecycleSettings {
Setting.Property.IndexScope, Setting.Property.NotCopyableOnResize, Setting.Property.InternalIndex);
public static final Setting<Boolean> LIFECYCLE_SKIP_SETTING = Setting.boolSetting(LIFECYCLE_SKIP, false,
Setting.Property.Dynamic, Setting.Property.IndexScope);
public static final Setting<String> LIFECYCLE_PHASE_DEFINITION_SETTING = Setting.simpleString(LIFECYCLE_PHASE_DEFINITION,
Setting.Property.Dynamic, Setting.Property.IndexScope);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
package org.elasticsearch.xpack.core.indexlifecycle;

public class TerminalPolicyStep extends Step {
public static final StepKey KEY = new StepKey("completed", "completed", "completed");
public static final String COMPLETED_PHASE = "completed";
public static final StepKey KEY = new StepKey(COMPLETED_PHASE, "completed", "completed");
public static final TerminalPolicyStep INSTANCE = new TerminalPolicyStep(KEY, null);

TerminalPolicyStep(StepKey key, StepKey nextStepKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public List<Setting<?>> getSettings() {
LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING,
LifecycleSettings.LIFECYCLE_FAILED_STEP_SETTING,
LifecycleSettings.LIFECYCLE_SKIP_SETTING,
LifecycleSettings.LIFECYCLE_PHASE_DEFINITION_SETTING,
RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING);
}

Expand All @@ -145,7 +146,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
return emptyList();
}
indexLifecycleInitialisationService
.set(new IndexLifecycleService(settings, client, clusterService, getClock(), System::currentTimeMillis));
.set(new IndexLifecycleService(settings, client, clusterService, getClock(), System::currentTimeMillis, xContentRegistry));
return Collections.singletonList(indexLifecycleInitialisationService.get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateActionStep;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.indexlifecycle.Phase;
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
Expand Down Expand Up @@ -206,19 +208,23 @@ static ClusterState moveClusterStateToStep(String indexName, ClusterState curren
static ClusterState moveClusterStateToNextStep(Index index, ClusterState clusterState, StepKey currentStep, StepKey nextStep,
LongSupplier nowSupplier) {
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
Settings.Builder indexSettings = moveIndexSettingsToNextStep(idxMeta.getSettings(), currentStep, nextStep, nowSupplier);
IndexLifecycleMetadata ilmMeta = clusterState.metaData().custom(IndexLifecycleMetadata.TYPE);
LifecyclePolicy policy = ilmMeta.getPolicies().get(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()));
Settings.Builder indexSettings = moveIndexSettingsToNextStep(policy, idxMeta.getSettings(), currentStep, nextStep, nowSupplier);
ClusterState.Builder newClusterStateBuilder = newClusterStateWithIndexSettings(index, clusterState, indexSettings);
return newClusterStateBuilder.build();
}

static ClusterState moveClusterStateToErrorStep(Index index, ClusterState clusterState, StepKey currentStep, Exception cause,
LongSupplier nowSupplier) throws IOException {
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
IndexLifecycleMetadata ilmMeta = clusterState.metaData().custom(IndexLifecycleMetadata.TYPE);
LifecyclePolicy policy = ilmMeta.getPolicies().get(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()));
XContentBuilder causeXContentBuilder = JsonXContent.contentBuilder();
causeXContentBuilder.startObject();
ElasticsearchException.generateThrowableXContent(causeXContentBuilder, ToXContent.EMPTY_PARAMS, cause);
causeXContentBuilder.endObject();
Settings.Builder indexSettings = moveIndexSettingsToNextStep(idxMeta.getSettings(), currentStep,
Settings.Builder indexSettings = moveIndexSettingsToNextStep(policy, idxMeta.getSettings(), currentStep,
new StepKey(currentStep.getPhase(), currentStep.getAction(), ErrorStep.NAME), nowSupplier)
.put(LifecycleSettings.LIFECYCLE_FAILED_STEP, currentStep.getName())
.put(LifecycleSettings.LIFECYCLE_STEP_INFO, BytesReference.bytes(causeXContentBuilder).utf8ToString());
Expand Down Expand Up @@ -247,8 +253,8 @@ ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] in
return newState;
}

private static Settings.Builder moveIndexSettingsToNextStep(Settings existingSettings, StepKey currentStep, StepKey nextStep,
LongSupplier nowSupplier) {
private static Settings.Builder moveIndexSettingsToNextStep(LifecyclePolicy policy, Settings existingSettings,
StepKey currentStep, StepKey nextStep, LongSupplier nowSupplier) {
long nowAsMillis = nowSupplier.getAsLong();
Settings.Builder newSettings = Settings.builder().put(existingSettings).put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase())
.put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction()).put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName())
Expand All @@ -257,6 +263,18 @@ private static Settings.Builder moveIndexSettingsToNextStep(Settings existingSet
.put(LifecycleSettings.LIFECYCLE_FAILED_STEP, (String) null)
.put(LifecycleSettings.LIFECYCLE_STEP_INFO, (String) null);
if (currentStep.getPhase().equals(nextStep.getPhase()) == false) {
final String newPhaseDefinition;
if ("new".equals(nextStep.getPhase()) || TerminalPolicyStep.KEY.equals(nextStep)) {
newPhaseDefinition = nextStep.getPhase();
} else {
Phase nextPhase = policy.getPhases().get(nextStep.getPhase());
if (nextPhase == null) {
newPhaseDefinition = null;
} else {
newPhaseDefinition = Strings.toString(nextPhase, false, false);
}
}
newSettings.put(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION, newPhaseDefinition);
newSettings.put(LifecycleSettings.LIFECYCLE_PHASE_TIME, nowAsMillis);
}
if (currentStep.getAction().equals(nextStep.getAction()) == false) {
Expand Down Expand Up @@ -356,7 +374,7 @@ private static IndexMetaData.Builder setPolicyForIndex(final String newPolicyNam
// next available step
StepKey nextValidStepKey = newPolicy.getNextValidStep(currentStepKey);
if (nextValidStepKey.equals(currentStepKey) == false) {
newSettings = moveIndexSettingsToNextStep(idxSettings, currentStepKey, nextValidStepKey, nowSupplier);
newSettings = moveIndexSettingsToNextStep(newPolicy, idxSettings, currentStepKey, nextValidStepKey, nowSupplier);
}
}
newSettings.put(LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey(), newPolicyName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
Expand Down Expand Up @@ -57,14 +58,15 @@ public class IndexLifecycleService extends AbstractComponent
private LongSupplier nowSupplier;
private SchedulerEngine.Job scheduledJob;

public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, Clock clock, LongSupplier nowSupplier) {
public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, Clock clock, LongSupplier nowSupplier,
NamedXContentRegistry xContentRegistry) {
super(settings);
this.client = client;
this.clusterService = clusterService;
this.clock = clock;
this.nowSupplier = nowSupplier;
this.scheduledJob = null;
this.policyRegistry = new PolicyStepsRegistry();
this.policyRegistry = new PolicyStepsRegistry(xContentRegistry);
this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, nowSupplier);
this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
clusterService.addStateApplier(this);
Expand Down Expand Up @@ -144,7 +146,6 @@ public void applyClusterState(ClusterChangedEvent event) {
policyRegistry.removeIndices(event.indicesDeleted());
}
if (event.state().metaData().custom(IndexLifecycleMetadata.TYPE) != null) {
// update policy steps registry
policyRegistry.update(event.state(), client, nowSupplier);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,23 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.InitializePolicyContextStep;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.indexlifecycle.Phase;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -44,21 +53,24 @@ public class PolicyStepsRegistry {
private final Map<String, Map<Step.StepKey, Step>> stepMap;
// A map of index to a list of compiled steps for the current phase
private final Map<Index, List<Step>> indexPhaseSteps;
private final NamedXContentRegistry xContentRegistry;

public PolicyStepsRegistry() {
public PolicyStepsRegistry(NamedXContentRegistry xContentRegistry) {
this.lifecyclePolicyMap = new TreeMap<>();
this.firstStepMap = new HashMap<>();
this.stepMap = new HashMap<>();
this.indexPhaseSteps = new HashMap<>();
this.xContentRegistry = xContentRegistry;
}

PolicyStepsRegistry(SortedMap<String, LifecyclePolicyMetadata> lifecyclePolicyMap,
Map<String, Step> firstStepMap, Map<String, Map<Step.StepKey, Step>> stepMap,
Map<Index, List<Step>> indexPhaseSteps) {
Map<Index, List<Step>> indexPhaseSteps, NamedXContentRegistry xContentRegistry) {
this.lifecyclePolicyMap = lifecyclePolicyMap;
this.firstStepMap = firstStepMap;
this.stepMap = stepMap;
this.indexPhaseSteps = indexPhaseSteps;
this.xContentRegistry = xContentRegistry;
}

SortedMap<String, LifecyclePolicyMetadata> getLifecyclePolicyMap() {
Expand Down Expand Up @@ -138,30 +150,56 @@ public LifecyclePolicyMetadata read(StreamInput in, String key) {
for (ObjectCursor<IndexMetaData> imd : clusterState.metaData().getIndices().values()) {
final Index index = imd.value.getIndex();
final String policy = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
if (policy == null) {
if (policy == null || lifecyclePolicyMap.containsKey(policy) == false) {
indexPhaseSteps.remove(index);
} else {
final List<Step> currentSteps = indexPhaseSteps.get(index);
// Get the current steps' phase, if there are steps stored
final String existingPhase = (currentSteps == null || currentSteps.size() == 0) ?
"_none_" : currentSteps.get(0).getKey().getPhase();
// Retrieve the current phase, defaulting to "new" if no phase is set
final String currentPhase = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE, "new");
final String currentPhase = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE,
InitializePolicyContextStep.INITIALIZATION_PHASE);

if (existingPhase.equals(currentPhase) == false) {
logger.debug("index [{}] has transitioned phases [{} -> {}], rebuilding step list",
index, existingPhase, currentPhase);
// Only rebuild the index's steps if the phase of the existing steps does not match our index's current phase
final Map<Step.StepKey, Step> steps = stepMap.get(policy);

// parse existing phase steps from the phase definition in the index settings
String phaseDef = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION,
InitializePolicyContextStep.INITIALIZATION_PHASE);
final Phase phase;
LifecyclePolicy currentPolicy = lifecyclePolicyMap.get(policy).getPolicy();
final LifecyclePolicy policyToExecute;
if (InitializePolicyContextStep.INITIALIZATION_PHASE.equals(phaseDef)
|| TerminalPolicyStep.COMPLETED_PHASE.equals(phaseDef)) {
// It is ok to re-use potentially modified policy here since we are in an initialization or completed phase
policyToExecute = currentPolicy;
} else {
// if the current phase definition describes an internal step/phase, do not parse
try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) {
phase = Phase.parse(parser, currentPhase);
} catch (IOException e) {
logger.error("failed to configure phase [" + currentPhase + "] for index [" + index.getName() + "]", e);
indexPhaseSteps.remove(index);
continue;
}
Map<String, Phase> phaseMap = new HashMap<>(currentPolicy.getPhases());
if (phase != null) {
phaseMap.put(currentPhase, phase);
}
policyToExecute = new LifecyclePolicy(currentPolicy.getType(), currentPolicy.getName(), phaseMap);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to create a fake policy here with only 1 phase in it?

Copy link
Contributor Author

@talevy talevy Aug 31, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the last step in the phase needs a correct nextstep, no? thinking this, and for the reason that step compilation only happens on the policy level, I thought it would be clear to just swap out the potential new version of the phase (or lack thereof) with the phase defined in settings.

I do think things in step compilation can be cleaned up to make this less wasteful and cleaner, but I thought it was reasonable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@talevy and I spoke about this and although this will want to change at some point we can't change to compile directly from the phase right now whilst the PhaseAfterStep still exists (which is being removed in a different task).

}
LifecyclePolicySecurityClient policyClient = new LifecyclePolicySecurityClient(client,
ClientHelper.INDEX_LIFECYCLE_ORIGIN, lifecyclePolicyMap.get(policy).getHeaders());
final List<Step> steps = policyToExecute.toSteps(policyClient, nowSupplier);
// Build a list of steps that correspond with the phase the index is currently in
final List<Step> phaseSteps;
if (steps == null) {
phaseSteps = new ArrayList<>();
} else {
phaseSteps = steps.entrySet().stream()
phaseSteps = steps.stream()
.filter(e -> e.getKey().getPhase().equals(currentPhase))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
}
indexPhaseSteps.put(index, phaseSteps);
Expand Down
Loading