Skip to content

Commit 485be0c

Browse files
committed
add new phase definition setting used for retrieving phase to execute
Since policies can be updated independent of execution plans for the current phase being executed, it would be nice to know what the phase that is executing looks like in JSON. This PR does just that, while also using that index setting to recontruct the phase steps to execute (for consistency)
1 parent 4a958ee commit 485be0c

File tree

15 files changed

+223
-70
lines changed

15 files changed

+223
-70
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/InitializePolicyContextStep.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
import org.elasticsearch.index.Index;
1313

1414
public final class InitializePolicyContextStep extends ClusterStateActionStep {
15-
public static final StepKey KEY = new StepKey("new", "init", "init");
15+
public static final String INITIALIZATION_PHASE = "new";
16+
public static final StepKey KEY = new StepKey(INITIALIZATION_PHASE, "init", "init");
1617

1718
public InitializePolicyContextStep(Step.StepKey key, StepKey nextStepKey) {
1819
super(key, nextStepKey);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public LifecyclePolicy(StreamInput in) throws IOException {
9090
* a {@link Map} of {@link Phase}s which make up this
9191
* {@link LifecyclePolicy}.
9292
*/
93-
LifecyclePolicy(LifecycleType type, String name, Map<String, Phase> phases) {
93+
public LifecyclePolicy(LifecycleType type, String name, Map<String, Phase> phases) {
9494
this.name = name;
9595
this.phases = phases;
9696
this.type = type;

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecycleSettings.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class LifecycleSettings {
2424
public static final String LIFECYCLE_FAILED_STEP = "index.lifecycle.failed_step";
2525
public static final String LIFECYCLE_STEP_INFO = "index.lifecycle.step_info";
2626
public static final String LIFECYCLE_SKIP = "index.lifecycle.skip";
27+
public static final String LIFECYCLE_PHASE_DEFINITION = "index.lifecycle.phase_definition";
2728

2829
public static final Setting<TimeValue> LIFECYCLE_POLL_INTERVAL_SETTING = Setting.positiveTimeSetting(LIFECYCLE_POLL_INTERVAL,
2930
TimeValue.timeValueMinutes(10), Setting.Property.Dynamic, Setting.Property.NodeScope);
@@ -49,4 +50,6 @@ public class LifecycleSettings {
4950
Setting.Property.IndexScope, Setting.Property.NotCopyableOnResize, Setting.Property.InternalIndex);
5051
public static final Setting<Boolean> LIFECYCLE_SKIP_SETTING = Setting.boolSetting(LIFECYCLE_SKIP, false,
5152
Setting.Property.Dynamic, Setting.Property.IndexScope);
53+
public static final Setting<String> LIFECYCLE_PHASE_DEFINITION_SETTING = Setting.simpleString(LIFECYCLE_PHASE_DEFINITION,
54+
Setting.Property.Dynamic, Setting.Property.IndexScope);
5255
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/TerminalPolicyStep.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
package org.elasticsearch.xpack.core.indexlifecycle;
77

88
public class TerminalPolicyStep extends Step {
9-
public static final StepKey KEY = new StepKey("completed", "completed", "completed");
9+
public static final String COMPLETED_PHASE = "completed";
10+
public static final StepKey KEY = new StepKey(COMPLETED_PHASE, "completed", "completed");
1011
public static final TerminalPolicyStep INSTANCE = new TerminalPolicyStep(KEY, null);
1112

1213
TerminalPolicyStep(StepKey key, StepKey nextStepKey) {

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ public List<Setting<?>> getSettings() {
134134
LifecycleSettings.LIFECYCLE_STEP_INFO_SETTING,
135135
LifecycleSettings.LIFECYCLE_FAILED_STEP_SETTING,
136136
LifecycleSettings.LIFECYCLE_SKIP_SETTING,
137+
LifecycleSettings.LIFECYCLE_PHASE_DEFINITION_SETTING,
137138
RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING);
138139
}
139140

@@ -146,7 +147,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
146147
return emptyList();
147148
}
148149
indexLifecycleInitialisationService
149-
.set(new IndexLifecycleService(settings, client, clusterService, getClock(), System::currentTimeMillis));
150+
.set(new IndexLifecycleService(settings, client, clusterService, getClock(), System::currentTimeMillis, xContentRegistry));
150151
return Collections.singletonList(indexLifecycleInitialisationService.get());
151152
}
152153

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleRunner.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@
2626
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateActionStep;
2727
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
2828
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
29+
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
2930
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
3031
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
32+
import org.elasticsearch.xpack.core.indexlifecycle.Phase;
3133
import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction;
3234
import org.elasticsearch.xpack.core.indexlifecycle.Step;
3335
import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey;
@@ -206,19 +208,23 @@ static ClusterState moveClusterStateToStep(String indexName, ClusterState curren
206208
static ClusterState moveClusterStateToNextStep(Index index, ClusterState clusterState, StepKey currentStep, StepKey nextStep,
207209
LongSupplier nowSupplier) {
208210
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
209-
Settings.Builder indexSettings = moveIndexSettingsToNextStep(idxMeta.getSettings(), currentStep, nextStep, nowSupplier);
211+
IndexLifecycleMetadata ilmMeta = clusterState.metaData().custom(IndexLifecycleMetadata.TYPE);
212+
LifecyclePolicy policy = ilmMeta.getPolicies().get(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()));
213+
Settings.Builder indexSettings = moveIndexSettingsToNextStep(policy, idxMeta.getSettings(), currentStep, nextStep, nowSupplier);
210214
ClusterState.Builder newClusterStateBuilder = newClusterStateWithIndexSettings(index, clusterState, indexSettings);
211215
return newClusterStateBuilder.build();
212216
}
213217

214218
static ClusterState moveClusterStateToErrorStep(Index index, ClusterState clusterState, StepKey currentStep, Exception cause,
215219
LongSupplier nowSupplier) throws IOException {
216220
IndexMetaData idxMeta = clusterState.getMetaData().index(index);
221+
IndexLifecycleMetadata ilmMeta = clusterState.metaData().custom(IndexLifecycleMetadata.TYPE);
222+
LifecyclePolicy policy = ilmMeta.getPolicies().get(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(idxMeta.getSettings()));
217223
XContentBuilder causeXContentBuilder = JsonXContent.contentBuilder();
218224
causeXContentBuilder.startObject();
219225
ElasticsearchException.generateThrowableXContent(causeXContentBuilder, ToXContent.EMPTY_PARAMS, cause);
220226
causeXContentBuilder.endObject();
221-
Settings.Builder indexSettings = moveIndexSettingsToNextStep(idxMeta.getSettings(), currentStep,
227+
Settings.Builder indexSettings = moveIndexSettingsToNextStep(policy, idxMeta.getSettings(), currentStep,
222228
new StepKey(currentStep.getPhase(), currentStep.getAction(), ErrorStep.NAME), nowSupplier)
223229
.put(LifecycleSettings.LIFECYCLE_FAILED_STEP, currentStep.getName())
224230
.put(LifecycleSettings.LIFECYCLE_STEP_INFO, BytesReference.bytes(causeXContentBuilder).utf8ToString());
@@ -247,8 +253,8 @@ ClusterState moveClusterStateToFailedStep(ClusterState currentState, String[] in
247253
return newState;
248254
}
249255

250-
private static Settings.Builder moveIndexSettingsToNextStep(Settings existingSettings, StepKey currentStep, StepKey nextStep,
251-
LongSupplier nowSupplier) {
256+
private static Settings.Builder moveIndexSettingsToNextStep(LifecyclePolicy policy, Settings existingSettings,
257+
StepKey currentStep, StepKey nextStep, LongSupplier nowSupplier) {
252258
long nowAsMillis = nowSupplier.getAsLong();
253259
Settings.Builder newSettings = Settings.builder().put(existingSettings).put(LifecycleSettings.LIFECYCLE_PHASE, nextStep.getPhase())
254260
.put(LifecycleSettings.LIFECYCLE_ACTION, nextStep.getAction()).put(LifecycleSettings.LIFECYCLE_STEP, nextStep.getName())
@@ -257,6 +263,18 @@ private static Settings.Builder moveIndexSettingsToNextStep(Settings existingSet
257263
.put(LifecycleSettings.LIFECYCLE_FAILED_STEP, (String) null)
258264
.put(LifecycleSettings.LIFECYCLE_STEP_INFO, (String) null);
259265
if (currentStep.getPhase().equals(nextStep.getPhase()) == false) {
266+
final String newPhaseDefinition;
267+
if ("new".equals(nextStep.getPhase()) || TerminalPolicyStep.KEY.equals(nextStep)) {
268+
newPhaseDefinition = nextStep.getPhase();
269+
} else {
270+
Phase nextPhase = policy.getPhases().get(nextStep.getPhase());
271+
if (nextPhase == null) {
272+
newPhaseDefinition = null;
273+
} else {
274+
newPhaseDefinition = Strings.toString(nextPhase, false, false);
275+
}
276+
}
277+
newSettings.put(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION, newPhaseDefinition);
260278
newSettings.put(LifecycleSettings.LIFECYCLE_PHASE_TIME, nowAsMillis);
261279
}
262280
if (currentStep.getAction().equals(nextStep.getAction()) == false) {
@@ -356,7 +374,7 @@ private static IndexMetaData.Builder setPolicyForIndex(final String newPolicyNam
356374
// next available step
357375
StepKey nextValidStepKey = newPolicy.getNextValidStep(currentStepKey);
358376
if (nextValidStepKey.equals(currentStepKey) == false) {
359-
newSettings = moveIndexSettingsToNextStep(idxSettings, currentStepKey, nextValidStepKey, nowSupplier);
377+
newSettings = moveIndexSettingsToNextStep(newPolicy, idxSettings, currentStepKey, nextValidStepKey, nowSupplier);
360378
}
361379
}
362380
newSettings.put(LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey(), newPolicyName);

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.component.AbstractComponent;
2323
import org.elasticsearch.common.settings.Settings;
2424
import org.elasticsearch.common.unit.TimeValue;
25+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2526
import org.elasticsearch.threadpool.ThreadPool;
2627
import org.elasticsearch.xpack.core.indexlifecycle.OperationMode;
2728
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
@@ -56,14 +57,15 @@ public class IndexLifecycleService extends AbstractComponent
5657
private LongSupplier nowSupplier;
5758
private SchedulerEngine.Job scheduledJob;
5859

59-
public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, Clock clock, LongSupplier nowSupplier) {
60+
public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, Clock clock, LongSupplier nowSupplier,
61+
NamedXContentRegistry xContentRegistry) {
6062
super(settings);
6163
this.client = client;
6264
this.clusterService = clusterService;
6365
this.clock = clock;
6466
this.nowSupplier = nowSupplier;
6567
this.scheduledJob = null;
66-
this.policyRegistry = new PolicyStepsRegistry();
68+
this.policyRegistry = new PolicyStepsRegistry(xContentRegistry);
6769
this.lifecycleRunner = new IndexLifecycleRunner(policyRegistry, clusterService, nowSupplier);
6870
this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
6971
clusterService.addStateApplier(this);
@@ -143,7 +145,6 @@ public void applyClusterState(ClusterChangedEvent event) {
143145
policyRegistry.removeIndices(event.indicesDeleted());
144146
}
145147
if (event.state().metaData().custom(IndexLifecycleMetadata.TYPE) != null) {
146-
// update policy steps registry
147148
policyRegistry.update(event.state(), client, nowSupplier);
148149
}
149150
}

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,23 @@
1616
import org.elasticsearch.common.Nullable;
1717
import org.elasticsearch.common.io.stream.StreamInput;
1818
import org.elasticsearch.common.io.stream.StreamOutput;
19+
import org.elasticsearch.common.xcontent.DeprecationHandler;
20+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
21+
import org.elasticsearch.common.xcontent.XContentParser;
22+
import org.elasticsearch.common.xcontent.json.JsonXContent;
1923
import org.elasticsearch.index.Index;
2024
import org.elasticsearch.xpack.core.ClientHelper;
2125
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
2226
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
27+
import org.elasticsearch.xpack.core.indexlifecycle.InitializePolicyContextStep;
28+
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
2329
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyMetadata;
2430
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
31+
import org.elasticsearch.xpack.core.indexlifecycle.Phase;
2532
import org.elasticsearch.xpack.core.indexlifecycle.Step;
33+
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;
2634

35+
import java.io.IOException;
2736
import java.util.ArrayList;
2837
import java.util.HashMap;
2938
import java.util.List;
@@ -44,21 +53,24 @@ public class PolicyStepsRegistry {
4453
private final Map<String, Map<Step.StepKey, Step>> stepMap;
4554
// A map of index to a list of compiled steps for the current phase
4655
private final Map<Index, List<Step>> indexPhaseSteps;
56+
private final NamedXContentRegistry xContentRegistry;
4757

48-
public PolicyStepsRegistry() {
58+
public PolicyStepsRegistry(NamedXContentRegistry xContentRegistry) {
4959
this.lifecyclePolicyMap = new TreeMap<>();
5060
this.firstStepMap = new HashMap<>();
5161
this.stepMap = new HashMap<>();
5262
this.indexPhaseSteps = new HashMap<>();
63+
this.xContentRegistry = xContentRegistry;
5364
}
5465

5566
PolicyStepsRegistry(SortedMap<String, LifecyclePolicyMetadata> lifecyclePolicyMap,
5667
Map<String, Step> firstStepMap, Map<String, Map<Step.StepKey, Step>> stepMap,
57-
Map<Index, List<Step>> indexPhaseSteps) {
68+
Map<Index, List<Step>> indexPhaseSteps, NamedXContentRegistry xContentRegistry) {
5869
this.lifecyclePolicyMap = lifecyclePolicyMap;
5970
this.firstStepMap = firstStepMap;
6071
this.stepMap = stepMap;
6172
this.indexPhaseSteps = indexPhaseSteps;
73+
this.xContentRegistry = xContentRegistry;
6274
}
6375

6476
SortedMap<String, LifecyclePolicyMetadata> getLifecyclePolicyMap() {
@@ -138,30 +150,54 @@ public LifecyclePolicyMetadata read(StreamInput in, String key) {
138150
for (ObjectCursor<IndexMetaData> imd : clusterState.metaData().getIndices().values()) {
139151
final Index index = imd.value.getIndex();
140152
final String policy = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
141-
if (policy == null) {
153+
if (policy == null || lifecyclePolicyMap.containsKey(policy) == false) {
142154
indexPhaseSteps.remove(index);
143155
} else {
144156
final List<Step> currentSteps = indexPhaseSteps.get(index);
145157
// Get the current steps' phase, if there are steps stored
146158
final String existingPhase = (currentSteps == null || currentSteps.size() == 0) ?
147159
"_none_" : currentSteps.get(0).getKey().getPhase();
148160
// Retrieve the current phase, defaulting to "new" if no phase is set
149-
final String currentPhase = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE, "new");
161+
final String currentPhase = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE,
162+
InitializePolicyContextStep.INITIALIZATION_PHASE);
150163

151164
if (existingPhase.equals(currentPhase) == false) {
152165
logger.debug("index [{}] has transitioned phases [{} -> {}], rebuilding step list",
153166
index, existingPhase, currentPhase);
154-
// Only rebuild the index's steps if the phase of the existing steps does not match our index's current phase
155-
final Map<Step.StepKey, Step> steps = stepMap.get(policy);
156-
167+
// parse existing phase steps from the phase definition in the index settings
168+
String phaseDef = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION,
169+
InitializePolicyContextStep.INITIALIZATION_PHASE);
170+
final Phase phase;
171+
LifecyclePolicy currentPolicy = lifecyclePolicyMap.get(policy).getPolicy();
172+
final LifecyclePolicy policyToExecute;
173+
if (InitializePolicyContextStep.INITIALIZATION_PHASE.equals(phaseDef)
174+
|| TerminalPolicyStep.COMPLETED_PHASE.equals(phaseDef)) {
175+
// It is ok to re-use potentially modified policy here since we are in an initialization or completed phase
176+
policyToExecute = currentPolicy;
177+
} else {
178+
// if the current phase definition describes an internal step/phase, do not parse
179+
try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry,
180+
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) {
181+
phase = Phase.parse(parser, currentPhase);
182+
} catch (IOException e) {
183+
logger.error("failed to configure phase [" + currentPhase + "] for index [" + index.getName() + "]", e);
184+
indexPhaseSteps.remove(index);
185+
continue;
186+
}
187+
Map<String, Phase> phaseMap = new HashMap<>(currentPolicy.getPhases());
188+
if (phase != null) {
189+
phaseMap.put(currentPhase, phase);
190+
}
191+
policyToExecute = new LifecyclePolicy(currentPolicy.getType(), currentPolicy.getName(), phaseMap);
192+
}
193+
final List<Step> steps = policyToExecute.toSteps(client, nowSupplier);
157194
// Build a list of steps that correspond with the phase the index is currently in
158195
final List<Step> phaseSteps;
159196
if (steps == null) {
160197
phaseSteps = new ArrayList<>();
161198
} else {
162-
phaseSteps = steps.entrySet().stream()
199+
phaseSteps = steps.stream()
163200
.filter(e -> e.getKey().getPhase().equals(currentPhase))
164-
.map(Map.Entry::getValue)
165201
.collect(Collectors.toList());
166202
}
167203
indexPhaseSteps.put(index, phaseSteps);

0 commit comments

Comments
 (0)