Skip to content

Commit 176af36

Browse files
committed
Add Rollup ILM Action
this commit introduces a new Rollup ILM Action that allows indices to be rolled up according to a specific rollup config. The action also allows for the new rolled up index to be associated with a different policy than the original/source index. Optionally, the original index can be deleted. Relates #42720. Closes #48003.
1 parent c58937b commit 176af36

File tree

14 files changed

+716
-21
lines changed

14 files changed

+716
-21
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.elasticsearch.xpack.core.ilm.MigrateAction;
6060
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
6161
import org.elasticsearch.xpack.core.ilm.RolloverAction;
62+
import org.elasticsearch.xpack.core.ilm.RollupILMAction;
6263
import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
6364
import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
6465
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
@@ -420,7 +421,7 @@ public List<ActionType<? extends ActionResponse>> getClientActions() {
420421

421422
@Override
422423
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
423-
return Arrays.asList(
424+
List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>(Arrays.asList(
424425
// graph
425426
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.GRAPH, GraphFeatureSetUsage::new),
426427
// logstash
@@ -523,7 +524,13 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
523524
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.DATA_STREAMS, DataStreamFeatureSetUsage::new),
524525
// Data Tiers
525526
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.DATA_TIERS, DataTiersFeatureSetUsage::new)
526-
);
527+
));
528+
529+
if (RollupV2.isEnabled()) {
530+
namedWriteables.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, RollupILMAction.NAME, RollupILMAction::new));
531+
}
532+
533+
return namedWriteables;
527534
}
528535

