Skip to content

Commit b11d552

Browse files
authored
Initial data stream lifecycle support for downsampling (#98609)
This adds data stream lifecycle service implementation support for downsampling. Time series backing indices for a data stream with a lifecycle that configures downsampling will be marked as read-only, downsampled, removed from the data stream, replaced with the corresponding downsample index, and deleted. Multiple rounds can be configured for a data stream, and the latest matching round will be the first one to be executed. If one downsampling operation is in progress, we wait until it's finished and then we start the next downsampling operation. Note that in this scenario a data stream could have the following backing indices: ``` [.ds-metrics-2023.08.22-000002, downsample-10s-.ds-metrics-2023.08.22-000001] ``` If this data stream has multiple rounds of downsampling configured, the first generation index will subsequently be downsampled again (and again).
1 parent 4d29cb2 commit b11d552

File tree

16 files changed

+2350
-69
lines changed

16 files changed

+2350
-69
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin {
104104
Setting.Property.IndexScope,
105105
Setting.Property.Dynamic
106106
);
107+
public static final String LIFECYCLE_CUSTOM_INDEX_METADATA_KEY = "data_stream_lifecycle";
108+
107109
// The dependency of index.look_ahead_time is a cluster setting and currently there is no clean validation approach for this:
108110
private final SetOnce<UpdateTimeSeriesRangeService> updateTimeSeriesRangeService = new SetOnce<>();
109111
private final SetOnce<DataStreamLifecycleErrorStore> errorStoreInitialisationService = new SetOnce<>();

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 526 additions & 66 deletions
Large diffs are not rendered by default.

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/PutDataStreamLifecycleAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ public Request(String[] names, @Nullable TimeValue dataRetention) {
8888
this(names, dataRetention, null);
8989
}
9090

91+
    /**
     * Creates a request that applies the given lifecycle, as provided, to the named data streams.
     *
     * @param names     the data stream names (or patterns) this request targets
     * @param lifecycle the complete lifecycle configuration to apply verbatim
     */
    public Request(String[] names, DataStreamLifecycle lifecycle) {
        this.names = names;
        this.lifecycle = lifecycle;
    }
95+
9196
public Request(String[] names, @Nullable TimeValue dataRetention, @Nullable Boolean enabled) {
9297
this.names = names;
9398
this.lifecycle = DataStreamLifecycle.newBuilder().dataRetention(dataRetention).enabled(enabled == null || enabled).build();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.datastreams.lifecycle.downsampling;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
15+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
16+
import org.elasticsearch.client.internal.Client;
17+
import org.elasticsearch.cluster.ClusterState;
18+
import org.elasticsearch.cluster.SimpleBatchedExecutor;
19+
import org.elasticsearch.core.Strings;
20+
import org.elasticsearch.core.Tuple;
21+
import org.elasticsearch.index.IndexNotFoundException;
22+
import org.elasticsearch.snapshots.SnapshotInProgressException;
23+
24+
/**
25+
* Cluster service task (batched) executor that executes the replacement of data stream backing index with its
26+
* downsampled index.
27+
* After the task is executed the executor issues a delete API call for the source index however, it doesn't
28+
* hold up the task listener (nb we notify the listener before we call the delete API so we don't introduce
29+
* weird partial failure scenarios - if the delete API fails the
30+
* {@link org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService} will retry on the next run so the source index will get
31+
* deleted)
32+
*/
33+
public class ReplaceBackingWithDownsampleIndexExecutor extends SimpleBatchedExecutor<ReplaceSourceWithDownsampleIndexTask, Void> {
34+
private static final Logger LOGGER = LogManager.getLogger(ReplaceSourceWithDownsampleIndexTask.class);
35+
private final Client client;
36+
37+
public ReplaceBackingWithDownsampleIndexExecutor(Client client) {
38+
this.client = client;
39+
}
40+
41+
@Override
42+
public Tuple<ClusterState, Void> executeTask(ReplaceSourceWithDownsampleIndexTask task, ClusterState clusterState) throws Exception {
43+
return Tuple.tuple(task.execute(clusterState), null);
44+
}
45+
46+
@Override
47+
public void taskSucceeded(ReplaceSourceWithDownsampleIndexTask task, Void unused) {
48+
LOGGER.trace(
49+
"Updated cluster state and replaced index [{}] with index [{}] in data stream [{}]",
50+
task.getSourceBackingIndex(),
51+
task.getDownsampleIndex(),
52+
task.getDataStreamName()
53+
);
54+
task.getListener().onResponse(null);
55+
56+
// chain an optimistic delete of the source index call here (if it fails it'll be retried by the data stream lifecycle loop)
57+
client.admin().indices().delete(new DeleteIndexRequest(task.getSourceBackingIndex()), new ActionListener<>() {
58+
@Override
59+
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
60+
if (acknowledgedResponse.isAcknowledged()) {
61+
LOGGER.info(
62+
"Data stream lifecycle successfully deleted index [{}] due to being replaced by the downsampled index [{}] in"
63+
+ " data stream [{}]",
64+
task.getSourceBackingIndex(),
65+
task.getDownsampleIndex(),
66+
task.getDataStreamName()
67+
);
68+
} else {
69+
LOGGER.trace(
70+
"The delete request for index [{}] was not acknowledged. Data stream lifecycle service will retry on the"
71+
+ " next run if the index still exists",
72+
task.getSourceBackingIndex()
73+
);
74+
}
75+
}
76+
77+
@Override
78+
public void onFailure(Exception e) {
79+
if (e instanceof IndexNotFoundException) {
80+
// index was already deleted, treat this as a success
81+
return;
82+
}
83+
84+
if (e instanceof SnapshotInProgressException) {
85+
LOGGER.info(
86+
"Data stream lifecycle is unable to delete index [{}] because it's part of an ongoing snapshot. Retrying on "
87+
+ "the next data stream lifecycle run",
88+
task.getSourceBackingIndex()
89+
);
90+
} else {
91+
LOGGER.error(
92+
() -> Strings.format(
93+
"Data stream lifecycle encountered an error trying to delete index [%s]. It will retry on its next run.",
94+
task.getSourceBackingIndex()
95+
),
96+
e
97+
);
98+
}
99+
}
100+
});
101+
}
102+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.datastreams.lifecycle.downsampling;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.ClusterStateTaskListener;
16+
import org.elasticsearch.cluster.metadata.DataStream;
17+
import org.elasticsearch.cluster.metadata.IndexAbstraction;
18+
import org.elasticsearch.cluster.metadata.IndexMetadata;
19+
import org.elasticsearch.cluster.metadata.Metadata;
20+
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.core.TimeValue;
22+
import org.elasticsearch.index.IndexSettings;
23+
24+
import java.util.HashMap;
25+
import java.util.Locale;
26+
import java.util.Map;
27+
import java.util.Objects;
28+
29+
import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY;
30+
31+
/**
32+
* Cluster state task that replaces a source index in a data stream with its downsample index.
33+
* In the process it will configure the origination date for the downsample index (so it can
34+
* have a correct generation time).
35+
*/
36+
public class ReplaceSourceWithDownsampleIndexTask implements ClusterStateTaskListener {
37+
private static final Logger LOGGER = LogManager.getLogger(ReplaceSourceWithDownsampleIndexTask.class);
38+
public static final String REPLACEMENT_SOURCE_INDEX = "replacement_source_index";
39+
private ActionListener<Void> listener;
40+
private final String dataStreamName;
41+
private final String sourceBackingIndex;
42+
private final String downsampleIndex;
43+
44+
public ReplaceSourceWithDownsampleIndexTask(
45+
String dataStreamName,
46+
String sourceBackingIndex,
47+
String downsampleIndex,
48+
ActionListener<Void> listener
49+
) {
50+
this.dataStreamName = dataStreamName;
51+
this.sourceBackingIndex = sourceBackingIndex;
52+
this.downsampleIndex = downsampleIndex;
53+
this.listener = listener;
54+
}
55+
56+
ClusterState execute(ClusterState state) {
57+
LOGGER.trace(
58+
"Updating cluster state to replace index [{}] with [{}] in data stream [{}]",
59+
sourceBackingIndex,
60+
downsampleIndex,
61+
dataStreamName
62+
);
63+
IndexAbstraction sourceIndexAbstraction = state.metadata().getIndicesLookup().get(sourceBackingIndex);
64+
IndexMetadata downsampleIndexMeta = state.metadata().index(downsampleIndex);
65+
if (downsampleIndexMeta == null) {
66+
// the downsample index doesn't exist anymore so nothing to replace here
67+
LOGGER.trace(
68+
"Received request replace index [{}] with [{}] in data stream [{}] but the replacement index [{}] doesn't exist."
69+
+ "Nothing to do here.",
70+
sourceBackingIndex,
71+
downsampleIndex,
72+
dataStreamName,
73+
downsampleIndex
74+
);
75+
return state;
76+
}
77+
IndexMetadata sourceIndexMeta = state.metadata().index(sourceBackingIndex);
78+
DataStream dataStream = state.metadata().dataStreams().get(dataStreamName);
79+
if (sourceIndexAbstraction == null) {
80+
// index was deleted in the meantime, so let's check if we can make sure the downsample index ends up in the
81+
// data stream (if not already there)
82+
if (dataStream != null
83+
&& dataStream.getIndices().stream().filter(index -> index.getName().equals(downsampleIndex)).findAny().isEmpty()) {
84+
// add downsample index to data stream
85+
LOGGER.trace(
86+
"unable find source index [{}] but adding index [{}] to data stream [{}]",
87+
sourceBackingIndex,
88+
downsampleIndex,
89+
dataStreamName
90+
);
91+
Metadata.Builder newMetaData = Metadata.builder(state.metadata())
92+
.put(dataStream.addBackingIndex(state.metadata(), downsampleIndexMeta.getIndex()));
93+
return ClusterState.builder(state).metadata(newMetaData).build();
94+
}
95+
} else {
96+
// the source index exists
97+
DataStream sourceParentDataStream = sourceIndexAbstraction.getParentDataStream();
98+
if (sourceParentDataStream != null) {
99+
assert sourceParentDataStream.getName().equals(dataStreamName)
100+
: "the backing index must be part of the provided data "
101+
+ "stream ["
102+
+ dataStreamName
103+
+ "] but it is instead part of data stream ["
104+
+ sourceParentDataStream.getName()
105+
+ "]";
106+
if (sourceParentDataStream.getWriteIndex().getName().equals(sourceBackingIndex)) {
107+
String errorMessage = String.format(
108+
Locale.ROOT,
109+
"index [%s] is the write index for data stream [%s] and cannot be replaced",
110+
sourceBackingIndex,
111+
sourceParentDataStream.getName()
112+
);
113+
throw new IllegalStateException(errorMessage);
114+
}
115+
if (sourceIndexMeta != null) {
116+
// both indices exist, let's copy the origination date from the source index to the downsample index
117+
Metadata.Builder newMetaData = Metadata.builder(state.getMetadata());
118+
TimeValue generationLifecycleDate = dataStream.getGenerationLifecycleDate(sourceIndexMeta);
119+
assert generationLifecycleDate != null : "write index must never be downsampled, or replaced";
120+
IndexMetadata updatedDownsampleMetadata = copyDataStreamLifecycleState(
121+
sourceIndexMeta,
122+
downsampleIndexMeta,
123+
generationLifecycleDate.millis()
124+
);
125+
126+
newMetaData.put(updatedDownsampleMetadata, true);
127+
// replace source with downsample
128+
newMetaData.put(dataStream.replaceBackingIndex(sourceIndexMeta.getIndex(), downsampleIndexMeta.getIndex()));
129+
return ClusterState.builder(state).metadata(newMetaData).build();
130+
}
131+
} else {
132+
// the source index is not part of a data stream, so let's check if we can make sure the downsample index ends up in the
133+
// data stream
134+
if (dataStream != null
135+
&& dataStream.getIndices().stream().filter(index -> index.getName().equals(downsampleIndex)).findAny().isEmpty()) {
136+
Metadata.Builder newMetaData = Metadata.builder(state.getMetadata());
137+
TimeValue generationLifecycleDate = dataStream.getGenerationLifecycleDate(sourceIndexMeta);
138+
assert generationLifecycleDate != null : "write index must never be downsampled, or replaced";
139+
140+
IndexMetadata updatedDownsampleMetadata = copyDataStreamLifecycleState(
141+
sourceIndexMeta,
142+
downsampleIndexMeta,
143+
generationLifecycleDate.millis()
144+
);
145+
newMetaData.put(updatedDownsampleMetadata, true);
146+
// add downsample index to data stream
147+
newMetaData.put(dataStream.addBackingIndex(state.metadata(), downsampleIndexMeta.getIndex()));
148+
return ClusterState.builder(state).metadata(newMetaData).build();
149+
}
150+
}
151+
}
152+
153+
return state;
154+
}
155+
156+
/**
157+
* Copies the data stream lifecycle state information from the source index to the destination.
158+
* This ensures the destination index will have a generation time by setting the {@link IndexSettings#LIFECYCLE_ORIGINATION_DATE} and
159+
* that the source index is confingured in the
160+
* {@link org.elasticsearch.datastreams.DataStreamsPlugin#LIFECYCLE_CUSTOM_INDEX_METADATA_KEY} custom.
161+
*/
162+
private static IndexMetadata copyDataStreamLifecycleState(
163+
IndexMetadata source,
164+
IndexMetadata dest,
165+
long sourceIndexGenerationTimeMillis
166+
) {
167+
IndexMetadata.Builder downsampleIndexBuilder = IndexMetadata.builder(dest);
168+
Map<String, String> lifecycleCustomMetadata = source.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY);
169+
Map<String, String> newCustomMetadata = new HashMap<>();
170+
if (lifecycleCustomMetadata != null) {
171+
newCustomMetadata.putAll(lifecycleCustomMetadata);
172+
}
173+
newCustomMetadata.put(REPLACEMENT_SOURCE_INDEX, source.getIndex().getName());
174+
downsampleIndexBuilder.putCustom(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY, newCustomMetadata);
175+
176+
if (IndexSettings.LIFECYCLE_ORIGINATION_DATE_SETTING.exists(dest.getSettings()) == false) {
177+
downsampleIndexBuilder.settings(
178+
Settings.builder()
179+
.put(dest.getSettings())
180+
.put(IndexSettings.LIFECYCLE_ORIGINATION_DATE, sourceIndexGenerationTimeMillis)
181+
.build()
182+
).settingsVersion(dest.getSettingsVersion() + 1L);
183+
}
184+
return downsampleIndexBuilder.build();
185+
}
186+
187+
@Override
188+
public void onFailure(Exception e) {
189+
if (listener != null) {
190+
listener.onFailure(e);
191+
}
192+
}
193+
194+
public String getDataStreamName() {
195+
return dataStreamName;
196+
}
197+
198+
public String getSourceBackingIndex() {
199+
return sourceBackingIndex;
200+
}
201+
202+
public String getDownsampleIndex() {
203+
return downsampleIndex;
204+
}
205+
206+
public ActionListener<Void> getListener() {
207+
return listener;
208+
}
209+
210+
public void setListener(ActionListener<Void> listener) {
211+
this.listener = listener;
212+
}
213+
214+
@Override
215+
public boolean equals(Object o) {
216+
if (this == o) {
217+
return true;
218+
}
219+
if (o == null || getClass() != o.getClass()) {
220+
return false;
221+
}
222+
ReplaceSourceWithDownsampleIndexTask that = (ReplaceSourceWithDownsampleIndexTask) o;
223+
return Objects.equals(dataStreamName, that.dataStreamName)
224+
&& Objects.equals(sourceBackingIndex, that.sourceBackingIndex)
225+
&& Objects.equals(downsampleIndex, that.downsampleIndex);
226+
}
227+
228+
@Override
229+
public int hashCode() {
230+
return Objects.hash(dataStreamName, sourceBackingIndex, downsampleIndex);
231+
}
232+
}

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleFixtures.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
*/
4747
public class DataStreamLifecycleFixtures {
4848

49-
static DataStream createDataStream(
49+
public static DataStream createDataStream(
5050
Metadata.Builder builder,
5151
String dataStreamName,
5252
int backingIndicesCount,

0 commit comments

Comments
 (0)