From 6aa44d2987bd60cbe3bad5996bf975600bd9dcc8 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Fri, 20 Mar 2020 10:00:47 -0500 Subject: [PATCH 1/7] Cluster state and CRUD ops for data streams --- .../test/indices.data_stream/10_basic.yml | 7 +-- .../elasticsearch/ElasticsearchException.java | 7 ++- .../datastream/CreateDataStreamAction.java | 53 ++++++++++++++++- .../datastream/DeleteDataStreamAction.java | 54 +++++++++++++++++- .../datastream/GetDataStreamsAction.java | 31 ++++++++-- .../elasticsearch/cluster/ClusterModule.java | 4 ++ .../cluster/metadata/MetaData.java | 32 +++++++++++ .../indices/DataStreamMissingException.java | 57 +++++++++++++++++++ .../ExceptionSerializationTests.java | 2 + .../cluster/metadata/DataStreamTests.java | 17 ++++-- 10 files changed, 244 insertions(+), 20 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/indices/DataStreamMissingException.java diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml index 49754005b6db5..7c2df2ddde9d9 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -13,12 +13,9 @@ - do: indices.get_data_streams: {} - - match: { 0.name: my_data_stream1 } + - match: { 0.name: data-stream2 } - match: { 0.timestamp_field: '@timestamp' } - - match: { 0.indices: ['my_data_stream1-000000'] } - - match: { 1.name: my_data_stream2 } - - match: { 1.timestamp_field: '@timestamp' } - - match: { 1.indices: [] } + - match: { 0.indices: [] } - do: indices.delete_data_stream: diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index efbf8d25c9a2c..95bb6adb11097 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -1040,7 +1040,12 @@ private enum ElasticsearchExceptionHandle { org.elasticsearch.ingest.IngestProcessorException.class, org.elasticsearch.ingest.IngestProcessorException::new, 157, - Version.V_7_5_0); + Version.V_7_5_0), + DATA_STREAM_MISSING_EXCEPTION( + org.elasticsearch.indices.DataStreamMissingException.class, + org.elasticsearch.indices.DataStreamMissingException::new, + 158, + Version.V_7_7_0); final Class exceptionClass; final CheckedFunction constructor; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java index df6e829a28af4..197eb2f897075 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java @@ -18,30 +18,42 @@ */ package org.elasticsearch.action.admin.indices.datastream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Collections; import java.util.Objects; public class CreateDataStreamAction extends ActionType { + private static final Logger logger = LogManager.getLogger(CreateDataStreamAction.class); + public static final CreateDataStreamAction INSTANCE = new CreateDataStreamAction(); public static final String NAME = "indices:admin/data_stream/create"; @@ -64,7 +76,14 @@ public void setTimestampFieldName(String timestampFieldName) { @Override public ActionRequestValidationException validate() { - return null; + ActionRequestValidationException validationException = null; + if (Strings.hasText(name) == false) { + validationException = ValidateActions.addValidationError("name is missing", validationException); + } + if (Strings.hasText(timestampFieldName) == false) { + validationException = ValidateActions.addValidationError("timestamp field name is missing", validationException); + } + return validationException; } public Request(StreamInput in) throws IOException { @@ -116,7 +135,37 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException { @Override protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { - listener.onResponse(new AcknowledgedResponse(true)); + clusterService.submitStateUpdateTask("create-data-stream [" + request.name + "]", + new ClusterStateUpdateTask(Priority.HIGH) { + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + if (currentState.metaData().dataStreams().containsKey(request.name)) { + throw new IllegalArgumentException("data_stream [" + request.name + "] already exists"); + } + + MetaData.Builder builder = MetaData.builder(currentState.metaData()).put(request.name, + new DataStream(request.name, request.timestampFieldName, Collections.emptyList())); + + logger.info("adding data stream [{}]", request.name); + return ClusterState.builder(currentState).metaData(builder).build(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new AcknowledgedResponse(true)); + } + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java index 20a2ba4aa2cd6..6f6fedaa10535 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.action.admin.indices.datastream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; @@ -26,22 +28,32 @@ import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.indices.DataStreamMissingException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.HashSet; import java.util.Objects; +import java.util.Set; public class DeleteDataStreamAction extends ActionType { + private static final Logger logger = LogManager.getLogger(DeleteDataStreamAction.class); + public static final DeleteDataStreamAction INSTANCE = new DeleteDataStreamAction(); public static final String NAME = "indices:admin/data_stream/delete"; @@ -108,7 +120,47 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException { @Override protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { - listener.onResponse(new AcknowledgedResponse(true)); + clusterService.submitStateUpdateTask("remove-data-stream [" + request.name + "]", new ClusterStateUpdateTask(Priority.HIGH) { + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public ClusterState execute(ClusterState currentState) { + Set dataStreams = new HashSet<>(); + for (String dataStreamName : currentState.metaData().dataStreams().keySet()) { + if (Regex.simpleMatch(request.name, dataStreamName)) { + dataStreams.add(dataStreamName); + } + } + if (dataStreams.isEmpty()) { + // if a match-all pattern was specified and no data streams were found because none exist, do not + // fail with data stream missing exception + if (Regex.isMatchAllPattern(request.name)) { + return currentState; + } + throw new DataStreamMissingException(request.name); + } + MetaData.Builder metaData = MetaData.builder(currentState.metaData()); + for (String dataStreamName : dataStreams) { + logger.info("removing data stream [{}]", dataStreamName); + metaData.removeDataStream(dataStreamName); + } + return ClusterState.builder(currentState).metaData(metaData).build(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new AcknowledgedResponse(true)); + } + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java index 1549f056e811f..db268ec11705f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.tasks.Task; @@ -41,8 +42,10 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; public class GetDataStreamsAction extends ActionType { @@ -154,11 +157,29 @@ protected Response read(StreamInput in) throws IOException { @Override protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { - List dataStreams = List.of( - new DataStream("my_data_stream1", "@timestamp", List.of("my_data_stream1-000000")), - new DataStream("my_data_stream2", "@timestamp", List.of()) - ); - listener.onResponse(new Response(dataStreams)); + + Map dataStreams = state.metaData().dataStreams(); + + // return all data streams if no name was specified + if (request.names.length == 0) { + listener.onResponse(new Response(new ArrayList<>(dataStreams.values()))); + return; + } + + final List results = new ArrayList<>(); + for (String name : request.names) { + if (Regex.isSimpleMatchPattern(name)) { + for (Map.Entry entry : dataStreams.entrySet()) { + if (Regex.simpleMatch(name, entry.getKey())) { + results.add(entry.getValue()); + } + } + } else if (dataStreams.containsKey(name)) { + results.add(dataStreams.get(name)); + } + } + + listener.onResponse(new Response(results)); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index f52b32f3ccf67..6dfe07e5c20aa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.ComponentTemplateMetadata; +import org.elasticsearch.cluster.metadata.DataStreamMetadata; import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexTemplateV2Metadata; @@ -133,6 +134,7 @@ public static List getNamedWriteables() { ComponentTemplateMetadata::readDiffFrom); registerMetaDataCustom(entries, IndexTemplateV2Metadata.TYPE, IndexTemplateV2Metadata::new, IndexTemplateV2Metadata::readDiffFrom); + registerMetaDataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom); // Task Status (not Diffable) entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new)); return entries; @@ -155,6 +157,8 @@ public static List getNamedXWriteables() { ComponentTemplateMetadata::fromXContent)); entries.add(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(IndexTemplateV2Metadata.TYPE), IndexTemplateV2Metadata::fromXContent)); + entries.add(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(DataStreamMetadata.TYPE), + DataStreamMetadata::fromXContent)); return entries; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 8471364b07dea..dc17e3e3dc94a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -674,6 +674,12 @@ public Map templatesV2() { .orElse(Collections.emptyMap()); } + public Map dataStreams() { + return Optional.ofNullable((DataStreamMetadata) this.custom(DataStreamMetadata.TYPE)) + .map(DataStreamMetadata::dataStreams) + .orElse(Collections.emptyMap()); + } + public ImmutableOpenMap customs() { return this.customs; } @@ -1121,6 +1127,32 @@ public Builder removeIndexTemplate(String name) { return this; } + public Builder dataStreams(Map dataStreams) { + this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(dataStreams)); + return this; + } + + public Builder put(String name, DataStream dataStream) { + Objects.requireNonNull(dataStream, "it is invalid to add a null data stream: " + name); + Map existingDataStreams = + Optional.ofNullable((DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE)) + .map(dsmd -> new HashMap<>(dsmd.dataStreams())) + .orElse(new HashMap<>()); + existingDataStreams.put(name, dataStream); + this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(existingDataStreams)); + return this; + } + + public Builder removeDataStream(String name) { + Map existingDataStreams = + Optional.ofNullable((DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE)) + .map(dsmd -> new HashMap<>(dsmd.dataStreams())) + .orElse(new HashMap<>()); + existingDataStreams.remove(name); + this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(existingDataStreams)); + return this; + } + public Custom getCustom(String type) { return customs.get(type); } diff --git a/server/src/main/java/org/elasticsearch/indices/DataStreamMissingException.java b/server/src/main/java/org/elasticsearch/indices/DataStreamMissingException.java new file mode 100644 index 0000000000000..da270996090ca --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/DataStreamMissingException.java @@ -0,0 +1,57 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; + +public class DataStreamMissingException extends ElasticsearchException { + + private final String name; + + public DataStreamMissingException(String name) { + super("data_stream [" + name + "] missing"); + this.name = name; + } + + public DataStreamMissingException(StreamInput in) throws IOException { + super(in); + name = in.readOptionalString(); + } + + public String name() { + return this.name; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(name); + } + + @Override + public RestStatus status() { + return RestStatus.NOT_FOUND; + } +} diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index f2a73573ee1b7..98a2f11ab7007 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -67,6 +67,7 @@ import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotInPrimaryModeException; +import org.elasticsearch.indices.DataStreamMissingException; import org.elasticsearch.indices.IndexTemplateMissingException; import org.elasticsearch.indices.InvalidIndexTemplateException; import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException; @@ -825,6 +826,7 @@ public void testIds() { ids.put(155, ShardNotInPrimaryModeException.class); ids.put(156, RetentionLeaseInvalidRetainingSeqNoException.class); ids.put(157, IngestProcessorException.class); + ids.put(158, DataStreamMissingException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 072165ab098ef..8e2cae06290ce 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -28,6 +28,15 @@ public class DataStreamTests extends AbstractSerializingTestCase { + public static DataStream randomInstance() { + int numIndices = randomIntBetween(0, 128); + List indices = new ArrayList<>(numIndices); + for (int i = 0; i < numIndices; i++) { + indices.add(randomAlphaOfLength(10)); + } + return new DataStream(randomAlphaOfLength(10), randomAlphaOfLength(10), indices); + } + @Override protected DataStream doParseInstance(XContentParser parser) throws IOException { return DataStream.fromXContent(parser); @@ -40,11 +49,7 @@ protected Writeable.Reader instanceReader() { @Override protected DataStream createTestInstance() { - int numIndices = randomIntBetween(0, 128); - List indices = new ArrayList<>(numIndices); - for (int i = 0; i < numIndices; i++) { - indices.add(randomAlphaOfLength(10)); - } - return new DataStream(randomAlphaOfLength(10), randomAlphaOfLength(10), indices); + return randomInstance(); } + } From 9a44505edcb0034593eed94ab9d42fc83adb91cf Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Fri, 20 Mar 2020 10:01:27 -0500 Subject: [PATCH 2/7] include new files --- .../cluster/metadata/DataStreamMetadata.java | 170 ++++++++++++++++++ .../metadata/DataStreamMetadataTests.java | 59 ++++++ 2 files changed, 229 insertions(+) create mode 100644 server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamMetadata.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamMetadataTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamMetadata.java new file mode 100644 index 0000000000000..3719e32680bcc --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamMetadata.java @@ -0,0 +1,170 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.DiffableUtils; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Custom {@link MetaData} implementation for storing a map of {@link DataStream}s and their names. + */ +public class DataStreamMetadata implements MetaData.Custom { + + public static final String TYPE = "data_stream"; + private static final ParseField DATA_STREAM = new ParseField("data_stream"); + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(TYPE, false, + a -> new DataStreamMetadata((Map) a[0])); + + static { + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> { + Map dataStreams = new HashMap<>(); + while (p.nextToken() != XContentParser.Token.END_OBJECT) { + String name = p.currentName(); + dataStreams.put(name, DataStream.fromXContent(p)); + } + return dataStreams; + }, DATA_STREAM); + } + + private final Map dataStreams; + + public DataStreamMetadata(Map dataStreams) { + this.dataStreams = dataStreams; + } + + public DataStreamMetadata(StreamInput in) throws IOException { + this.dataStreams = in.readMap(StreamInput::readString, DataStream::new); + } + + public Map dataStreams() { + return this.dataStreams; + } + + @Override + public Diff diff(MetaData.Custom before) { + return new DataStreamMetadata.DataStreamMetadataDiff((DataStreamMetadata) before, this); + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return new DataStreamMetadata.DataStreamMetadataDiff(in); + } + + @Override + public EnumSet context() { + return MetaData.ALL_CONTEXTS; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_7_7_0; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(this.dataStreams, StreamOutput::writeString, (stream, val) -> val.writeTo(stream)); + } + + public static DataStreamMetadata fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(DATA_STREAM.getPreferredName()); + for (Map.Entry dataStream : dataStreams.entrySet()) { + builder.field(dataStream.getKey(), dataStream.getValue()); + } + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(this.dataStreams); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != getClass()) { + return false; + } + DataStreamMetadata other = (DataStreamMetadata) obj; + return Objects.equals(this.dataStreams, other.dataStreams); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + static class DataStreamMetadataDiff implements NamedDiff { + + final Diff> dataStreamDiff; + + DataStreamMetadataDiff(DataStreamMetadata before, DataStreamMetadata after) { + this.dataStreamDiff = DiffableUtils.diff(before.dataStreams, after.dataStreams, + DiffableUtils.getStringKeySerializer()); + } + + DataStreamMetadataDiff(StreamInput in) throws IOException { + this.dataStreamDiff = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), + DataStream::new, DataStream::readDiffFrom); + } + + @Override + public MetaData.Custom apply(MetaData.Custom part) { + return new DataStreamMetadata(dataStreamDiff.apply(((DataStreamMetadata) part).dataStreams)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + dataStreamDiff.writeTo(out); + } + + @Override + public String getWriteableName() { + return TYPE; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamMetadataTests.java new file mode 100644 index 0000000000000..7ef4aad1a1802 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamMetadataTests.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.test.AbstractNamedWriteableTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class DataStreamMetadataTests extends AbstractNamedWriteableTestCase { + + @Override + protected DataStreamMetadata createTestInstance() { + if (randomBoolean()) { + return new DataStreamMetadata(Collections.emptyMap()); + } + Map dataStreams = new HashMap<>(); + for (int i = 0; i < randomIntBetween(1, 5); i++) { + dataStreams.put(randomAlphaOfLength(5), DataStreamTests.randomInstance()); + } + return new DataStreamMetadata(dataStreams); + } + + @Override + protected DataStreamMetadata mutateInstance(DataStreamMetadata instance) throws IOException { + return randomValueOtherThan(instance, this::createTestInstance); + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(Collections.singletonList(new NamedWriteableRegistry.Entry(DataStreamMetadata.class, + DataStreamMetadata.TYPE, DataStreamMetadata::new))); + } + + @Override + protected Class categoryClass() { + return DataStreamMetadata.class; + } +} From a2ff5925b609102cd8fe147bc6e04331a8a161de Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Fri, 20 Mar 2020 15:41:28 -0500 Subject: [PATCH 3/7] review comments --- .../test/indices.data_stream/10_basic.yml | 18 ++++-- .../elasticsearch/ElasticsearchException.java | 7 +-- .../datastream/CreateDataStreamAction.java | 22 ++++--- .../datastream/DeleteDataStreamAction.java | 46 ++++++++------- .../datastream/GetDataStreamsAction.java | 11 ++-- .../cluster/metadata/DataStreamMetadata.java | 20 ++++++- .../cluster/metadata/MetaData.java | 6 +- .../indices/DataStreamMissingException.java | 57 ------------------- .../ExceptionSerializationTests.java | 2 - .../metadata/ToAndFromJsonMetaDataTests.java | 12 ++++ 10 files changed, 93 insertions(+), 108 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/indices/DataStreamMissingException.java diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml index 7c2df2ddde9d9..65283f723babd 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -1,23 +1,33 @@ --- -"Test stubs": +"Create data stream": - skip: version: " - 7.99.99" reason: not backported yet - do: indices.create_data_stream: - name: data-stream2 + name: simple-data-stream1 body: timestamp_field: "@timestamp" - is_true: acknowledged + - do: + indices.create_data_stream: + name: simple-data-stream2 + body: + timestamp_field: "@timestamp2" + - is_true: acknowledged + - do: indices.get_data_streams: {} - - match: { 0.name: data-stream2 } + - match: { 0.name: simple-data-stream1 } - match: { 0.timestamp_field: '@timestamp' } - match: { 0.indices: [] } + - match: { 1.name: simple-data-stream2 } + - match: { 1.timestamp_field: '@timestamp2' } + - match: { 1.indices: [] } - do: indices.delete_data_stream: - name: data-stream2 + name: simple-data-stream2 - is_true: acknowledged diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index 95bb6adb11097..efbf8d25c9a2c 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -1040,12 +1040,7 @@ private enum ElasticsearchExceptionHandle { org.elasticsearch.ingest.IngestProcessorException.class, org.elasticsearch.ingest.IngestProcessorException::new, 157, - Version.V_7_5_0), - DATA_STREAM_MISSING_EXCEPTION( - org.elasticsearch.indices.DataStreamMissingException.class, - org.elasticsearch.indices.DataStreamMissingException::new, - 158, - Version.V_7_7_0); + Version.V_7_5_0); final Class exceptionClass; final CheckedFunction constructor; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java index 197eb2f897075..37f190691f33a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java @@ -150,15 +150,7 @@ public void onFailure(String source, Exception e) { @Override public ClusterState execute(ClusterState currentState) throws Exception { - if (currentState.metaData().dataStreams().containsKey(request.name)) { - throw new IllegalArgumentException("data_stream [" + request.name + "] already exists"); - } - - MetaData.Builder builder = MetaData.builder(currentState.metaData()).put(request.name, - new DataStream(request.name, request.timestampFieldName, Collections.emptyList())); - - logger.info("adding data stream [{}]", request.name); - return ClusterState.builder(currentState).metaData(builder).build(); + return createDataStream(currentState, request); } @Override @@ -168,6 +160,18 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } + static ClusterState createDataStream(ClusterState currentState, Request request) { + if (currentState.metaData().dataStreams().containsKey(request.name)) { + throw new IllegalArgumentException("data_stream [" + request.name + "] already exists"); + } + + MetaData.Builder builder = MetaData.builder(currentState.metaData()).put( + new DataStream(request.name, request.timestampFieldName, Collections.emptyList())); + + logger.info("adding data stream [{}]", request.name); + return ClusterState.builder(currentState).metaData(builder).build(); + } + @Override protected ClusterBlockException checkBlock(Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java index 6f6fedaa10535..df658b663c943 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java @@ -20,6 +20,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; @@ -40,7 +41,6 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.indices.DataStreamMissingException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -134,26 +134,7 @@ public void onFailure(String source, Exception e) { @Override public ClusterState execute(ClusterState currentState) { - Set dataStreams = new HashSet<>(); - for (String dataStreamName : currentState.metaData().dataStreams().keySet()) { - if (Regex.simpleMatch(request.name, dataStreamName)) { - dataStreams.add(dataStreamName); - } - } - if (dataStreams.isEmpty()) { - // if a match-all pattern was specified and no data streams were found because none exist, do not - // fail with data stream missing exception - if (Regex.isMatchAllPattern(request.name)) { - return currentState; - } - throw new DataStreamMissingException(request.name); - } - MetaData.Builder metaData = MetaData.builder(currentState.metaData()); - for (String dataStreamName : dataStreams) { - logger.info("removing data stream [{}]", dataStreamName); - metaData.removeDataStream(dataStreamName); - } - return ClusterState.builder(currentState).metaData(metaData).build(); + return removeDataStream(currentState, request); } @Override @@ -163,6 +144,29 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } + static ClusterState removeDataStream(ClusterState currentState, Request request) { + Set dataStreams = new HashSet<>(); + for (String dataStreamName : currentState.metaData().dataStreams().keySet()) { + if (Regex.simpleMatch(request.name, dataStreamName)) { + dataStreams.add(dataStreamName); + } + } + if (dataStreams.isEmpty()) { + // if a match-all pattern was specified and no data streams were found because none exist, do not + // fail with data stream missing exception + if (Regex.isMatchAllPattern(request.name)) { + return currentState; + } + throw new ResourceNotFoundException("data_streams matching [" + request.name + "] not found"); + } + MetaData.Builder metaData = MetaData.builder(currentState.metaData()); + for (String dataStreamName : dataStreams) { + logger.info("removing data stream [{}]", dataStreamName); + metaData.removeDataStream(dataStreamName); + } + return ClusterState.builder(currentState).metaData(metaData).build(); + } + @Override protected ClusterBlockException checkBlock(Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java index db268ec11705f..ed2411401aeb6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsAction.java @@ -157,13 +157,15 @@ protected Response read(StreamInput in) throws IOException { @Override protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { + listener.onResponse(new Response(getDataStreams(state, request))); + } - Map dataStreams = state.metaData().dataStreams(); + static List getDataStreams(ClusterState clusterState, Request request) { + Map dataStreams = clusterState.metaData().dataStreams(); // return all data streams if no name was specified if (request.names.length == 0) { - listener.onResponse(new Response(new ArrayList<>(dataStreams.values()))); - return; + return new ArrayList<>(dataStreams.values()); } final List results = new ArrayList<>(); @@ -178,8 +180,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, results.add(dataStreams.get(name)); } } - - listener.onResponse(new Response(results)); + return results; } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamMetadata.java index 3719e32680bcc..5731c1353f989 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamMetadata.java @@ -94,7 +94,7 @@ public String getWriteableName() { @Override public Version getMinimalSupportedVersion() { - return Version.V_7_7_0; + return Version.V_8_0_0; } @Override @@ -116,6 +116,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + public static Builder builder() { + return new Builder(); + } + @Override public int hashCode() { return Objects.hash(this.dataStreams); @@ -138,6 +142,20 @@ public String toString() { return Strings.toString(this); } + public static class Builder { + + private final Map dataStreams = new HashMap<>(); + + public Builder putDataStream(DataStream dataStream) { + dataStreams.put(dataStream.getName(), dataStream); + return this; + } + + public DataStreamMetadata build() { + return new DataStreamMetadata(dataStreams); + } + } + static class DataStreamMetadataDiff implements NamedDiff { final Diff> dataStreamDiff; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index dc17e3e3dc94a..73ab3ccab7176 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -1132,13 +1132,13 @@ public Builder dataStreams(Map dataStreams) { return this; } - public Builder put(String name, DataStream dataStream) { - Objects.requireNonNull(dataStream, "it is invalid to add a null data stream: " + name); + public Builder put(DataStream dataStream) { + Objects.requireNonNull(dataStream, "it is invalid to add a null data stream"); Map existingDataStreams = Optional.ofNullable((DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE)) .map(dsmd -> new HashMap<>(dsmd.dataStreams())) .orElse(new HashMap<>()); - existingDataStreams.put(name, dataStream); + existingDataStreams.put(dataStream.getName(), dataStream); this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(existingDataStreams)); return this; } diff --git a/server/src/main/java/org/elasticsearch/indices/DataStreamMissingException.java b/server/src/main/java/org/elasticsearch/indices/DataStreamMissingException.java deleted file mode 100644 index da270996090ca..0000000000000 --- a/server/src/main/java/org/elasticsearch/indices/DataStreamMissingException.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.indices; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.rest.RestStatus; - -import java.io.IOException; - -public class DataStreamMissingException extends ElasticsearchException { - - private final String name; - - public DataStreamMissingException(String name) { - super("data_stream [" + name + "] missing"); - this.name = name; - } - - public DataStreamMissingException(StreamInput in) throws IOException { - super(in); - name = in.readOptionalString(); - } - - public String name() { - return this.name; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeOptionalString(name); - } - - @Override - public RestStatus status() { - return RestStatus.NOT_FOUND; - } -} diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 98a2f11ab7007..f2a73573ee1b7 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -67,7 +67,6 @@ import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotInPrimaryModeException; -import org.elasticsearch.indices.DataStreamMissingException; import org.elasticsearch.indices.IndexTemplateMissingException; import org.elasticsearch.indices.InvalidIndexTemplateException; import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException; @@ -826,7 +825,6 @@ public void testIds() { ids.put(155, ShardNotInPrimaryModeException.class); ids.put(156, RetentionLeaseInvalidRetainingSeqNoException.class); ids.put(157, IngestProcessorException.class); - ids.put(158, DataStreamMissingException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java index d5e76c97e1ad4..d2a95cb4fc905 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java @@ -151,6 +151,8 @@ public void testSimpleJsonFromAndTo() throws IOException { .putAlias(newAliasMetaDataBuilder("alias-bar1")) .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}")) .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar"))) + .put(new DataStream("data-stream1", "@timestamp", Collections.emptyList())) + .put(new DataStream("data-stream2", "@timestamp2", Collections.emptyList())) .build(); String metaDataSource = MetaData.Builder.toXContent(metaData); @@ -323,6 +325,16 @@ public void testSimpleJsonFromAndTo() throws IOException { equalTo(new Template(Settings.builder().put("setting", "value").build(), new CompressedXContent("{\"baz\":\"eggplant\"}"), Collections.singletonMap("alias", AliasMetaData.builder("alias").build())))); + + // data streams + assertNotNull(parsedMetaData.dataStreams().get("data-stream1")); + assertThat(parsedMetaData.dataStreams().get("data-stream1").getName(), is("data-stream1")); + assertThat(parsedMetaData.dataStreams().get("data-stream1").getTimeStampField(), is("@timestamp")); + assertThat(parsedMetaData.dataStreams().get("data-stream1").getIndices(), is(Collections.emptyList())); + assertNotNull(parsedMetaData.dataStreams().get("data-stream2")); + assertThat(parsedMetaData.dataStreams().get("data-stream2").getName(), is("data-stream2")); + assertThat(parsedMetaData.dataStreams().get("data-stream2").getTimeStampField(), is("@timestamp2")); + assertThat(parsedMetaData.dataStreams().get("data-stream2").getIndices(), is(Collections.emptyList())); } private static final String MAPPING_SOURCE1 = "{\"mapping1\":{\"text1\":{\"type\":\"string\"}}}"; From fcacbbf2579b09697bde770f39794c962878101d Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Mon, 23 Mar 2020 09:24:42 -0500 Subject: [PATCH 4/7] include data streams in MetadataTests --- .../java/org/elasticsearch/cluster/metadata/MetaDataTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataTests.java index bd2e48a20c2e4..e3099ab8f9de3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataTests.java @@ -939,6 +939,7 @@ public static MetaData randomMetaData() { .version(randomNonNegativeLong()) .put("component_template_" + randomAlphaOfLength(3), ComponentTemplateTests.randomInstance()) .put("index_template_v2_" + randomAlphaOfLength(3), IndexTemplateV2Tests.randomInstance()) + .put(DataStreamTests.randomInstance()) .build(); } } From 169efc4c74f0abc7560b943f1c1afa01490b47db Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Mon, 23 Mar 2020 12:02:04 -0500 Subject: [PATCH 5/7] unit test for transport actions --- .../datastream/DeleteDataStreamAction.java | 8 +++- .../CreateDataStreamRequestTests.java | 47 +++++++++++++++++++ .../DeleteDataStreamRequestTests.java | 46 ++++++++++++++++++ .../GetDataStreamsRequestTests.java | 38 +++++++++++++++ 4 files changed, 138 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java index df658b663c943..91444ef64320b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.ValidateActions; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; @@ -36,6 +37,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -71,7 +73,11 @@ public Request(String name) { @Override public ActionRequestValidationException validate() { - return null; + ActionRequestValidationException validationException = null; + if (Strings.hasText(name) == false) { + validationException = ValidateActions.addValidationError("name is missing", validationException); + } + return validationException; } public Request(StreamInput in) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java index d6a846c205fb3..6945811c8df23 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java @@ -18,10 +18,21 @@ */ package org.elasticsearch.action.admin.indices.datastream; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import java.util.Collections; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase { @Override @@ -35,4 +46,40 @@ protected Request createTestInstance() { request.setTimestampFieldName(randomAlphaOfLength(8)); return request; } + + public void testValidateRequest() { + CreateDataStreamAction.Request req = new CreateDataStreamAction.Request("my-data-stream"); + req.setTimestampFieldName("my-timestamp-field"); + ActionRequestValidationException e = req.validate(); + assertNull(e); + } + + public void testValidateRequestWithoutTimestampField() { + CreateDataStreamAction.Request req = new CreateDataStreamAction.Request("my-data-stream"); + ActionRequestValidationException e = req.validate(); + assertNotNull(e); + assertThat(e.validationErrors().size(), equalTo(1)); + assertThat(e.validationErrors().get(0), containsString("timestamp field name is missing")); + } + + public void testCreateDataStream() { + final String dataStreamName = "my-data-stream"; + ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); + CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName); + ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(cs, req); + assertThat(newState.metaData().dataStreams().size(), equalTo(1)); + assertThat(newState.metaData().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName)); + } + + public void testCreateDuplicateDataStream() { + final String dataStreamName = "my-data-stream"; + DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList()); + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().dataStreams(Map.of(dataStreamName, existingDataStream)).build()).build(); + CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> CreateDataStreamAction.TransportAction.createDataStream(cs, req)); + assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists")); + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java index f460065699795..42a961d828f4e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java @@ -18,10 +18,22 @@ */ package org.elasticsearch.action.admin.indices.datastream; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction.Request; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import java.util.Collections; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCase { @Override @@ -33,4 +45,38 @@ protected Writeable.Reader instanceReader() { protected Request createTestInstance() { return new Request(randomAlphaOfLength(8)); } + + public void testValidateRequest() { + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request("my-data-stream"); + ActionRequestValidationException e = req.validate(); + assertNull(e); + } + + public void testValidateRequestWithoutName() { + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(""); + ActionRequestValidationException e = req.validate(); + assertNotNull(e); + assertThat(e.validationErrors().size(), equalTo(1)); + assertThat(e.validationErrors().get(0), containsString("name is missing")); + } + + public void testDeleteDataStream() { + final String dataStreamName = "my-data-stream"; + DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList()); + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().dataStreams(Map.of(dataStreamName, existingDataStream)).build()).build(); + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName); + + ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(cs, req); + assertThat(newState.metaData().dataStreams().size(), equalTo(0)); + } + + public void testDeleteNonexistentDataStream() { + final String dataStreamName = "my-data-stream"; + ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName); + ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, + () -> DeleteDataStreamAction.TransportAction.removeDataStream(cs, req)); + assertThat(e.getMessage(), containsString("data_streams matching [" + dataStreamName + "] not found")); + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java index 062bdef629cc9..60752d6d0844a 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java @@ -18,10 +18,23 @@ */ package org.elasticsearch.action.admin.indices.datastream; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction.Request; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase { @Override @@ -33,4 +46,29 @@ protected Writeable.Reader instanceReader() { protected Request createTestInstance() { return new Request(generateRandomStringArray(8, 8, false)); } + + public void testValidateRequest() { + GetDataStreamsAction.Request req = new GetDataStreamsAction.Request(new String[]{}); + ActionRequestValidationException e = req.validate(); + assertNull(e); + } + + public void testGetDataStreams() { + final String dataStreamName = "my-data-stream"; + DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList()); + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().dataStreams(Map.of(dataStreamName, existingDataStream)).build()).build(); + GetDataStreamsAction.Request req = new GetDataStreamsAction.Request(new String[]{dataStreamName}); + List dataStreams = GetDataStreamsAction.TransportAction.getDataStreams(cs, req); + assertThat(dataStreams.size(), equalTo(1)); + assertThat(dataStreams.get(0).getName(), equalTo(dataStreamName)); + } + + public void testGetNonexistentDataStream() { + final String dataStreamName = "my-data-stream"; + ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); + GetDataStreamsAction.Request req = new GetDataStreamsAction.Request(new String[]{dataStreamName}); + List dataStreams = GetDataStreamsAction.TransportAction.getDataStreams(cs, req); + assertThat(dataStreams.size(), equalTo(0)); + } } From bef471dc9dd89ba85f5a493158930798a136e213 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Mon, 23 Mar 2020 13:16:54 -0500 Subject: [PATCH 6/7] checkstyle --- .../admin/indices/datastream/GetDataStreamsRequestTests.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java index 60752d6d0844a..1caa8ce568883 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/GetDataStreamsRequestTests.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.action.admin.indices.datastream; -import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction.Request; import org.elasticsearch.cluster.ClusterName; @@ -32,7 +31,6 @@ import java.util.List; import java.util.Map; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase { From bfa64f1a6bb6503da645ca29d44271dc693bca67 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Mon, 23 Mar 2020 15:46:51 -0500 Subject: [PATCH 7/7] temporarily mute YML test --- .../rest-api-spec/test/indices.data_stream/10_basic.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml index 1294ea1ac53ca..d21abfc11c754 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -1,8 +1,8 @@ --- "Create data stream": - skip: - version: " - 7.6.99" - reason: only available in 7.7+ + version: "all" + reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/54022" - do: indices.create_data_stream: