Skip to content

Commit 19da22e

Browse files
authored
Delete backing indices with data stream (#54693)
1 parent 87ce139 commit 19da22e

File tree

3 files changed

+163
-14
lines changed

3 files changed

+163
-14
lines changed

rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,3 +119,50 @@
119119
indices.delete_data_stream:
120120
name: get-data-stream2
121121
- is_true: acknowledged
122+
123+
---
124+
"Delete data stream with backing indices":
125+
- skip:
126+
version: " - 7.99.99"
127+
reason: "enable in 7.8+ after back-porting"
128+
129+
- do:
130+
indices.create_data_stream:
131+
name: delete-data-stream1
132+
body:
133+
timestamp_field: "@timestamp"
134+
- is_true: acknowledged
135+
136+
- do:
137+
indices.create:
138+
index: test_index
139+
body:
140+
settings:
141+
number_of_shards: 1
142+
number_of_replicas: 1
143+
144+
- do:
145+
indices.get:
146+
index: "*"
147+
expand_wildcards: all
148+
149+
- is_true: test_index.settings
150+
- is_true: delete-data-stream1-000001.settings
151+
152+
- do:
153+
indices.get_data_streams: {}
154+
- match: { 0.name: delete-data-stream1 }
155+
- match: { 0.timestamp_field: '@timestamp' }
156+
- length: { 0.indices: 1 }
157+
- match: { 0.indices.0.index_name: 'delete-data-stream1-000001' }
158+
159+
- do:
160+
indices.delete_data_stream:
161+
name: delete-data-stream1
162+
- is_true: acknowledged
163+
164+
- do:
165+
catch: missing
166+
indices.get:
167+
index: "delete-data-stream1-000001"
168+
expand_wildcards: all

server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@
3333
import org.elasticsearch.cluster.ClusterStateUpdateTask;
3434
import org.elasticsearch.cluster.block.ClusterBlockException;
3535
import org.elasticsearch.cluster.block.ClusterBlockLevel;
36+
import org.elasticsearch.cluster.metadata.DataStream;
3637
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3738
import org.elasticsearch.cluster.metadata.Metadata;
39+
import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
3840
import org.elasticsearch.cluster.service.ClusterService;
3941
import org.elasticsearch.common.Priority;
4042
import org.elasticsearch.common.Strings;
@@ -43,12 +45,15 @@
4345
import org.elasticsearch.common.io.stream.StreamOutput;
4446
import org.elasticsearch.common.regex.Regex;
4547
import org.elasticsearch.common.unit.TimeValue;
48+
import org.elasticsearch.index.Index;
4649
import org.elasticsearch.tasks.Task;
4750
import org.elasticsearch.threadpool.ThreadPool;
4851
import org.elasticsearch.transport.TransportService;
4952

5053
import java.io.IOException;
54+
import java.util.ArrayList;
5155
import java.util.HashSet;
56+
import java.util.List;
5257
import java.util.Objects;
5358
import java.util.Set;
5459

@@ -107,10 +112,14 @@ public int hashCode() {
107112

108113
public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
109114

115+
private final MetadataDeleteIndexService deleteIndexService;
116+
110117
@Inject
111118
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
112-
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
119+
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
120+
MetadataDeleteIndexService deleteIndexService) {
113121
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
122+
this.deleteIndexService = deleteIndexService;
114123
}
115124

116125
@Override
@@ -140,7 +149,7 @@ public void onFailure(String source, Exception e) {
140149

141150
@Override
142151
public ClusterState execute(ClusterState currentState) {
143-
return removeDataStream(currentState, request);
152+
return removeDataStream(deleteIndexService, currentState, request);
144153
}
145154

146155
@Override
@@ -150,7 +159,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
150159
});
151160
}
152161

