Skip to content

Commit 2e05e3e

Browse files
authored
API for adding and removing indices from a data stream (#79279)
1 parent 88250c2 commit 2e05e3e

File tree

14 files changed

+694
-58
lines changed

14 files changed

+694
-58
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
{
2+
"indices.modify_data_stream":{
3+
"documentation":{
4+
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html",
5+
"description":"Modifies a data stream"
6+
},
7+
"stability":"stable",
8+
"visibility":"public",
9+
"headers":{
10+
"accept": [ "application/json"],
11+
"content_type": ["application/json"]
12+
},
13+
"url":{
14+
"paths":[
15+
{
16+
"path":"/_data_stream/_modify",
17+
"methods":["POST"]
18+
}
19+
]
20+
},
21+
"params":{
22+
},
23+
"body":{
24+
"description":"The data stream modifications",
25+
"required":true
26+
}
27+
}
28+
}

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,8 @@
189189
import org.elasticsearch.action.bulk.BulkAction;
190190
import org.elasticsearch.action.bulk.TransportBulkAction;
191191
import org.elasticsearch.action.bulk.TransportShardBulkAction;
192+
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
193+
import org.elasticsearch.action.datastreams.ModifyDataStreamsTransportAction;
192194
import org.elasticsearch.action.delete.DeleteAction;
193195
import org.elasticsearch.action.delete.TransportDeleteAction;
194196
import org.elasticsearch.action.explain.ExplainAction;
@@ -370,6 +372,7 @@
370372
import org.elasticsearch.rest.action.cat.RestTasksAction;
371373
import org.elasticsearch.rest.action.cat.RestTemplatesAction;
372374
import org.elasticsearch.rest.action.cat.RestThreadPoolAction;
375+
import org.elasticsearch.rest.action.datastreams.RestModifyDataStreamsAction;
373376
import org.elasticsearch.rest.action.document.RestBulkAction;
374377
import org.elasticsearch.rest.action.document.RestDeleteAction;
375378
import org.elasticsearch.rest.action.document.RestGetAction;
@@ -599,6 +602,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
599602
actions.register(AnalyzeIndexDiskUsageAction.INSTANCE, TransportAnalyzeIndexDiskUsageAction.class);
600603
actions.register(FieldUsageStatsAction.INSTANCE, TransportFieldUsageAction.class);
601604

605+
//Data streams
606+
actions.register(ModifyDataStreamsAction.INSTANCE, ModifyDataStreamsTransportAction.class);
607+
602608
//Indexed scripts
603609
actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class);
604610
actions.register(GetStoredScriptAction.INSTANCE, TransportGetStoredScriptAction.class);
@@ -763,6 +769,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
763769

764770
registerHandler.accept(new RestReloadSecureSettingsAction());
765771

