Skip to content

Commit 633b66f

Browse files
authored
Allow closing a write index of a data stream. (#70908)
Prior to this commit when attempting to close a data stream a validation error is returned indicating that it is forbidden to close a write index of a data stream. The idea behind that is to ensure that a data stream always can accept writes. For the same reason deleting a write index is not allowed (the write index can only be deleted when deleting the entire data stream). However closing an index isn't as destructive as deleting an index (an open index request makes the write index available again) and there are other cases where a data stream can't accept writes. For example when primary shards of the write index are not available. So the original reasoning for not allowing to close a write index isn't that strong. On top of this is that this also avoids certain administrative operations from being performed. For example restoring a snapshot containing data streams that already exist in the cluster (in place restore). Closes #70903 #70861
1 parent 73e2775 commit 633b66f

File tree

5 files changed

+133
-91
lines changed

5 files changed

+133
-91
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexStateService.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -133,18 +133,6 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina
133133
if (concreteIndices == null || concreteIndices.length == 0) {
134134
throw new IllegalArgumentException("Index name is required");
135135
}
136-
List<String> writeIndices = new ArrayList<>();
137-
SortedMap<String, IndexAbstraction> lookup = clusterService.state().metadata().getIndicesLookup();
138-
for (Index index : concreteIndices) {
139-
IndexAbstraction ia = lookup.get(index.getName());
140-
if (ia != null && ia.getParentDataStream() != null && ia.getParentDataStream().getWriteIndex().getIndex().equals(index)) {
141-
writeIndices.add(index.getName());
142-
}
143-
}
144-
if (writeIndices.size() > 0) {
145-
throw new IllegalArgumentException("cannot close the following data stream write indices [" +
146-
Strings.collectionToCommaDelimitedString(writeIndices) + "]");
147-
}
148136

149137
clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices),
150138
new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {

server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexStateServiceTests.java

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.elasticsearch.cluster.metadata;
1010

1111
import org.elasticsearch.Version;
12-
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
1312
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
1413
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult;
1514
import org.elasticsearch.cluster.ClusterName;
@@ -24,11 +23,8 @@
2423
import org.elasticsearch.cluster.routing.ShardRouting;
2524
import org.elasticsearch.cluster.routing.ShardRoutingState;
2625
import org.elasticsearch.cluster.routing.UnassignedInfo;
27-
import org.elasticsearch.cluster.service.ClusterService;
2826
import org.elasticsearch.common.Nullable;
29-
import org.elasticsearch.common.Strings;
3027
import org.elasticsearch.common.collect.ImmutableOpenMap;
31-
import org.elasticsearch.common.collect.Tuple;
3228
import org.elasticsearch.common.settings.Settings;
3329
import org.elasticsearch.index.Index;
3430
import org.elasticsearch.index.IndexNotFoundException;
@@ -40,15 +36,12 @@
4036
import org.elasticsearch.snapshots.SnapshotInfoTests;
4137
import org.elasticsearch.test.ESTestCase;
4238
import org.elasticsearch.test.VersionUtils;
43-
import org.hamcrest.CoreMatchers;
4439

45-
import java.util.ArrayList;
4640
import java.util.Collection;
4741
import java.util.Collections;
4842
import java.util.HashMap;
4943
import java.util.HashSet;
5044
import java.util.List;
51-
import java.util.Locale;
5245
import java.util.Map;
5346
import java.util.Set;
5447
import java.util.stream.Collectors;
@@ -68,8 +61,6 @@
6861
import static org.hamcrest.Matchers.hasSize;
6962
import static org.hamcrest.Matchers.is;
7063
import static org.hamcrest.Matchers.notNullValue;
71-
import static org.mockito.Mockito.mock;
72-
import static org.mockito.Mockito.when;
7364

7465
public class MetadataIndexStateServiceTests extends ESTestCase {
7566

@@ -316,34 +307,6 @@ public void testCloseFailedIfBlockDisappeared() {
316307
assertThat(failedIndices, equalTo(disappearedIndices));
317308
}
318309

319-
public void testCloseCurrentWriteIndexForDataStream() {
320-
int numDataStreams = randomIntBetween(1, 3);
321-
List<Tuple<String, Integer>> dataStreamsToCreate = new ArrayList<>();
322-
List<String> writeIndices = new ArrayList<>();
323-
for (int k = 0; k < numDataStreams; k++) {
324-
String dataStreamName = randomAlphaOfLength(6).toLowerCase(Locale.ROOT);
325-
int numBackingIndices = randomIntBetween(1, 5);
326-
dataStreamsToCreate.add(new Tuple<>(dataStreamName, numBackingIndices));
327-
writeIndices.add(DataStream.getDefaultBackingIndexName(dataStreamName, numBackingIndices));
328-
}
329-
ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dataStreamsToCreate, List.of());
330-
331-
ClusterService clusterService = mock(ClusterService.class);
332-
when(clusterService.state()).thenReturn(cs);
333-
334-
List<String> indicesToDelete = randomSubsetOf(randomIntBetween(1, numDataStreams), writeIndices);
335-
Index[] indicesToDeleteArray = new Index[indicesToDelete.size()];
336-
for (int k = 0; k < indicesToDelete.size(); k++) {
337-
Index indexToDelete = cs.metadata().index(indicesToDelete.get(k)).getIndex();
338-
indicesToDeleteArray[k] = indexToDelete;
339-
}
340-
MetadataIndexStateService service = new MetadataIndexStateService(clusterService, null, null, null, null, null, null);
341-
CloseIndexClusterStateUpdateRequest request = new CloseIndexClusterStateUpdateRequest(0L).indices(indicesToDeleteArray);
342-
Exception e = expectThrows(IllegalArgumentException.class, () -> service.closeIndices(request, null));
343-
assertThat(e.getMessage(), CoreMatchers.containsString("cannot close the following data stream write indices [" +
344-
Strings.collectionToCommaDelimitedString(indicesToDelete) + "]"));
345-
}
346-
347310
public static ClusterState addOpenedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
348311
return addIndex(state, index, numShards, numReplicas, IndexMetadata.State.OPEN, null);
349312
}