153-
static ClusterState removeDataStream(ClusterState currentState, Request request) {
162+
static ClusterState removeDataStream(MetadataDeleteIndexService deleteIndexService, ClusterState currentState, Request request) {
154163
Set<String> dataStreams = new HashSet<>();
155164
for (String dataStreamName : currentState.metadata().dataStreams().keySet()) {
156165
if (Regex.simpleMatch(request.name, dataStreamName)) {
@@ -165,10 +174,19 @@ static ClusterState removeDataStream(ClusterState currentState, Request request)
165174
}
166175
throw new ResourceNotFoundException("data_streams matching [" + request.name + "] not found");
167176
}
168-
Metadata.Builder metadata = Metadata.builder(currentState.metadata());
177+
List<String> dataStreamsToRemove = new ArrayList<>();
178+
Set<Index> backingIndicesToRemove = new HashSet<>();
169179
for (String dataStreamName : dataStreams) {
170-
logger.info("removing data stream [{}]", dataStreamName);
171-
metadata.removeDataStream(dataStreamName);
180+
DataStream dataStream = currentState.metadata().dataStreams().get(dataStreamName);
181+
assert dataStream != null;
182+
backingIndicesToRemove.addAll(dataStream.getIndices());
183+
dataStreamsToRemove.add(dataStreamName);
184+
}
185+
currentState = deleteIndexService.deleteIndices(currentState, backingIndicesToRemove);
186+
Metadata.Builder metadata = Metadata.builder(currentState.metadata());
187+
for (String ds : dataStreamsToRemove) {
188+
logger.info("removing data stream [{}]", ds);
189+
metadata.removeDataStream(ds);
172190
}
173191
return ClusterState.builder(currentState).metadata(metadata).build();
174192
}

server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java

Lines changed: 92 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,32 @@
1919
package org.elasticsearch.action.admin.indices.datastream;
2020

2121
import org.elasticsearch.ResourceNotFoundException;
22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.ActionRequestValidationException;
2324
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction.Request;
2425
import org.elasticsearch.cluster.ClusterName;
2526
import org.elasticsearch.cluster.ClusterState;
2627
import org.elasticsearch.cluster.metadata.DataStream;
28+
import org.elasticsearch.cluster.metadata.IndexMetadata;
2729
import org.elasticsearch.cluster.metadata.Metadata;
30+
import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
31+
import org.elasticsearch.common.collect.Tuple;
2832
import org.elasticsearch.common.io.stream.Writeable;
33+
import org.elasticsearch.common.settings.Settings;
34+
import org.elasticsearch.index.Index;
2935
import org.elasticsearch.test.AbstractWireSerializingTestCase;
3036

31-
import java.util.Collections;
32-
import java.util.Map;
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
import java.util.Locale;
40+
import java.util.Set;
41+
import java.util.stream.Collectors;
3342

3443
import static org.hamcrest.Matchers.containsString;
3544
import static org.hamcrest.Matchers.equalTo;
45+
import static org.mockito.Matchers.any;
46+
import static org.mockito.Mockito.mock;
47+
import static org.mockito.Mockito.when;
3648

3749
public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {
3850

@@ -62,21 +74,93 @@ public void testValidateRequestWithoutName() {
6274

6375
public void testDeleteDataStream() {
6476
final String dataStreamName = "my-data-stream";
65-
DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList());
66-
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
67-
.metadata(Metadata.builder().dataStreams(Map.of(dataStreamName, existingDataStream)).build()).build();
68-
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName);
77+
final List<String> otherIndices = randomSubsetOf(List.of("foo", "bar", "baz"));
78+
79+
ClusterState cs = getClusterState(
80+
List.of(new Tuple<>(dataStreamName, List.of(
81+
String.format(Locale.ROOT, "%s-%06d", dataStreamName, 1),
82+
String.format(Locale.ROOT, "%s-%06d", dataStreamName, 2)))),
83+
otherIndices);
6984

70-
ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(cs, req);
85+
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName);
86+
ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), cs, req);
7187
assertThat(newState.metadata().dataStreams().size(), equalTo(0));
88+
assertThat(newState.metadata().indices().size(), equalTo(otherIndices.size()));
89+
for (String indexName : otherIndices) {
90+
assertThat(newState.metadata().indices().get(indexName).getIndex().getName(), equalTo(indexName));
91+
}
7292
}
7393

7494
public void testDeleteNonexistentDataStream() {
7595
final String dataStreamName = "my-data-stream";
7696
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
7797
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName);
7898
ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class,
79-
() -> DeleteDataStreamAction.TransportAction.removeDataStream(cs, req));
99+
() -> DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), cs, req));
80100
assertThat(e.getMessage(), containsString("data_streams matching [" + dataStreamName + "] not found"));
81101
}
102+
103+
@SuppressWarnings("unchecked")
104+
private static MetadataDeleteIndexService getMetadataDeleteIndexService() {
105+
MetadataDeleteIndexService s = mock(MetadataDeleteIndexService.class);
106+
when(s.deleteIndices(any(ClusterState.class), any(Set.class)))
107+
.thenAnswer(mockInvocation -> {
108+
ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0];
109+
Set<Index> indices = (Set<Index>) mockInvocation.getArguments()[1];
110+
111+
final Metadata.Builder b = Metadata.builder(currentState.metadata());
112+
for (Index index : indices) {
113+
b.remove(index.getName());
114+
}
115+
116+
return ClusterState.builder(currentState).metadata(b.build()).build();
117+
});
118+
119+
return s;
120+
}
121+
122+
/**
123+
* Constructs {@code ClusterState} with the specified data streams and indices.
124+
*
125+
* @param dataStreamAndIndexNames The names of the data streams to create with their respective backing indices
126+
* @param indexNames The names of indices to create that do not back any data streams
127+
*/
128+
private static ClusterState getClusterState(List<Tuple<String, List<String>>> dataStreamAndIndexNames, List<String> indexNames) {
129+
Metadata.Builder builder = Metadata.builder();
130+
131+
List<IndexMetadata> allIndices = new ArrayList<>();
132+
for (Tuple<String, List<String>> dsTuple : dataStreamAndIndexNames) {
133+
List<IndexMetadata> backingIndices = new ArrayList<>();
134+
for (String indexName : dsTuple.v2()) {
135+
backingIndices.add(createIndexMetadata(indexName, true));
136+
}
137+
allIndices.addAll(backingIndices);
138+
139+
DataStream ds = new DataStream(dsTuple.v1(), "@timestamp",
140+
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()));
141+
builder.put(ds);
142+
}
143+
144+
for (String indexName : indexNames) {
145+
allIndices.add(createIndexMetadata(indexName, false));
146+
}
147+
148+
for (IndexMetadata index : allIndices) {
149+
builder.put(index, false);
150+
}
151+
152+
return ClusterState.builder(new ClusterName("_name")).metadata(builder).build();
153+
}
154+
155+
private static IndexMetadata createIndexMetadata(String name, boolean hidden) {
156+
Settings.Builder b = Settings.builder()
157+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
158+
.put("index.hidden", hidden);
159+
160+
return IndexMetadata.builder(name)
161+
.settings(b)
162+
.numberOfShards(1)
163+
.numberOfReplicas(1)
164+
.build();
165+
}
82166
}

0 commit comments

Comments
 (0)