772+
// Data streams
773+
registerHandler.accept(new RestModifyDataStreamsAction());
774+
766775
// Scripts API
767776
registerHandler.accept(new RestGetStoredScriptAction());
768777
registerHandler.accept(new RestPutStoredScriptAction());
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.datastreams;
10+
11+
import org.elasticsearch.action.ActionRequestValidationException;
12+
import org.elasticsearch.action.ActionType;
13+
import org.elasticsearch.action.IndicesRequest;
14+
import org.elasticsearch.action.support.IndicesOptions;
15+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
16+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
17+
import org.elasticsearch.cluster.metadata.DataStreamAction;
18+
import org.elasticsearch.common.io.stream.StreamInput;
19+
import org.elasticsearch.common.io.stream.StreamOutput;
20+
import org.elasticsearch.xcontent.ConstructingObjectParser;
21+
import org.elasticsearch.xcontent.ParseField;
22+
import org.elasticsearch.xcontent.ToXContentObject;
23+
import org.elasticsearch.xcontent.XContentBuilder;
24+
25+
import java.io.IOException;
26+
import java.util.Arrays;
27+
import java.util.Collections;
28+
import java.util.List;
29+
import java.util.Objects;
30+
31+
import static org.elasticsearch.action.ValidateActions.addValidationError;
32+
33+
public class ModifyDataStreamsAction extends ActionType<AcknowledgedResponse> {
34+
35+
public static final ModifyDataStreamsAction INSTANCE = new ModifyDataStreamsAction();
36+
public static final String NAME = "indices:admin/data_stream/modify";
37+
38+
private ModifyDataStreamsAction() {
39+
super(NAME, AcknowledgedResponse::readFrom);
40+
}
41+
42+
public static final class Request
43+
extends AcknowledgedRequest<Request>
44+
implements IndicesRequest, ToXContentObject {
45+
46+
// relevant only for authorizing the request, so require every specified
47+
// index to exist, expand wildcards only to open indices, prohibit
48+
// wildcard expressions that resolve to zero indices, and do not attempt
49+
// to resolve expressions as aliases
50+
private static final IndicesOptions INDICES_OPTIONS =
51+
IndicesOptions.fromOptions(false, false, true, false, true, false, true, false);
52+
53+
private final List<DataStreamAction> actions;
54+
55+
public Request(StreamInput in) throws IOException {
56+
super(in);
57+
actions = in.readList(DataStreamAction::new);
58+
}
59+
60+
@Override
61+
public void writeTo(StreamOutput out) throws IOException {
62+
super.writeTo(out);
63+
out.writeList(actions);
64+
}
65+
66+
public Request(List<DataStreamAction> actions) {
67+
this.actions = Collections.unmodifiableList(actions);
68+
}
69+
70+
public List<DataStreamAction> getActions() {
71+
return actions;
72+
}
73+
74+
@Override
75+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
76+
builder.startObject();
77+
builder.startArray("actions");
78+
for (DataStreamAction action : actions) {
79+
action.toXContent(builder, params);
80+
}
81+
builder.endArray();
82+
builder.endObject();
83+
return builder;
84+
}
85+
86+
@Override
87+
public ActionRequestValidationException validate() {
88+
if (actions.isEmpty()) {
89+
return addValidationError("must specify at least one data stream modification action", null);
90+
}
91+
return null;
92+
}
93+
94+
@SuppressWarnings("unchecked")
95+
public static final ConstructingObjectParser<Request, Void> PARSER = new ConstructingObjectParser<>(
96+
"data_stream_actions",
97+
args -> new Request(((List<DataStreamAction>) args[0]))
98+
);
99+
static {
100+
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), DataStreamAction.PARSER, new ParseField("actions"));
101+
}
102+
103+
@Override
104+
public String[] indices() {
105+
return actions.stream().map(DataStreamAction::getDataStream).toArray(String[]::new);
106+
}
107+
108+
@Override
109+
public IndicesOptions indicesOptions() {
110+
return INDICES_OPTIONS;
111+
}
112+
113+
@Override
114+
public boolean includeDataStreams() {
115+
return true;
116+
}
117+
118+
@Override
119+
public boolean equals(Object obj) {
120+
if (obj == null || obj.getClass() != getClass()) {
121+
return false;
122+
}
123+
Request other = (Request) obj;
124+
return Arrays.equals(actions.toArray(), other.actions.toArray());
125+
}
126+
127+
@Override
128+
public int hashCode() {
129+
return Objects.hash(actions);
130+
}
131+
132+
}
133+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.datastreams;
10+
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.support.ActionFilters;
13+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
14+
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
15+
import org.elasticsearch.cluster.ClusterState;
16+
import org.elasticsearch.cluster.block.ClusterBlockException;
17+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
18+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
19+
import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
20+
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.common.inject.Inject;
22+
import org.elasticsearch.tasks.Task;
23+
import org.elasticsearch.threadpool.ThreadPool;
24+
import org.elasticsearch.transport.TransportService;
25+
26+
public class ModifyDataStreamsTransportAction extends AcknowledgedTransportMasterNodeAction<
27+
ModifyDataStreamsAction.Request> {
28+
29+
private final MetadataDataStreamsService metadataDataStreamsService;
30+
31+
@Inject
32+
public ModifyDataStreamsTransportAction(
33+
TransportService transportService,
34+
ClusterService clusterService,
35+
ThreadPool threadPool,
36+
ActionFilters actionFilters,
37+
IndexNameExpressionResolver indexNameExpressionResolver,
38+
MetadataDataStreamsService metadataDataStreamsService
39+
) {
40+
super(
41+
ModifyDataStreamsAction.NAME,
42+
transportService,
43+
clusterService,
44+
threadPool,
45+
actionFilters,
46+
ModifyDataStreamsAction.Request::new,
47+
indexNameExpressionResolver,
48+
ThreadPool.Names.SAME
49+
);
50+
this.metadataDataStreamsService = metadataDataStreamsService;
51+
}
52+
53+
@Override
54+
protected void masterOperation(
55+
Task task,
56+
ModifyDataStreamsAction.Request request,
57+
ClusterState state,
58+
ActionListener<AcknowledgedResponse> listener
59+
) throws Exception {
60+
metadataDataStreamsService.modifyDataStream(request, listener);
61+
}
62+
63+
@Override
64+
protected ClusterBlockException checkBlock(ModifyDataStreamsAction.Request request, ClusterState state) {
65+
ClusterBlockException globalBlock = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
66+
if (globalBlock != null) {
67+
return globalBlock;
68+
}
69+
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE,
70+
indexNameExpressionResolver.concreteIndexNames(state, request));
71+
}
72+
73+
}

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ public DataStream removeBackingIndex(Index index) {
214214
index.getName(), name
215215
));
216216
}
217-
if (generation == (backingIndexPosition + 1)) {
217+
if (indices.size() == (backingIndexPosition + 1)) {
218218
throw new IllegalArgumentException(String.format(
219219
Locale.ROOT,
220220
"cannot remove backing index [%s] of data stream [%s] because it is the write index",

0 commit comments

Comments
 (0)