529536
@Override
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.ilm;
7+
8+
import org.elasticsearch.client.Client;
9+
import org.elasticsearch.cluster.metadata.IndexMetadata;
10+
import org.elasticsearch.common.Nullable;
11+
import org.elasticsearch.common.ParseField;
12+
import org.elasticsearch.common.Strings;
13+
import org.elasticsearch.common.io.stream.StreamInput;
14+
import org.elasticsearch.common.io.stream.StreamOutput;
15+
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
17+
import org.elasticsearch.common.xcontent.ObjectParser;
18+
import org.elasticsearch.common.xcontent.XContentBuilder;
19+
import org.elasticsearch.common.xcontent.XContentParser;
20+
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
21+
import org.elasticsearch.xpack.core.rollup.v2.RollupActionConfig;
22+
23+
import java.io.IOException;
24+
import java.util.List;
25+
import java.util.Objects;
26+
27+
/**
28+
* A {@link LifecycleAction} which calls {@link org.elasticsearch.xpack.core.rollup.v2.RollupAction} on an index
29+
*/
30+
public class RollupILMAction implements LifecycleAction {
31+
public static final String NAME = "rollup";
32+
33+
private static final ParseField CONFIG_FIELD = new ParseField("config");
34+
private static final ParseField DELETE_FIELD = new ParseField("delete_original");
35+
private static final ParseField POLICY_FIELD = new ParseField("rollup_policy");
36+
37+
@SuppressWarnings("unchecked")
38+
private static final ConstructingObjectParser<RollupILMAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
39+
a -> new RollupILMAction((RollupActionConfig) a[0], (boolean) a[1], (String) a[2]));
40+
41+
private final RollupActionConfig config;
42+
private final boolean deleteOriginalIndex;
43+
private final String rollupPolicy;
44+
45+
static {
46+
PARSER.declareField(ConstructingObjectParser.constructorArg(),
47+
(p, c) -> RollupActionConfig.fromXContent(p), CONFIG_FIELD, ObjectParser.ValueType.OBJECT);
48+
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), DELETE_FIELD);
49+
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), POLICY_FIELD);
50+
}
51+
52+
public static RollupILMAction parse(XContentParser parser) {
53+
return PARSER.apply(parser, null);
54+
}
55+
56+
public RollupILMAction(RollupActionConfig config, boolean deleteOriginalIndex, @Nullable String rollupPolicy) {
57+
this.config = config;
58+
this.deleteOriginalIndex = deleteOriginalIndex;
59+
this.rollupPolicy = rollupPolicy;
60+
}
61+
62+
public RollupILMAction(StreamInput in) throws IOException {
63+
this(new RollupActionConfig(in), in.readBoolean(), in.readOptionalString());
64+
}
65+
66+
@Override
67+
public String getWriteableName() {
68+
return NAME;
69+
}
70+
71+
RollupActionConfig config() {
72+
return config;
73+
}
74+
75+
boolean deleteOriginalIndex() {
76+
return deleteOriginalIndex;
77+
}
78+
79+
String rollupPolicy() {
80+
return rollupPolicy;
81+
}
82+
83+
@Override
84+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
85+
builder.startObject();
86+
builder.field(CONFIG_FIELD.getPreferredName(), config);
87+
builder.field(DELETE_FIELD.getPreferredName(), deleteOriginalIndex);
88+
if (rollupPolicy != null) {
89+
builder.field(POLICY_FIELD.getPreferredName(), rollupPolicy);
90+
}
91+
builder.endObject();
92+
return builder;
93+
}
94+
95+
@Override
96+
public void writeTo(StreamOutput out) throws IOException {
97+
config.writeTo(out);
98+
out.writeBoolean(deleteOriginalIndex);
99+
out.writeOptionalString(rollupPolicy);
100+
}
101+
102+
@Override
103+
public boolean isSafeAction() {
104+
return false;
105+
}
106+
107+
@Override
108+
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
109+
StepKey checkNotWriteIndex = new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME);
110+
StepKey readOnlyKey = new StepKey(phase, NAME, ReadOnlyAction.NAME);
111+
StepKey rollupKey = new StepKey(phase, NAME, NAME);
112+
StepKey waitForNoFollowerStepKey = new StepKey(phase, NAME, WaitForNoFollowersStep.NAME);
113+
StepKey deleteStepKey = new StepKey(phase, NAME, DeleteStep.NAME);
114+
115+
Settings readOnlySettings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build();
116+
117+
CheckNotDataStreamWriteIndexStep checkNotWriteIndexStep = new CheckNotDataStreamWriteIndexStep(checkNotWriteIndex,
118+
readOnlyKey);
119+
UpdateSettingsStep readOnlyStep = new UpdateSettingsStep(readOnlyKey, rollupKey, client, readOnlySettings);
120+
WaitForNoFollowersStep waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, deleteStepKey, client);
121+
122+
if (deleteOriginalIndex) {
123+
RollupStep rollupStep = new RollupStep(rollupKey, waitForNoFollowerStepKey, client, config, rollupPolicy);
124+
DeleteStep deleteStep = new DeleteStep(deleteStepKey, nextStepKey, client);
125+
return List.of(checkNotWriteIndexStep, readOnlyStep, rollupStep, waitForNoFollowersStep, deleteStep);
126+
} else {
127+
RollupStep rollupStep = new RollupStep(rollupKey, nextStepKey, client, config, rollupPolicy);
128+
return List.of(checkNotWriteIndexStep, readOnlyStep, rollupStep);
129+
}
130+
}
131+
132+
@Override
133+
public boolean equals(Object o) {
134+
if (this == o) return true;
135+
if (o == null || getClass() != o.getClass()) return false;
136+
137+
RollupILMAction that = (RollupILMAction) o;
138+
139+
return Objects.equals(this.config, that.config)
140+
&& Objects.equals(this.deleteOriginalIndex, that.deleteOriginalIndex)
141+
&& Objects.equals(this.rollupPolicy, that.rollupPolicy);
142+
}
143+
144+
@Override
145+
public int hashCode() {
146+
return Objects.hash(config, deleteOriginalIndex, rollupPolicy);
147+
}
148+
149+
@Override
150+
public String toString() {
151+
return Strings.toString(this);
152+
}
153+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.core.ilm;
7+
8+
import org.elasticsearch.action.ActionListener;
9+
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
10+
import org.elasticsearch.client.Client;
11+
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.ClusterStateObserver;
13+
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.xpack.core.rollup.v2.RollupAction;
16+
import org.elasticsearch.xpack.core.rollup.v2.RollupActionConfig;
17+
18+
import java.util.Objects;
19+
20+
/**
21+
* Rolls up index using a {@link RollupActionConfig}
22+
*/
23+
public class RollupStep extends AsyncActionStep {
24+
public static final String NAME = "rollup";
25+
public static final String ROLLUP_INDEX_NAME_POSTFIX = "-rollup";
26+
27+
private final RollupActionConfig config;
28+
private final String rollupPolicy;
29+
30+
public RollupStep(StepKey key, StepKey nextStepKey, Client client, RollupActionConfig config, String rollupPolicy) {
31+
super(key, nextStepKey, client);
32+
this.config = config;
33+
this.rollupPolicy = rollupPolicy;
34+
}
35+
36+
@Override
37+
public boolean isRetryable() {
38+
return false;
39+
}
40+
41+
@Override
42+
public void performAction(IndexMetadata indexMetadata, ClusterState currentState, ClusterStateObserver observer, Listener listener) {
43+
String originalIndex = indexMetadata.getIndex().getName();
44+
String rollupIndex = originalIndex + ROLLUP_INDEX_NAME_POSTFIX;
45+
// TODO(talevy): change config to be immutable
46+
config.setRollupIndex(rollupIndex);
47+
RollupAction.Request request = new RollupAction.Request(originalIndex, config);
48+
if (rollupPolicy == null) {
49+
getClient().execute(RollupAction.INSTANCE, request,
50+
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
51+
} else {
52+
Settings setPolicySettings = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, rollupPolicy).build();
53+
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(rollupIndex)
54+
.masterNodeTimeout(getMasterTimeout(currentState)).settings(setPolicySettings);
55+
getClient().execute(RollupAction.INSTANCE, request,
56+
ActionListener.wrap(rollupResponse -> {
57+
getClient().admin().indices().updateSettings(updateSettingsRequest,
58+
ActionListener.wrap(settingsResponse -> listener.onResponse(true), listener::onFailure));
59+
}, listener::onFailure));
60+
}
61+
}
62+
63+
public RollupActionConfig getConfig() {
64+
return config;
65+
}
66+
67+
public String getRollupPolicy() {
68+
return rollupPolicy;
69+
}
70+
71+
@Override
72+
public int hashCode() {
73+
return Objects.hash(super.hashCode(), config, rollupPolicy);
74+
}
75+
76+
@Override
77+
public boolean equals(Object obj) {
78+
if (obj == null) {
79+
return false;
80+
}
81+
if (getClass() != obj.getClass()) {
82+
return false;
83+
}
84+
RollupStep other = (RollupStep) obj;
85+
return super.equals(obj)
86+
&& Objects.equals(config, other.config)
87+
&& Objects.equals(rollupPolicy, other.rollupPolicy);
88+
}
89+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.elasticsearch.common.Strings;
99
import org.elasticsearch.common.io.stream.StreamOutput;
1010
import org.elasticsearch.common.util.set.Sets;
11+
import org.elasticsearch.rollup.RollupV2;
1112

1213
import java.io.IOException;
1314
import java.util.ArrayList;
@@ -44,25 +45,36 @@ public class TimeseriesLifecycleType implements LifecycleType {
4445
ReadOnlyAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME, SearchableSnapshotAction.NAME);
4546
static final List<String> ORDERED_VALID_WARM_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME,
4647
AllocateAction.NAME, MigrateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME);
47-
static final List<String> ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
48-
MigrateAction.NAME, FreezeAction.NAME, SearchableSnapshotAction.NAME);
48+
static final List<String> ORDERED_VALID_COLD_ACTIONS;
4949
static final List<String> ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(WaitForSnapshotAction.NAME, DeleteAction.NAME);
5050
static final Set<String> VALID_HOT_ACTIONS = Sets.newHashSet(ORDERED_VALID_HOT_ACTIONS);
5151
static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(ORDERED_VALID_WARM_ACTIONS);
52-
static final Set<String> VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS);
52+
static final Set<String> VALID_COLD_ACTIONS;
5353
static final Set<String> VALID_DELETE_ACTIONS = Sets.newHashSet(ORDERED_VALID_DELETE_ACTIONS);
54-
private static final Map<String, Set<String>> ALLOWED_ACTIONS = Map.of(
55-
HOT_PHASE, VALID_HOT_ACTIONS,
56-
WARM_PHASE, VALID_WARM_ACTIONS,
57-
COLD_PHASE, VALID_COLD_ACTIONS,
58-
DELETE_PHASE, VALID_DELETE_ACTIONS);
54+
private static final Map<String, Set<String>> ALLOWED_ACTIONS;
5955