x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamsSnapshotsIT.java

Lines changed: 116 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,30 @@
1212
import org.elasticsearch.action.DocWriteResponse;
1313
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
1414
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
15+
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
1516
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
1617
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
18+
import org.elasticsearch.action.index.IndexResponse;
19+
import org.elasticsearch.action.support.IndicesOptions;
20+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
21+
import org.elasticsearch.client.Client;
22+
import org.elasticsearch.cluster.metadata.DataStream;
1723
import org.elasticsearch.common.settings.Settings;
1824
import org.elasticsearch.common.unit.ByteSizeUnit;
25+
import org.elasticsearch.index.Index;
26+
import org.elasticsearch.plugins.Plugin;
27+
import org.elasticsearch.rest.RestStatus;
28+
import org.elasticsearch.search.SearchHit;
1929
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
2030
import org.elasticsearch.snapshots.RestoreInfo;
2131
import org.elasticsearch.snapshots.SnapshotInProgressException;
2232
import org.elasticsearch.snapshots.SnapshotInfo;
2333
import org.elasticsearch.snapshots.SnapshotRestoreException;
2434
import org.elasticsearch.snapshots.SnapshotState;
35+
import org.elasticsearch.snapshots.mockstore.MockRepository;
2536
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
2637
import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
2738
import org.elasticsearch.xpack.core.action.GetDataStreamAction;
28-
import org.elasticsearch.action.index.IndexResponse;
29-
import org.elasticsearch.action.support.IndicesOptions;
30-
import org.elasticsearch.action.support.master.AcknowledgedResponse;
31-
import org.elasticsearch.client.Client;
32-
import org.elasticsearch.cluster.metadata.DataStream;
33-
import org.elasticsearch.plugins.Plugin;
34-
import org.elasticsearch.rest.RestStatus;
35-
import org.elasticsearch.search.SearchHit;
36-
import org.elasticsearch.snapshots.mockstore.MockRepository;
3739
import org.elasticsearch.xpack.datastreams.DataStreamsPlugin;
3840
import org.hamcrest.Matchers;
3941
import org.junit.After;
@@ -45,13 +47,15 @@
4547
import java.util.List;
4648
import java.util.Map;
4749
import java.util.concurrent.ExecutionException;
50+
import java.util.stream.Collectors;
4851

