Skip to content

Commit 060100d

Browse files
authored
ILM action to wait for SLM policy execution (#50454)
This change add new ILM action to wait for SLM policy execution to ensure that index has snapshot before deletion. Closes #45067
1 parent e5eff68 commit 060100d

File tree

14 files changed

+540
-2
lines changed

14 files changed

+540
-2
lines changed

docs/reference/ilm/policy-definitions.asciidoc

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ policy definition.
109109
- <<ilm-allocate-action,Allocate>>
110110
- <<ilm-freeze-action,Freeze>>
111111
* Delete
112+
- <<ilm-delete-action,Wait For Snapshot>>
112113
- <<ilm-delete-action,Delete>>
113114

114115
[[ilm-allocate-action]]
@@ -220,6 +221,40 @@ PUT _ilm/policy/my_policy
220221
}
221222
--------------------------------------------------
222223

224+
[[ilm-wait-for-snapshot-action]]
225+
==== Wait For Snapshot
226+
227+
Phases allowed: delete.
228+
229+
The Wait For Snapshot Action waits for defined SLM policy to be executed to ensure that snapshot of index exists before
230+
deletion.
231+
232+
[[ilm-wait-for-snapshot-options]]
233+
.Wait For Snapshot
234+
[options="header"]
235+
|======
236+
| Name | Required | Default | Description
237+
| `policy` | yes | - | SLM policy name that this action should wait for
238+
|======
239+
240+
[source,console]
241+
--------------------------------------------------
242+
PUT _ilm/policy/my_policy
243+
{
244+
"policy": {
245+
"phases": {
246+
"delete": {
247+
"actions": {
248+
"wait_for_snapshot" : {
249+
"policy": "slm-policy-name"
250+
}
251+
}
252+
}
253+
}
254+
}
255+
}
256+
--------------------------------------------------
257+
223258
[[ilm-delete-action]]
224259
==== Delete
225260

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.elasticsearch.xpack.core.ilm.RolloverAction;
5555
import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
5656
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
57+
import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction;
5758
import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType;
5859
import org.elasticsearch.xpack.core.ilm.UnfollowAction;
5960
import org.elasticsearch.xpack.core.ilm.action.DeleteLifecycleAction;
@@ -541,6 +542,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
541542
new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
542543
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
543544
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new),
545+
new NamedWriteableRegistry.Entry(LifecycleAction.class, WaitForSnapshotAction.NAME, WaitForSnapshotAction::new),
544546
// Data Frame
545547
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.TRANSFORM, TransformFeatureSetUsage::new),
546548
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, TransformField.TASK_NAME, TransformTaskParams::new),

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class TimeseriesLifecycleType implements LifecycleType {
3737
AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME);
3838
static final List<String> ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
3939
FreezeAction.NAME);
40-
static final List<String> ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(DeleteAction.NAME);
40+
static final List<String> ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(WaitForSnapshotAction.NAME, DeleteAction.NAME);
4141
static final Set<String> VALID_HOT_ACTIONS = Sets.newHashSet(ORDERED_VALID_HOT_ACTIONS);
4242
static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(ORDERED_VALID_WARM_ACTIONS);
4343
static final Set<String> VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS);
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.ilm;
7+
8+
import org.elasticsearch.client.Client;
9+
import org.elasticsearch.common.ParseField;
10+
import org.elasticsearch.common.Strings;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
14+
import org.elasticsearch.common.xcontent.XContentBuilder;
15+
import org.elasticsearch.common.xcontent.XContentParser;
16+
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
17+
18+
import java.io.IOException;
19+
import java.util.Collections;
20+
import java.util.List;
21+
import java.util.Objects;
22+
23+
/**
24+
* A {@link LifecycleAction} which waits for snapshot to be taken (by configured SLM policy).
25+
*/
26+
public class WaitForSnapshotAction implements LifecycleAction {
27+
28+
public static final String NAME = "wait_for_snapshot";
29+
public static final ParseField POLICY_FIELD = new ParseField("policy");
30+
31+
private static final ConstructingObjectParser<WaitForSnapshotAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
32+
a -> new WaitForSnapshotAction((String) a[0]));
33+
34+
static {
35+
PARSER.declareString(ConstructingObjectParser.constructorArg(), POLICY_FIELD);
36+
}
37+
38+
private final String policy;
39+
40+
public static WaitForSnapshotAction parse(XContentParser parser) {
41+
return PARSER.apply(parser, null);
42+
}
43+
44+
public WaitForSnapshotAction(String policy) {
45+
if (Strings.hasText(policy) == false) {
46+
throw new IllegalArgumentException("policy name must be specified");
47+
}
48+
this.policy = policy;
49+
}
50+
51+
public WaitForSnapshotAction(StreamInput in) throws IOException {
52+
this(in.readString());
53+
}
54+
55+
@Override
56+
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
57+
StepKey waitForSnapshotKey = new StepKey(phase, NAME, WaitForSnapshotStep.NAME);
58+
return Collections.singletonList(new WaitForSnapshotStep(waitForSnapshotKey, nextStepKey, policy));
59+
}
60+
61+
@Override
62+
public boolean isSafeAction() {
63+
return true;
64+
}
65+
66+
@Override
67+
public String getWriteableName() {
68+
return NAME;
69+
}
70+
71+
@Override
72+
public void writeTo(StreamOutput out) throws IOException {
73+
out.writeString(policy);
74+
}
75+
76+
@Override
77+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
78+
builder.startObject();
79+
builder.field(POLICY_FIELD.getPreferredName(), policy);
80+
builder.endObject();
81+
return builder;
82+
}
83+
84+
@Override
85+
public boolean equals(Object o) {
86+
if (this == o) return true;
87+
if (o == null || getClass() != o.getClass()) return false;
88+
WaitForSnapshotAction that = (WaitForSnapshotAction) o;
89+
return policy.equals(that.policy);
90+
}
91+
92+
@Override
93+
public int hashCode() {
94+
return Objects.hash(policy);
95+
}
96+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.ilm;
7+
8+
import org.elasticsearch.cluster.ClusterState;
9+
import org.elasticsearch.cluster.metadata.IndexMetaData;
10+
import org.elasticsearch.common.xcontent.ToXContentObject;
11+
import org.elasticsearch.index.Index;
12+
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
13+
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicyMetadata;
14+
15+
import java.util.Date;
16+
import java.util.Locale;
17+
import java.util.Objects;
18+
19+
/***
20+
* A step that waits for snapshot to be taken by SLM to ensure we have backup before we delete the index.
21+
* It will signal error if it can't get data needed to do the check (phase time from ILM and SLM metadata)
22+
* and will only return success if execution of SLM policy took place after index entered deleted phase.
23+
*/
24+
public class WaitForSnapshotStep extends ClusterStateWaitStep {
25+
26+
static final String NAME = "wait-for-snapshot";
27+
28+
private static final String MESSAGE_FIELD = "message";
29+
private static final String POLICY_NOT_EXECUTED_MESSAGE = "waiting for policy '%s' to be executed since %s";
30+
private static final String POLICY_NOT_FOUND_MESSAGE = "configured policy '%s' not found";
31+
private static final String NO_INDEX_METADATA_MESSAGE = "no index metadata found for index '%s'";
32+
private static final String NO_PHASE_TIME_MESSAGE = "no information about ILM phase start in index metadata for index '%s'";
33+
34+
private final String policy;
35+
36+
WaitForSnapshotStep(StepKey key, StepKey nextStepKey, String policy) {
37+
super(key, nextStepKey);
38+
this.policy = policy;
39+
}
40+
41+
@Override
42+
public Result isConditionMet(Index index, ClusterState clusterState) {
43+
IndexMetaData indexMetaData = clusterState.metaData().index(index);
44+
if (indexMetaData == null) {
45+
throw error(NO_INDEX_METADATA_MESSAGE, index.getName());
46+
}
47+
48+
Long phaseTime = LifecycleExecutionState.fromIndexMetadata(indexMetaData).getPhaseTime();
49+
50+
if (phaseTime == null) {
51+
throw error(NO_PHASE_TIME_MESSAGE, index.getName());
52+
}
53+
54+
SnapshotLifecycleMetadata snapMeta = clusterState.metaData().custom(SnapshotLifecycleMetadata.TYPE);
55+
if (snapMeta == null || snapMeta.getSnapshotConfigurations().containsKey(policy) == false) {
56+
throw error(POLICY_NOT_FOUND_MESSAGE, policy);
57+
}
58+
SnapshotLifecyclePolicyMetadata snapPolicyMeta = snapMeta.getSnapshotConfigurations().get(policy);
59+
if (snapPolicyMeta.getLastSuccess() == null || snapPolicyMeta.getLastSuccess().getTimestamp() < phaseTime) {
60+
return new Result(false, notExecutedMessage(phaseTime));
61+
}
62+
63+
return new Result(true, null);
64+
}
65+
66+
public String getPolicy() {
67+
return policy;
68+
}
69+
70+
@Override
71+
public boolean isRetryable() {
72+
return true;
73+
}
74+
75+
private ToXContentObject notExecutedMessage(long time) {
76+
return (builder, params) -> {
77+
builder.startObject();
78+
builder.field(MESSAGE_FIELD, String.format(Locale.ROOT, POLICY_NOT_EXECUTED_MESSAGE, policy, new Date(time)));
79+
builder.endObject();
80+
return builder;
81+
};
82+
}
83+
84+
private IllegalStateException error(String message, Object... args) {
85+
return new IllegalStateException(String.format(Locale.ROOT, message, args));
86+
}
87+
88+
@Override
89+
public boolean equals(Object o) {
90+
if (this == o) return true;
91+
if (o == null || getClass() != o.getClass()) return false;
92+
if (!super.equals(o)) return false;
93+
WaitForSnapshotStep that = (WaitForSnapshotStep) o;
94+
return policy.equals(that.policy);
95+
}
96+
97+
@Override
98+
public int hashCode() {
99+
return Objects.hash(super.hashCode(), policy);
100+
}
101+
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyMetadataTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() {
3939
new NamedWriteableRegistry.Entry(LifecycleType.class, TimeseriesLifecycleType.TYPE,
4040
(in) -> TimeseriesLifecycleType.INSTANCE),
4141
new NamedWriteableRegistry.Entry(LifecycleAction.class, AllocateAction.NAME, AllocateAction::new),
42+
new NamedWriteableRegistry.Entry(LifecycleAction.class, WaitForSnapshotAction.NAME, WaitForSnapshotAction::new),
4243
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new),
4344
new NamedWriteableRegistry.Entry(LifecycleAction.class, ForceMergeAction.NAME, ForceMergeAction::new),
4445
new NamedWriteableRegistry.Entry(LifecycleAction.class, ReadOnlyAction.NAME, ReadOnlyAction::new),
@@ -57,6 +58,8 @@ protected NamedXContentRegistry xContentRegistry() {
5758
new NamedXContentRegistry.Entry(LifecycleType.class, new ParseField(TimeseriesLifecycleType.TYPE),
5859
(p) -> TimeseriesLifecycleType.INSTANCE),
5960
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(AllocateAction.NAME), AllocateAction::parse),
61+
new NamedXContentRegistry.Entry(LifecycleAction.class,
62+
new ParseField(WaitForSnapshotAction.NAME), WaitForSnapshotAction::parse),
6063
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse),
6164
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse),
6265
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReadOnlyAction.NAME), ReadOnlyAction::parse),

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() {
4848
new NamedWriteableRegistry.Entry(LifecycleType.class, TimeseriesLifecycleType.TYPE,
4949
(in) -> TimeseriesLifecycleType.INSTANCE),
5050
new NamedWriteableRegistry.Entry(LifecycleAction.class, AllocateAction.NAME, AllocateAction::new),
51+
new NamedWriteableRegistry.Entry(LifecycleAction.class, WaitForSnapshotAction.NAME, WaitForSnapshotAction::new),
5152
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new),
5253
new NamedWriteableRegistry.Entry(LifecycleAction.class, ForceMergeAction.NAME, ForceMergeAction::new),
5354
new NamedWriteableRegistry.Entry(LifecycleAction.class, ReadOnlyAction.NAME, ReadOnlyAction::new),
@@ -66,6 +67,8 @@ protected NamedXContentRegistry xContentRegistry() {
6667
new NamedXContentRegistry.Entry(LifecycleType.class, new ParseField(TimeseriesLifecycleType.TYPE),
6768
(p) -> TimeseriesLifecycleType.INSTANCE),
6869
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(AllocateAction.NAME), AllocateAction::parse),
70+
new NamedXContentRegistry.Entry(LifecycleAction.class,
71+
new ParseField(WaitForSnapshotAction.NAME), WaitForSnapshotAction::parse),
6972
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse),
7073
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse),
7174
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReadOnlyAction.NAME), ReadOnlyAction::parse),
@@ -110,6 +113,8 @@ public static LifecyclePolicy randomTimeseriesLifecyclePolicyWithAllPhases(@Null
110113
return AllocateActionTests.randomInstance();
111114
case DeleteAction.NAME:
112115
return new DeleteAction();
116+
case WaitForSnapshotAction.NAME:
117+
return WaitForSnapshotActionTests.randomInstance();
113118
case ForceMergeAction.NAME:
114119
return ForceMergeActionTests.randomInstance();
115120
case ReadOnlyAction.NAME:
@@ -160,6 +165,8 @@ public static LifecyclePolicy randomTimeseriesLifecyclePolicy(@Nullable String l
160165
switch (action) {
161166
case AllocateAction.NAME:
162167
return AllocateActionTests.randomInstance();
168+
case WaitForSnapshotAction.NAME:
169+
return WaitForSnapshotActionTests.randomInstance();
163170
case DeleteAction.NAME:
164171
return new DeleteAction();
165172
case ForceMergeAction.NAME:

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
3434
private static final AllocateAction TEST_ALLOCATE_ACTION =
3535
new AllocateAction(2, Collections.singletonMap("node", "node1"),null, null);
3636
private static final DeleteAction TEST_DELETE_ACTION = new DeleteAction();
37+
private static final WaitForSnapshotAction TEST_WAIT_FOR_SNAPSHOT_ACTION = new WaitForSnapshotAction("policy");
3738
private static final ForceMergeAction TEST_FORCE_MERGE_ACTION = new ForceMergeAction(1);
3839
private static final RolloverAction TEST_ROLLOVER_ACTION = new RolloverAction(new ByteSizeValue(1), null, null);
3940
private static final ShrinkAction TEST_SHRINK_ACTION = new ShrinkAction(1);
@@ -556,6 +557,8 @@ private LifecycleAction getTestAction(String actionName) {
556557
switch (actionName) {
557558
case AllocateAction.NAME:
558559
return TEST_ALLOCATE_ACTION;
560+
case WaitForSnapshotAction.NAME:
561+
return TEST_WAIT_FOR_SNAPSHOT_ACTION;
559562
case DeleteAction.NAME:
560563
return TEST_DELETE_ACTION;
561564
case ForceMergeAction.NAME:
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.ilm;
7+
8+
import org.elasticsearch.common.io.stream.Writeable;
9+
import org.elasticsearch.common.xcontent.XContentParser;
10+
11+
import java.io.IOException;
12+
import java.util.List;
13+
14+
public class WaitForSnapshotActionTests extends AbstractActionTestCase<WaitForSnapshotAction> {
15+
16+
@Override
17+
public void testToSteps() {
18+
WaitForSnapshotAction action = createTestInstance();
19+
Step.StepKey nextStep = new Step.StepKey("", "", "");
20+
List<Step> steps = action.toSteps(null, "delete", nextStep);
21+
assertEquals(1, steps.size());
22+
Step step = steps.get(0);
23+
assertTrue(step instanceof WaitForSnapshotStep);
24+
assertEquals(nextStep, step.getNextStepKey());
25+
26+
Step.StepKey key = step.getKey();
27+
assertEquals("delete", key.getPhase());
28+
assertEquals(WaitForSnapshotAction.NAME, key.getAction());
29+
assertEquals(WaitForSnapshotStep.NAME, key.getName());
30+
}
31+
32+
@Override
33+
protected WaitForSnapshotAction doParseInstance(XContentParser parser) throws IOException {
34+
return WaitForSnapshotAction.parse(parser);
35+
}
36+
37+
@Override
38+
protected WaitForSnapshotAction createTestInstance() {
39+
return randomInstance();
40+
}
41+
42+
@Override
43+
protected Writeable.Reader<WaitForSnapshotAction> instanceReader() {
44+
return WaitForSnapshotAction::new;
45+
}
46+
47+
@Override
48+
protected WaitForSnapshotAction mutateInstance(WaitForSnapshotAction instance) throws IOException {
49+
return randomInstance();
50+
}
51+
52+
static WaitForSnapshotAction randomInstance() {
53+
return new WaitForSnapshotAction(randomAlphaOfLengthBetween(5, 10));
54+
}
55+
56+
}

0 commit comments

Comments
 (0)