6056
static final Set<String> HOT_ACTIONS_THAT_REQUIRE_ROLLOVER = Sets.newHashSet(ReadOnlyAction.NAME, ShrinkAction.NAME,
6157
ForceMergeAction.NAME, SearchableSnapshotAction.NAME);
6258
// a set of actions that cannot be defined (executed) after the managed index has been mounted as searchable snapshot
6359
static final Set<String> ACTIONS_CANNOT_FOLLOW_SEARCHABLE_SNAPSHOT = Sets.newHashSet(ShrinkAction.NAME, ForceMergeAction.NAME,
6460
FreezeAction.NAME, SearchableSnapshotAction.NAME);
6561

62+
static {
63+
if (RollupV2.isEnabled()) {
64+
ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
65+
MigrateAction.NAME, FreezeAction.NAME, SearchableSnapshotAction.NAME, RollupILMAction.NAME);
66+
} else {
67+
ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
68+
MigrateAction.NAME, FreezeAction.NAME, SearchableSnapshotAction.NAME);
69+
}
70+
VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS);
71+
ALLOWED_ACTIONS = new HashMap<>();
72+
ALLOWED_ACTIONS.put(HOT_PHASE, VALID_HOT_ACTIONS);
73+
ALLOWED_ACTIONS.put(WARM_PHASE, VALID_WARM_ACTIONS);
74+
ALLOWED_ACTIONS.put(COLD_PHASE, VALID_COLD_ACTIONS);
75+
ALLOWED_ACTIONS.put(DELETE_PHASE, VALID_DELETE_ACTIONS);
76+
}
77+
6678
private TimeseriesLifecycleType() {
6779
}
6880

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyMetadataTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() {
4949
new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
5050
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
5151
new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new),
52-
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new)
52+
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new),
53+
new NamedWriteableRegistry.Entry(LifecycleAction.class, RollupILMAction.NAME, RollupILMAction::new)
5354
));
5455
}
5556

