Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
---
"Test stubs":
"Create data stream":
- skip:
version: " - 7.6.99"
reason: only available in 7.7+
version: "all"
reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/54022"

- do:
indices.create_data_stream:
name: data-stream2
name: simple-data-stream1
body:
timestamp_field: "@timestamp"
- is_true: acknowledged

- do:
indices.create_data_stream:
name: simple-data-stream2
body:
timestamp_field: "@timestamp2"
- is_true: acknowledged

- do:
indices.get_data_streams: {}
- match: { 0.name: my_data_stream1 }
- match: { 0.name: simple-data-stream1 }
- match: { 0.timestamp_field: '@timestamp' }
- match: { 0.indices: ['my_data_stream1-000000'] }
- match: { 1.name: my_data_stream2 }
- match: { 1.timestamp_field: '@timestamp' }
- match: { 0.indices: [] }
- match: { 1.name: simple-data-stream2 }
- match: { 1.timestamp_field: '@timestamp2' }
- match: { 1.indices: [] }

- do:
indices.delete_data_stream:
name: data-stream2
name: simple-data-stream2
- is_true: acknowledged
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,42 @@
*/
package org.elasticsearch.action.admin.indices.datastream;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;

public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {

private static final Logger logger = LogManager.getLogger(CreateDataStreamAction.class);

public static final CreateDataStreamAction INSTANCE = new CreateDataStreamAction();
public static final String NAME = "indices:admin/data_stream/create";

Expand All @@ -64,7 +76,14 @@ public void setTimestampFieldName(String timestampFieldName) {

@Override
public ActionRequestValidationException validate() {
return null;
ActionRequestValidationException validationException = null;
if (Strings.hasText(name) == false) {
validationException = ValidateActions.addValidationError("name is missing", validationException);
}
if (Strings.hasText(timestampFieldName) == false) {
validationException = ValidateActions.addValidationError("timestamp field name is missing", validationException);
}
return validationException;
}

public Request(StreamInput in) throws IOException {
Expand Down Expand Up @@ -116,7 +135,41 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException {
@Override
protected void masterOperation(Task task, Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
listener.onResponse(new AcknowledgedResponse(true));
clusterService.submitStateUpdateTask("create-data-stream [" + request.name + "]",
new ClusterStateUpdateTask(Priority.HIGH) {

@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return createDataStream(currentState, request);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new AcknowledgedResponse(true));
}
});
}

static ClusterState createDataStream(ClusterState currentState, Request request) {
if (currentState.metaData().dataStreams().containsKey(request.name)) {
throw new IllegalArgumentException("data_stream [" + request.name + "] already exists");
}

MetaData.Builder builder = MetaData.builder(currentState.metaData()).put(
new DataStream(request.name, request.timestampFieldName, Collections.emptyList()));

logger.info("adding data stream [{}]", request.name);
return ClusterState.builder(currentState).metaData(builder).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,44 @@
*/
package org.elasticsearch.action.admin.indices.datastream;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class DeleteDataStreamAction extends ActionType<AcknowledgedResponse> {

private static final Logger logger = LogManager.getLogger(DeleteDataStreamAction.class);

public static final DeleteDataStreamAction INSTANCE = new DeleteDataStreamAction();
public static final String NAME = "indices:admin/data_stream/delete";

Expand All @@ -59,7 +73,11 @@ public Request(String name) {

@Override
public ActionRequestValidationException validate() {
return null;
ActionRequestValidationException validationException = null;
if (Strings.hasText(name) == false) {
validationException = ValidateActions.addValidationError("name is missing", validationException);
}
return validationException;
}

public Request(StreamInput in) throws IOException {
Expand Down Expand Up @@ -108,7 +126,51 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException {
@Override
protected void masterOperation(Task task, Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
listener.onResponse(new AcknowledgedResponse(true));
clusterService.submitStateUpdateTask("remove-data-stream [" + request.name + "]", new ClusterStateUpdateTask(Priority.HIGH) {

@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public ClusterState execute(ClusterState currentState) {
return removeDataStream(currentState, request);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new AcknowledgedResponse(true));
}
});
}

static ClusterState removeDataStream(ClusterState currentState, Request request) {
Set<String> dataStreams = new HashSet<>();
for (String dataStreamName : currentState.metaData().dataStreams().keySet()) {
if (Regex.simpleMatch(request.name, dataStreamName)) {
dataStreams.add(dataStreamName);
}
}
if (dataStreams.isEmpty()) {
// if a match-all pattern was specified and no data streams were found because none exist, do not
// fail with data stream missing exception
if (Regex.isMatchAllPattern(request.name)) {
return currentState;
}
throw new ResourceNotFoundException("data_streams matching [" + request.name + "] not found");
}
MetaData.Builder metaData = MetaData.builder(currentState.metaData());
for (String dataStreamName : dataStreams) {
logger.info("removing data stream [{}]", dataStreamName);
metaData.removeDataStream(dataStreamName);
}
return ClusterState.builder(currentState).metaData(metaData).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,18 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class GetDataStreamsAction extends ActionType<GetDataStreamsAction.Response> {
Expand Down Expand Up @@ -154,11 +157,30 @@ protected Response read(StreamInput in) throws IOException {
@Override
protected void masterOperation(Task task, Request request, ClusterState state,
ActionListener<Response> listener) throws Exception {
List<DataStream> dataStreams = List.of(
new DataStream("my_data_stream1", "@timestamp", List.of("my_data_stream1-000000")),
new DataStream("my_data_stream2", "@timestamp", List.of())
);
listener.onResponse(new Response(dataStreams));
listener.onResponse(new Response(getDataStreams(state, request)));
}

static List<DataStream> getDataStreams(ClusterState clusterState, Request request) {
Map<String, DataStream> dataStreams = clusterState.metaData().dataStreams();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unit test?
Also I think this should throw a resource not found exception if a specific data stream doesn't exist?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems there are different existing behaviors when attempting to retrieve a specific resource that does not exist. E.g., aliases and indices return a 404 and index templates and ingest pipelines return an empty list. I don't if those are intentional behavioral differences, but I can certainly return a 404 if that's appropriate for data streams.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that returning a 404 in case of requesting a specific data stream is appropriate.


// return all data streams if no name was specified
if (request.names.length == 0) {
return new ArrayList<>(dataStreams.values());
}

final List<DataStream> results = new ArrayList<>();
for (String name : request.names) {
if (Regex.isSimpleMatchPattern(name)) {
for (Map.Entry<String, DataStream> entry : dataStreams.entrySet()) {
if (Regex.simpleMatch(name, entry.getKey())) {
results.add(entry.getValue());
}
}
} else if (dataStreams.containsKey(name)) {
results.add(dataStreams.get(name));
}
}
return results;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.ComponentTemplateMetadata;
import org.elasticsearch.cluster.metadata.DataStreamMetadata;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateV2Metadata;
Expand Down Expand Up @@ -133,6 +134,7 @@ public static List<Entry> getNamedWriteables() {
ComponentTemplateMetadata::readDiffFrom);
registerMetaDataCustom(entries, IndexTemplateV2Metadata.TYPE, IndexTemplateV2Metadata::new,
IndexTemplateV2Metadata::readDiffFrom);
registerMetaDataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom);
// Task Status (not Diffable)
entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new));
return entries;
Expand All @@ -155,6 +157,8 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
ComponentTemplateMetadata::fromXContent));
entries.add(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(IndexTemplateV2Metadata.TYPE),
IndexTemplateV2Metadata::fromXContent));
entries.add(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(DataStreamMetadata.TYPE),
DataStreamMetadata::fromXContent));
return entries;
}

Expand Down
Loading