diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.modify_data_stream.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.modify_data_stream.json new file mode 100644 index 0000000000000..ea095289b72bc --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.modify_data_stream.json @@ -0,0 +1,28 @@ +{ + "indices.modify_data_stream":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html", + "description":"Modifies a data stream" + }, + "stability":"stable", + "visibility":"public", + "headers":{ + "accept": [ "application/json"], + "content_type": ["application/json"] + }, + "url":{ + "paths":[ + { + "path":"/_data_stream/_modify", + "methods":["POST"] + } + ] + }, + "params":{ + }, + "body":{ + "description":"The data stream modifications", + "required":true + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 197f1c567a416..5f5793d4afcc4 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -189,6 +189,8 @@ import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkAction; +import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; +import org.elasticsearch.action.datastreams.ModifyDataStreamsTransportAction; import org.elasticsearch.action.delete.DeleteAction; import org.elasticsearch.action.delete.TransportDeleteAction; import org.elasticsearch.action.explain.ExplainAction; @@ -370,6 +372,7 @@ import org.elasticsearch.rest.action.cat.RestTasksAction; import org.elasticsearch.rest.action.cat.RestTemplatesAction; import org.elasticsearch.rest.action.cat.RestThreadPoolAction; +import org.elasticsearch.rest.action.datastreams.RestModifyDataStreamsAction; import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.rest.action.document.RestDeleteAction; import org.elasticsearch.rest.action.document.RestGetAction; @@ -599,6 +602,9 @@ public void reg actions.register(AnalyzeIndexDiskUsageAction.INSTANCE, TransportAnalyzeIndexDiskUsageAction.class); actions.register(FieldUsageStatsAction.INSTANCE, TransportFieldUsageAction.class); + //Data streams + actions.register(ModifyDataStreamsAction.INSTANCE, ModifyDataStreamsTransportAction.class); + //Indexed scripts actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class); actions.register(GetStoredScriptAction.INSTANCE, TransportGetStoredScriptAction.class); @@ -763,6 +769,9 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestReloadSecureSettingsAction()); + // Data streams + registerHandler.accept(new RestModifyDataStreamsAction()); + // Scripts API registerHandler.accept(new RestGetStoredScriptAction()); registerHandler.accept(new RestPutStoredScriptAction()); diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java new file mode 100644 index 0000000000000..15a7aaf73f3d2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsAction.java @@ -0,0 +1,133 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.metadata.DataStreamAction; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class ModifyDataStreamsAction extends ActionType { + + public static final ModifyDataStreamsAction INSTANCE = new ModifyDataStreamsAction(); + public static final String NAME = "indices:admin/data_stream/modify"; + + private ModifyDataStreamsAction() { + super(NAME, AcknowledgedResponse::readFrom); + } + + public static final class Request + extends AcknowledgedRequest + implements IndicesRequest, ToXContentObject { + + // relevant only for authorizing the request, so require every specified + // index to exist, expand wildcards only to open indices, prohibit + // wildcard expressions that resolve to zero indices, and do not attempt + // to resolve expressions as aliases + private static final IndicesOptions INDICES_OPTIONS = + IndicesOptions.fromOptions(false, false, true, false, true, false, true, false); + + private final List actions; + + public Request(StreamInput in) throws IOException { + super(in); + actions = in.readList(DataStreamAction::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(actions); + } + + public Request(List actions) { + this.actions = Collections.unmodifiableList(actions); + } + + public List getActions() { + return actions; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray("actions"); + for (DataStreamAction action : actions) { + action.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + @Override + public ActionRequestValidationException validate() { + if (actions.isEmpty()) { + return addValidationError("must specify at least one data stream modification action", null); + } + return null; + } + + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "data_stream_actions", + args -> new Request(((List) args[0])) + ); + static { + PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), DataStreamAction.PARSER, new ParseField("actions")); + } + + @Override + public String[] indices() { + return actions.stream().map(DataStreamAction::getDataStream).toArray(String[]::new); + } + + @Override + public IndicesOptions indicesOptions() { + return INDICES_OPTIONS; + } + + @Override + public boolean includeDataStreams() { + return true; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; + } + Request other = (Request) obj; + return Arrays.equals(actions.toArray(), other.actions.toArray()); + } + + @Override + public int hashCode() { + return Objects.hash(actions); + } + + } +} diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsTransportAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsTransportAction.java new file mode 100644 index 0000000000000..cb174e0f71a0d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/datastreams/ModifyDataStreamsTransportAction.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetadataDataStreamsService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +public class ModifyDataStreamsTransportAction extends AcknowledgedTransportMasterNodeAction< + ModifyDataStreamsAction.Request> { + + private final MetadataDataStreamsService metadataDataStreamsService; + + @Inject + public ModifyDataStreamsTransportAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + MetadataDataStreamsService metadataDataStreamsService + ) { + super( + ModifyDataStreamsAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + ModifyDataStreamsAction.Request::new, + indexNameExpressionResolver, + ThreadPool.Names.SAME + ); + this.metadataDataStreamsService = metadataDataStreamsService; + } + + @Override + protected void masterOperation( + Task task, + ModifyDataStreamsAction.Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + metadataDataStreamsService.modifyDataStream(request, listener); + } + + @Override + protected ClusterBlockException checkBlock(ModifyDataStreamsAction.Request request, ClusterState state) { + ClusterBlockException globalBlock = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + if (globalBlock != null) { + return globalBlock; + } + return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, + indexNameExpressionResolver.concreteIndexNames(state, request)); + } + +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 0fa3ebe26187e..714d7850dbfa8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -214,7 +214,7 @@ public DataStream removeBackingIndex(Index index) { index.getName(), name )); } - if (generation == (backingIndexPosition + 1)) { + if (indices.size() == (backingIndexPosition + 1)) { throw new IllegalArgumentException(String.format( Locale.ROOT, "cannot remove backing index [%s] of data stream [%s] because it is the write index", diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java index 63e9d0e10ed0b..48b9cd9100360 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamAction.java @@ -9,73 +9,190 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; +import java.util.function.Supplier; + +import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg; /** - * Operations on data streams + * Operations on data streams. Currently supports adding and removing backing indices. */ -public abstract class DataStreamAction { - private final String dataStream; +public class DataStreamAction implements Writeable, ToXContentObject { + + private static final ParseField DATA_STREAM = new ParseField("data_stream"); + private static final ParseField INDEX = new ParseField("index"); + + private static final ParseField ADD_BACKING_INDEX = new ParseField("add_backing_index"); + private static final ParseField REMOVE_BACKING_INDEX = new ParseField("remove_backing_index"); + + public enum Type { + ADD_BACKING_INDEX((byte) 0, DataStreamAction.ADD_BACKING_INDEX), + REMOVE_BACKING_INDEX((byte) 1, DataStreamAction.REMOVE_BACKING_INDEX); + + private final byte value; + private final String fieldName; + + Type(byte value, ParseField field) { + this.value = value; + this.fieldName = field.getPreferredName(); + } + + public byte value() { + return value; + } + + public static Type fromValue(byte value) { + switch (value) { + case 0: return ADD_BACKING_INDEX; + case 1: return REMOVE_BACKING_INDEX; + default: throw new IllegalArgumentException("no data stream action type for [" + value + "]"); + } + } + } + + private final Type type; + private String dataStream; + private String index; public static DataStreamAction addBackingIndex(String dataStream, String index) { - return new DataStreamAction.AddBackingIndex(dataStream, index); + return new DataStreamAction(Type.ADD_BACKING_INDEX, dataStream, index); } public static DataStreamAction removeBackingIndex(String dataStream, String index) { - return new DataStreamAction.RemoveBackingIndex(dataStream, index); + return new DataStreamAction(Type.REMOVE_BACKING_INDEX, dataStream, index); } - private DataStreamAction(String dataStream) { + public DataStreamAction(StreamInput in) throws IOException { + this.type = Type.fromValue(in.readByte()); + this.dataStream = in.readString(); + this.index = in.readString(); + } + + private DataStreamAction(Type type, String dataStream, String index) { if (false == Strings.hasText(dataStream)) { throw new IllegalArgumentException("[data_stream] is required"); } + if (false == Strings.hasText(index)) { + throw new IllegalArgumentException("[index] is required"); + } + this.type = Objects.requireNonNull(type, "[type] must not be null"); this.dataStream = dataStream; + this.index = index; + } + + DataStreamAction(Type type) { + this.type = type; } - /** - * Data stream on which the operation should act - */ public String getDataStream() { return dataStream; } - public static class AddBackingIndex extends DataStreamAction { - - private final String index; + public void setDataStream(String datastream) { + this.dataStream = datastream; + } - private AddBackingIndex(String dataStream, String index) { - super(dataStream); + public String getIndex() { + return index; + } - if (false == Strings.hasText(index)) { - throw new IllegalArgumentException("[index] is required"); - } + public void setIndex(String index) { + this.index = index; + } - this.index = index; - } + public Type getType() { + return type; + } - public String getIndex() { - return index; - } + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startObject(type.fieldName); + builder.field(DATA_STREAM.getPreferredName(), dataStream); + builder.field(INDEX.getPreferredName(), index); + builder.endObject(); + builder.endObject(); + return builder; + } + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeByte(type.value()); + out.writeString(dataStream); + out.writeString(index); } - public static class RemoveBackingIndex extends DataStreamAction { + public static DataStreamAction fromXContent(XContentParser parser) throws IOException { + return PARSER.apply(parser, null); + } - private final String index; + private static final ObjectParser ADD_BACKING_INDEX_PARSER = parser( + ADD_BACKING_INDEX.getPreferredName(), + () -> new DataStreamAction(Type.ADD_BACKING_INDEX) + ); + private static final ObjectParser REMOVE_BACKING_INDEX_PARSER = parser( + REMOVE_BACKING_INDEX.getPreferredName(), + () -> new DataStreamAction(Type.REMOVE_BACKING_INDEX) + ); + static { + ADD_BACKING_INDEX_PARSER.declareField(DataStreamAction::setDataStream, XContentParser::text, DATA_STREAM, + ObjectParser.ValueType.STRING); + ADD_BACKING_INDEX_PARSER.declareField(DataStreamAction::setIndex, XContentParser::text, INDEX, ObjectParser.ValueType.STRING); + REMOVE_BACKING_INDEX_PARSER.declareField(DataStreamAction::setDataStream, XContentParser::text, DATA_STREAM, + ObjectParser.ValueType.STRING); + REMOVE_BACKING_INDEX_PARSER.declareField(DataStreamAction::setIndex, XContentParser::text, INDEX, ObjectParser.ValueType.STRING); + } - private RemoveBackingIndex(String dataStream, String index) { - super(dataStream); + private static ObjectParser parser(String name, Supplier supplier) { + ObjectParser parser = new ObjectParser<>(name, supplier); + return parser; + } - if (false == Strings.hasText(index)) { - throw new IllegalArgumentException("[index] is required"); + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "data_stream_action", a -> { + // Take the first action and error if there is more than one action + DataStreamAction action = null; + for (Object o : a) { + if (o != null) { + if (action == null) { + action = (DataStreamAction) o; + } else { + throw new IllegalArgumentException("too many data stream operations declared on operation entry"); + } } - - this.index = index; } + return action; + }); + static { + PARSER.declareObject(optionalConstructorArg(), ADD_BACKING_INDEX_PARSER, ADD_BACKING_INDEX); + PARSER.declareObject(optionalConstructorArg(), REMOVE_BACKING_INDEX_PARSER, REMOVE_BACKING_INDEX); + } - public String getIndex() { - return index; + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; } + DataStreamAction other = (DataStreamAction) obj; + return Objects.equals(type, other.type) + && Objects.equals(dataStream, other.dataStream) + && Objects.equals(index, other.index); + } + @Override + public int hashCode() { + return Objects.hash(type, dataStream, index); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index 5dff916955b04..9341589705209 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -9,10 +9,10 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; @@ -20,8 +20,6 @@ import org.elasticsearch.indices.IndicesService; import java.io.IOException; -import java.util.Collections; -import java.util.List; import java.util.function.Function; /** @@ -37,9 +35,9 @@ public MetadataDataStreamsService(ClusterService clusterService, IndicesService this.indicesService = indicesService; } - public void updateBackingIndices(final ModifyDataStreamRequest request, - final ActionListener listener) { - if (request.actions().size() == 0) { + public void modifyDataStream(final ModifyDataStreamsAction.Request request, + final ActionListener listener) { + if (request.getActions().size() == 0) { listener.onResponse(AcknowledgedResponse.TRUE); } else { clusterService.submitStateUpdateTask("update-backing-indices", @@ -48,7 +46,7 @@ public void updateBackingIndices(final ModifyDataStreamRequest request, public ClusterState execute(ClusterState currentState) { return modifyDataStream( currentState, - request.actions(), + request.getActions(), indexMetadata -> { try { return indicesService.createIndexMapperService(indexMetadata); @@ -66,7 +64,7 @@ public ClusterState execute(ClusterState currentState) { * Computes the resulting cluster state after applying all requested data stream modifications in order. * * @param currentState current cluster state - * @param actions ordered list of modifications to perform + * @param actions ordered list of modifications to perform * @return resulting cluster state after all modifications have been performed */ static ClusterState modifyDataStream( @@ -78,20 +76,20 @@ static ClusterState modifyDataStream( for (var action : actions) { Metadata.Builder builder = Metadata.builder(updatedMetadata); - if (action instanceof DataStreamAction.AddBackingIndex) { + if (action.getType() == DataStreamAction.Type.ADD_BACKING_INDEX) { addBackingIndex( updatedMetadata, builder, mapperSupplier, action.getDataStream(), - ((DataStreamAction.AddBackingIndex) action).getIndex() + action.getIndex() ); - } else if (action instanceof DataStreamAction.RemoveBackingIndex) { + } else if (action.getType() == DataStreamAction.Type.REMOVE_BACKING_INDEX) { removeBackingIndex( updatedMetadata, builder, action.getDataStream(), - ((DataStreamAction.RemoveBackingIndex) action).getIndex() + action.getIndex() ); } else { throw new IllegalStateException("unsupported data stream action type [" + action.getClass().getName() + "]"); @@ -155,17 +153,4 @@ private static IndexAbstraction validateIndex(Metadata metadata, String indexNam return index; } - public static final class ModifyDataStreamRequest extends ClusterStateUpdateRequest { - - private final List actions; - - public ModifyDataStreamRequest(List actions) { - this.actions = Collections.unmodifiableList(actions); - } - - public List actions() { - return actions; - } - } - } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 45211fdb917bb..d3eeb8e970df1 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -45,6 +45,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService; import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; +import org.elasticsearch.cluster.metadata.MetadataDataStreamsService; import org.elasticsearch.cluster.metadata.SystemIndexMetadataUpgradeService; import org.elasticsearch.cluster.metadata.TemplateUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -563,6 +564,7 @@ protected Node(final Environment initialEnvironment, final MetadataCreateDataStreamService metadataCreateDataStreamService = new MetadataCreateDataStreamService(threadPool, clusterService, metadataCreateIndexService); + final MetadataDataStreamsService metadataDataStreamsService = new MetadataDataStreamsService(clusterService, indicesService); Collection pluginComponents = pluginsService.filterPlugins(Plugin.class).stream() .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService, @@ -694,6 +696,7 @@ protected Node(final Environment initialEnvironment, b.bind(AliasValidator.class).toInstance(aliasValidator); b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService); b.bind(MetadataCreateDataStreamService.class).toInstance(metadataCreateDataStreamService); + b.bind(MetadataDataStreamsService.class).toInstance(metadataDataStreamsService); b.bind(SearchService.class).toInstance(searchService); b.bind(SearchTransportService.class).toInstance(searchTransportService); b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(searchService::aggReduceContextBuilder)); diff --git a/server/src/main/java/org/elasticsearch/rest/action/datastreams/RestModifyDataStreamsAction.java b/server/src/main/java/org/elasticsearch/rest/action/datastreams/RestModifyDataStreamsAction.java new file mode 100644 index 0000000000000..3a8862f902fc7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/datastreams/RestModifyDataStreamsAction.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.rest.action.datastreams; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class RestModifyDataStreamsAction extends BaseRestHandler { + + @Override + public String getName() { + return "modify_data_stream_action"; + } + + @Override + public List routes() { + return List.of(new Route(POST, "/_data_stream/_modify")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + ModifyDataStreamsAction.Request modifyDsRequest; + try (XContentParser parser = request.contentParser()) { + modifyDsRequest = ModifyDataStreamsAction.Request.PARSER.parse(parser, null); + } + if (modifyDsRequest.getActions() == null || modifyDsRequest.getActions().isEmpty()) { + throw new IllegalArgumentException("no data stream actions specified, at least one must be specified"); + } + modifyDsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", modifyDsRequest.masterNodeTimeout())); + modifyDsRequest.timeout(request.paramAsTime("timeout", modifyDsRequest.timeout())); + return channel -> client.execute(ModifyDataStreamsAction.INSTANCE, modifyDsRequest, new RestToXContentListener<>(channel)); + } + +} diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/ModifyDataStreamsRequestTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/ModifyDataStreamsRequestTests.java new file mode 100644 index 0000000000000..c0fce3246dc09 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/datastreams/ModifyDataStreamsRequestTests.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.datastreams.ModifyDataStreamsAction.Request; +import org.elasticsearch.cluster.metadata.DataStreamAction; +import org.elasticsearch.cluster.metadata.DataStreamActionTests; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class ModifyDataStreamsRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + final int numActions = randomIntBetween(1, 10); + List actions = new ArrayList<>(); + for (int k = 1; k <= numActions; k++) { + actions.add(DataStreamActionTests.createTestInstance()); + } + return new Request(actions); + } + + @Override + protected Request mutateInstance(Request request) throws IOException { + final int moreActions = randomIntBetween(1, 5); + List actions = new ArrayList<>(request.getActions()); + for (int k = 1; k <= moreActions; k++) { + actions.add(DataStreamActionTests.createTestInstance()); + } + return new Request(actions); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamActionTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamActionTests.java new file mode 100644 index 0000000000000..d40546579946d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamActionTests.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class DataStreamActionTests extends ESTestCase { + + public void testToAndFromXContent() throws IOException { + DataStreamAction action = createTestInstance(); + XContentType xContentType = randomFrom(XContentType.values()); + + BytesReference shuffled = toShuffledXContent(action, xContentType, ToXContent.EMPTY_PARAMS, true); + + DataStreamAction parsedAction; + try (XContentParser parser = createParser(xContentType.xContent(), shuffled)) { + parsedAction = DataStreamAction.fromXContent(parser); + assertNull(parser.nextToken()); + } + + assertThat(parsedAction.getType(), equalTo(action.getType())); + assertThat(parsedAction.getDataStream(), equalTo(action.getDataStream())); + assertThat(parsedAction.getIndex(), equalTo(action.getIndex())); + } + + public static DataStreamAction createTestInstance() { + DataStreamAction action = new DataStreamAction(randomBoolean() + ? DataStreamAction.Type.ADD_BACKING_INDEX + : DataStreamAction.Type.REMOVE_BACKING_INDEX + ); + action.setDataStream(randomAlphaOfLength(8)); + action.setIndex(randomAlphaOfLength(8)); + return action; + } + +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java index 92206ce8e5ea5..0c861e1744525 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsServiceTests.java @@ -22,11 +22,13 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Locale; import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.generateMapping; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -143,6 +145,55 @@ public void testRemoveBackingIndex() { assertNull(newState.metadata().getIndicesLookup().get(indexToRemove.getIndex().getName()).getParentDataStream()); } + public void testRemoveWriteIndexIsProhibited() { + final long epochMillis = System.currentTimeMillis(); + final int numBackingIndices = randomIntBetween(1, 4); + final String dataStreamName = randomAlphaOfLength(5); + IndexMetadata[] backingIndices = new IndexMetadata[numBackingIndices]; + Metadata.Builder mb = Metadata.builder(); + for (int k = 0; k < numBackingIndices; k++) { + backingIndices[k] = + IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, k + 1, epochMillis)) + .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .putMapping(generateMapping("@timestamp")) + .build(); + mb.put(backingIndices[k], false); + } + + mb.put(new DataStream( + dataStreamName, + createTimestampField("@timestamp"), + Arrays.stream(backingIndices).map(IndexMetadata::getIndex).collect(Collectors.toList()) + ) + ); + + final IndexMetadata indexToRemove = backingIndices[numBackingIndices - 1]; + ClusterState originalState = ClusterState.builder(new ClusterName("dummy")).metadata(mb.build()).build(); + + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> MetadataDataStreamsService.modifyDataStream( + originalState, + List.of(DataStreamAction.removeBackingIndex(dataStreamName, indexToRemove.getIndex().getName())), + this::getMapperService + ) + ); + + assertThat( + e.getMessage(), + containsString( + String.format( + Locale.ROOT, + "cannot remove backing index [%s] of data stream [%s] because it is the write index", + indexToRemove.getIndex().getName(), + dataStreamName + ) + ) + ); + } + public void testAddRemoveAddRoundtripInSingleRequest() { final long epochMillis = System.currentTimeMillis(); final int numBackingIndices = randomIntBetween(1, 4); diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index c82494caeaf76..bd9ebb2dfe575 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -354,6 +354,7 @@ public class Constants { "indices:admin/data_stream/delete", "indices:admin/data_stream/get", "indices:admin/data_stream/migrate", + "indices:admin/data_stream/modify", "indices:admin/data_stream/promote", "indices:admin/delete", "indices:admin/flush", diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml new file mode 100644 index 0000000000000..034c3701a2ece --- /dev/null +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/170_modify_data_stream.yml @@ -0,0 +1,90 @@ +--- +"Modify a data stream": + - skip: + version: " - 7.99.99" + reason: "change to 7.15.99 after backporting" + features: allowed_warnings + + - do: + allowed_warnings: + - "index template [my-template] has index patterns [data-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation" + indices.put_index_template: + name: my-template + body: + index_patterns: [data-*] + data_stream: {} + + - do: + indices.create_data_stream: + name: data-stream-for-modification + - is_true: acknowledged + + # rollover data stream to create new backing index + - do: + indices.rollover: + alias: "data-stream-for-modification" + - is_true: acknowledged + + # save index names for later use + - do: + indices.get_data_stream: + name: data-stream-for-modification + - set: { data_streams.0.indices.0.index_name: first_index } + - set: { data_streams.0.indices.1.index_name: write_index } + + - do: + index: + index: test_index1 + body: { "foo": "bar1", "@timestamp": "2009-11-15T14:12:12" } + + - do: + indices.modify_data_stream: + body: + actions: + - add_backing_index: + data_stream: "data-stream-for-modification" + index: "test_index1" + - is_true: acknowledged + + - do: + indices.get_data_stream: + name: "data-stream-for-modification" + - match: { data_streams.0.name: data-stream-for-modification } + - match: { data_streams.0.timestamp_field.name: '@timestamp' } + - match: { data_streams.0.generation: 3 } + - length: { data_streams.0.indices: 3 } + - match: { data_streams.0.indices.0.index_name: 'test_index1' } + - match: { data_streams.0.indices.1.index_name: $first_index } + - match: { data_streams.0.indices.2.index_name: $write_index } + + - do: + catch: /cannot remove backing index \[.*\] of data stream \[data-stream-for-modification\] because it is the write index/ + indices.modify_data_stream: + body: + actions: + - remove_backing_index: + data_stream: "data-stream-for-modification" + index: $write_index + + - do: + indices.modify_data_stream: + body: + actions: + - remove_backing_index: + data_stream: "data-stream-for-modification" + index: "test_index1" + + - do: + indices.get_data_stream: + name: "data-stream-for-modification" + - match: { data_streams.0.name: data-stream-for-modification } + - match: { data_streams.0.timestamp_field.name: '@timestamp' } + - match: { data_streams.0.generation: 3 } + - length: { data_streams.0.indices: 2 } + - match: { data_streams.0.indices.0.index_name: $first_index } + - match: { data_streams.0.indices.1.index_name: $write_index } + + - do: + indices.delete_data_stream: + name: data-stream-for-modification + - is_true: acknowledged