4952
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5053
import static org.hamcrest.Matchers.contains;
5154
import static org.hamcrest.Matchers.containsString;
5255
import static org.hamcrest.Matchers.empty;
5356
import static org.hamcrest.Matchers.equalTo;
5457
import static org.hamcrest.Matchers.hasItems;
58+
import static org.hamcrest.Matchers.hasSize;
5559
import static org.hamcrest.Matchers.is;
5660
import static org.hamcrest.Matchers.not;
5761
import static org.hamcrest.Matchers.nullValue;
@@ -145,6 +149,109 @@ public void testSnapshotAndRestore() throws Exception {
145149
assertEquals(DS_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
146150
}
147151

152+
public void testSnapshotAndRestoreAllDataStreamsInPlace() throws Exception {
153+
CreateSnapshotResponse createSnapshotResponse = client.admin()
154+
.cluster()
155+
.prepareCreateSnapshot(REPO, SNAPSHOT)
156+
.setWaitForCompletion(true)
157+
.setIndices("ds")
158+
.setIncludeGlobalState(false)
159+
.get();
160+
161+
RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
162+
assertEquals(RestStatus.OK, status);
163+
164+
assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), getSnapshot(REPO, SNAPSHOT).indices());
165+
166+
// Close all indices:
167+
CloseIndexRequest closeIndexRequest = new CloseIndexRequest("*");
168+
closeIndexRequest.indicesOptions(IndicesOptions.strictExpandHidden());
169+
assertAcked(client.admin().indices().close(closeIndexRequest).actionGet());
170+
171+
RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
172+
.cluster()
173+
.prepareRestoreSnapshot(REPO, SNAPSHOT)
174+
.setWaitForCompletion(true)
175+
.setIndices("ds")
176+
.get();
177+
assertEquals(1, restoreSnapshotResponse.getRestoreInfo().successfulShards());
178+
179+
assertEquals(DOCUMENT_SOURCE, client.prepareGet(DS_BACKING_INDEX_NAME, id).get().getSourceAsMap());
180+
SearchHit[] hits = client.prepareSearch("ds").get().getHits().getHits();
181+
assertEquals(1, hits.length);
182+
assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
183+
184+
GetDataStreamAction.Request getDataSteamRequest = new GetDataStreamAction.Request(new String[] { "*" });
185+
GetDataStreamAction.Response ds = client.execute(GetDataStreamAction.INSTANCE, getDataSteamRequest).get();
186+
assertThat(
187+
ds.getDataStreams().stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()),
188+
contains(equalTo("ds"), equalTo("other-ds"))
189+
);
190+
List<Index> backingIndices = ds.getDataStreams().get(0).getDataStream().getIndices();
191+
assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(DS_BACKING_INDEX_NAME));
192+
backingIndices = ds.getDataStreams().get(1).getDataStream().getIndices();
193+
String expectedBackingIndexName = DataStream.getDefaultBackingIndexName("other-ds", 1);
194+
assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(expectedBackingIndexName));
195+
}
196+
197+
public void testSnapshotAndRestoreInPlace() throws Exception {
198+
CreateSnapshotResponse createSnapshotResponse = client.admin()
199+
.cluster()
200+
.prepareCreateSnapshot(REPO, SNAPSHOT)
201+
.setWaitForCompletion(true)
202+
.setIndices("ds")
203+
.setIncludeGlobalState(false)
204+
.get();
205+
206+
RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
207+
assertEquals(RestStatus.OK, status);
208+
209+
assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), getSnapshot(REPO, SNAPSHOT).indices());
210+
211+
// A rollover after taking snapshot. The new backing index should be a standalone index after restoring
212+
// and not part of the data stream:
213+
RolloverRequest rolloverRequest = new RolloverRequest("ds", null);
214+
RolloverResponse rolloverResponse = client.admin().indices().rolloverIndex(rolloverRequest).actionGet();
215+
assertThat(rolloverResponse.isRolledOver(), is(true));
216+
assertThat(rolloverResponse.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("ds", 2)));
217+
218+
// Close all backing indices of ds data stream:
219+
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(".ds-ds-*");
220+
closeIndexRequest.indicesOptions(IndicesOptions.strictExpandHidden());
221+
assertAcked(client.admin().indices().close(closeIndexRequest).actionGet());
222+
223+
RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
224+
.cluster()
225+
.prepareRestoreSnapshot(REPO, SNAPSHOT)
226+
.setWaitForCompletion(true)
227+
.setIndices("ds")
228+
.get();
229+
assertEquals(1, restoreSnapshotResponse.getRestoreInfo().successfulShards());
230+
231+
assertEquals(DOCUMENT_SOURCE, client.prepareGet(DS_BACKING_INDEX_NAME, id).get().getSourceAsMap());
232+
SearchHit[] hits = client.prepareSearch("ds").get().getHits().getHits();
233+
assertEquals(1, hits.length);
234+
assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
235+
236+
GetDataStreamAction.Request getDataSteamRequest = new GetDataStreamAction.Request(new String[] { "ds" });
237+
GetDataStreamAction.Response ds = client.execute(GetDataStreamAction.INSTANCE, getDataSteamRequest).actionGet();
238+
assertThat(
239+
ds.getDataStreams().stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()),
240+
contains(equalTo("ds"))
241+
);
242+
List<Index> backingIndices = ds.getDataStreams().get(0).getDataStream().getIndices();
243+
assertThat(ds.getDataStreams().get(0).getDataStream().getIndices(), hasSize(1));
244+
assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(equalTo(DS_BACKING_INDEX_NAME)));
245+
246+
// The backing index created as part of rollover should still exist (but just not part of the data stream)
247+
assertThat(indexExists(DataStream.getDefaultBackingIndexName("ds", 2)), is(true));
248+
// An additional rollover should create a new backing index (3th generation) and leave .ds-ds-...-2 index as is:
249+
rolloverRequest = new RolloverRequest("ds", null);
250+
rolloverResponse = client.admin().indices().rolloverIndex(rolloverRequest).actionGet();
251+
assertThat(rolloverResponse.isRolledOver(), is(true));
252+
assertThat(rolloverResponse.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("ds", 3)));
253+
}
254+
148255
public void testSnapshotAndRestoreAll() throws Exception {
149256
CreateSnapshotResponse createSnapshotResponse = client.admin()
150257
.cluster()

x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/action/TransportFreezeIndexAction.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import org.elasticsearch.cluster.block.ClusterBlockException;
2323
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2424
import org.elasticsearch.cluster.block.ClusterBlocks;
25+
import org.elasticsearch.cluster.metadata.IndexAbstraction;
2526
import org.elasticsearch.cluster.metadata.IndexMetadata;
2627
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2728
import org.elasticsearch.cluster.metadata.Metadata;
2829
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
2930
import org.elasticsearch.cluster.service.ClusterService;
3031
import org.elasticsearch.common.Priority;
32+
import org.elasticsearch.common.Strings;
3133
import org.elasticsearch.common.inject.Inject;
3234
import org.elasticsearch.common.settings.Settings;
3335
import org.elasticsearch.index.Index;
@@ -42,6 +44,7 @@
4244

4345
import java.util.ArrayList;
4446
import java.util.List;
47+
import java.util.SortedMap;
4548

4649
public final class TransportFreezeIndexAction extends
4750
TransportMasterNodeAction<FreezeRequest, FreezeResponse> {
@@ -133,6 +136,20 @@ private void toggleFrozenSettings(final Index[] concreteIndices, final FreezeReq
133136
})) {
134137
@Override
135138
public ClusterState execute(ClusterState currentState) {
139+
List<String> writeIndices = new ArrayList<>();
140+
SortedMap<String, IndexAbstraction> lookup = currentState.metadata().getIndicesLookup();
141+
for (Index index : concreteIndices) {
142+
IndexAbstraction ia = lookup.get(index.getName());
143+
if (ia != null && ia.getParentDataStream() != null &&
144+
ia.getParentDataStream().getWriteIndex().getIndex().equals(index)) {
145+
writeIndices.add(index.getName());
146+
}
147+
}
148+
if (writeIndices.size() > 0) {
149+
throw new IllegalArgumentException("cannot freeze the following data stream write indices [" +
150+
Strings.collectionToCommaDelimitedString(writeIndices) + "]");
151+
}
152+
136153
final Metadata.Builder builder = Metadata.builder(currentState.metadata());
137154
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
138155
for (Index index : concreteIndices) {

x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/data_stream/20_unsupported_apis.yml

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -142,39 +142,6 @@
142142
name: simple-data-stream1
143143
- is_true: acknowledged
144144

145-
---
146-
"Close write index for data stream fails":
147-
- skip:
148-
version: " - 7.8.99"
149-
reason: "data streams only supported in 7.9+"
150-
features: allowed_warnings
151-
152-
- do:
153-
allowed_warnings:
154-
- "index template [my-template1] has index patterns [simple-data-stream1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation"
155-
indices.put_index_template:
156-
name: my-template1
157-
body:
158-
index_patterns: [simple-data-stream1]
159-
data_stream: {}
160-
161-
- do:
162-
indices.create_data_stream:
163-
name: simple-data-stream1
164-
- is_true: acknowledged
165-
166-
- do:
167-
catch: bad_request
168-
indices.close:
169-
index: ".ds-simple-data-stream1-*000001"
170-
allowed_warnings:
171-
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"
172-
173-
- do:
174-
indices.delete_data_stream:
175-
name: simple-data-stream1
176-
- is_true: acknowledged
177-
178145
---
179146
"Prohibit split on data stream's write index":
180147
- skip:

0 commit comments

Comments
 (0)