@@ -72,7 +73,8 @@ protected NamedXContentRegistry xContentRegistry() {
7273
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse),
7374
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse),
7475
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), MigrateAction::parse),
75-
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse)
76+
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse),
77+
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RollupILMAction.NAME), RollupILMAction::parse)
7678
));
7779
return new NamedXContentRegistry(entries);
7880
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() {
5959
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
6060
new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new),
6161
new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new),
62-
new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new)
62+
new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new),
63+
new NamedWriteableRegistry.Entry(LifecycleAction.class, RollupILMAction.NAME, RollupILMAction::new)
6364
));
6465
}
6566

@@ -82,7 +83,8 @@ protected NamedXContentRegistry xContentRegistry() {
8283
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse),
8384
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), MigrateAction::parse),
8485
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SearchableSnapshotAction.NAME),
85-
SearchableSnapshotAction::parse)
86+
SearchableSnapshotAction::parse),
87+
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RollupILMAction.NAME), RollupILMAction::parse)
8688
));
8789
return new NamedXContentRegistry(entries);
8890
}
@@ -205,6 +207,8 @@ private static Function<String, LifecycleAction> getNameToActionFunction() {
205207
return new SearchableSnapshotAction(randomAlphaOfLengthBetween(1, 10));
206208
case MigrateAction.NAME:
207209
return new MigrateAction(false);
210+
case RollupILMAction.NAME:
211+
return RollupILMActionTests.randomInstance();
208212
default:
209213
throw new IllegalArgumentException("invalid action [" + action + "]");
210214
}};

0 commit comments

Comments
 (0)