Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions docs/reference/indices/open-close.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@ Closed indices consume a significant amount of disk-space which can cause
problems in managed environments. Closing indices can be disabled via the cluster settings
API by setting `cluster.indices.close.enable` to `false`. The default is `true`.

The current write index on a data stream cannot be closed. In order to close
the current write index, the data stream must first be
<<data-streams-rollover,rolled over>> so that a new write index is created
and then the previous write index can be closed.

// end::closed-index[]

[[open-index-api-wait-for-active-shards]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ POST /_snapshot/my_repository/my_snapshot/_restore

Use the restore snapshot API to restore a snapshot of a cluster, including all data streams and indices in the snapshot. If you do not want to restore the entire snapshot, you can select specific data streams or indices to restore.

NOTE: You cannot restore a data stream if a stream with the same name already
exists.

You can run the restore operation on a cluster that contains an elected
<<master-node,master node>> and has data nodes with enough capacity to accommodate the snapshot
you are restoring. Existing indices can only be restored if they are
Expand Down
3 changes: 0 additions & 3 deletions docs/reference/snapshot-restore/restore-snapshot.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,6 @@ new indices if they didn't exist in the cluster.
If a data stream is restored, its backing indices are also restored. The restore
operation automatically opens restored backing indices if they were closed.

NOTE: You cannot restore a data stream if a stream with the same name already
exists.

In addition to entire data streams, you can restore only specific backing
indices from a snapshot. However, restored backing indices are not automatically
added to any existing data streams. For example, if only the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,6 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, fina
if (concreteIndices == null || concreteIndices.length == 0) {
throw new IllegalArgumentException("Index name is required");
}
List<String> writeIndices = new ArrayList<>();
SortedMap<String, IndexAbstraction> lookup = clusterService.state().metadata().getIndicesLookup();
for (Index index : concreteIndices) {
IndexAbstraction ia = lookup.get(index.getName());
if (ia != null && ia.getParentDataStream() != null && ia.getParentDataStream().getWriteIndex().getIndex().equals(index)) {
writeIndices.add(index.getName());
}
}
if (writeIndices.size() > 0) {
throw new IllegalArgumentException("cannot close the following data stream write indices [" +
Strings.collectionToCommaDelimitedString(writeIndices) + "]");
}

clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices),
new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult;
import org.elasticsearch.cluster.ClusterName;
Expand All @@ -27,11 +26,8 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
Expand All @@ -43,15 +39,11 @@
import org.elasticsearch.snapshots.SnapshotInfoTests;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.hamcrest.CoreMatchers;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -72,8 +64,6 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MetadataIndexStateServiceTests extends ESTestCase {

Expand Down Expand Up @@ -379,35 +369,6 @@ public void testCloseFailedIfBlockDisappeared() {
assertThat(failedIndices, equalTo(disappearedIndices));
}

public void testCloseCurrentWriteIndexForDataStream() {
int numDataStreams = randomIntBetween(1, 3);
List<Tuple<String, Integer>> dataStreamsToCreate = new ArrayList<>();
List<String> writeIndices = new ArrayList<>();
for (int k = 0; k < numDataStreams; k++) {
String dataStreamName = randomAlphaOfLength(6).toLowerCase(Locale.ROOT);
int numBackingIndices = randomIntBetween(1, 5);
dataStreamsToCreate.add(new Tuple<>(dataStreamName, numBackingIndices));
writeIndices.add(DataStream.getDefaultBackingIndexName(dataStreamName, numBackingIndices));
}
ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dataStreamsToCreate,
org.elasticsearch.common.collect.List.of());

ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(cs);

List<String> indicesToDelete = randomSubsetOf(randomIntBetween(1, numDataStreams), writeIndices);
Index[] indicesToDeleteArray = new Index[indicesToDelete.size()];
for (int k = 0; k < indicesToDelete.size(); k++) {
Index indexToDelete = cs.metadata().index(indicesToDelete.get(k)).getIndex();
indicesToDeleteArray[k] = indexToDelete;
}
MetadataIndexStateService service = new MetadataIndexStateService(clusterService, null, null, null, null, null, null, null);
CloseIndexClusterStateUpdateRequest request = new CloseIndexClusterStateUpdateRequest(0L).indices(indicesToDeleteArray);
Exception e = expectThrows(IllegalArgumentException.class, () -> service.closeIndices(request, null));
assertThat(e.getMessage(), CoreMatchers.containsString("cannot close the following data stream write indices [" +
Strings.collectionToCommaDelimitedString(indicesToDelete) + "]"));
}

public static ClusterState addOpenedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
return addIndex(state, index, numShards, numReplicas, IndexMetadata.State.OPEN, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,32 @@
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
import org.elasticsearch.xpack.core.action.GetDataStreamAction;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.collect.List;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
import org.elasticsearch.xpack.core.action.GetDataStreamAction;
import org.elasticsearch.xpack.datastreams.DataStreamsPlugin;
import org.hamcrest.Matchers;
import org.junit.After;
Expand All @@ -46,13 +48,15 @@
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -150,6 +154,109 @@ public void testSnapshotAndRestore() throws Exception {
assertEquals(DS_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
}

public void testSnapshotAndRestoreAllDataStreamsInPlace() throws Exception {
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices("ds")
.setIncludeGlobalState(false)
.get();

RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
assertEquals(RestStatus.OK, status);

assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), getSnapshot(REPO, SNAPSHOT).indices());

// Close all indices:
CloseIndexRequest closeIndexRequest = new CloseIndexRequest("*");
closeIndexRequest.indicesOptions(IndicesOptions.strictExpandHidden());
assertAcked(client.admin().indices().close(closeIndexRequest).actionGet());

RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
.cluster()
.prepareRestoreSnapshot(REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices("ds")
.get();
assertEquals(1, restoreSnapshotResponse.getRestoreInfo().successfulShards());

assertEquals(DOCUMENT_SOURCE, client.prepareGet(DS_BACKING_INDEX_NAME, "_doc", id).get().getSourceAsMap());
SearchHit[] hits = client.prepareSearch("ds").get().getHits().getHits();
assertEquals(1, hits.length);
assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());

GetDataStreamAction.Request getDataSteamRequest = new GetDataStreamAction.Request(new String[] { "*" });
GetDataStreamAction.Response ds = client.execute(GetDataStreamAction.INSTANCE, getDataSteamRequest).get();
assertThat(
ds.getDataStreams().stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()),
contains(equalTo("ds"), equalTo("other-ds"))
);
java.util.List<Index> backingIndices = ds.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(DS_BACKING_INDEX_NAME));
backingIndices = ds.getDataStreams().get(1).getDataStream().getIndices();
String expectedBackingIndexName = DataStream.getDefaultBackingIndexName("other-ds", 1);
assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(expectedBackingIndexName));
}

public void testSnapshotAndRestoreInPlace() throws Exception {
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices("ds")
.setIncludeGlobalState(false)
.get();

RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
assertEquals(RestStatus.OK, status);

assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), getSnapshot(REPO, SNAPSHOT).indices());

// A rollover after taking snapshot. The new backing index should be a standalone index after restoring
// and not part of the data stream:
RolloverRequest rolloverRequest = new RolloverRequest("ds", null);
RolloverResponse rolloverResponse = client.admin().indices().rolloverIndex(rolloverRequest).actionGet();
assertThat(rolloverResponse.isRolledOver(), is(true));
assertThat(rolloverResponse.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("ds", 2)));

// Close all backing indices of ds data stream:
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(".ds-ds-*");
closeIndexRequest.indicesOptions(IndicesOptions.strictExpandHidden());
assertAcked(client.admin().indices().close(closeIndexRequest).actionGet());

RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
.cluster()
.prepareRestoreSnapshot(REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices("ds")
.get();
assertEquals(1, restoreSnapshotResponse.getRestoreInfo().successfulShards());

assertEquals(DOCUMENT_SOURCE, client.prepareGet(DS_BACKING_INDEX_NAME, "_doc", id).get().getSourceAsMap());
SearchHit[] hits = client.prepareSearch("ds").get().getHits().getHits();
assertEquals(1, hits.length);
assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());

GetDataStreamAction.Request getDataSteamRequest = new GetDataStreamAction.Request(new String[] { "ds" });
GetDataStreamAction.Response ds = client.execute(GetDataStreamAction.INSTANCE, getDataSteamRequest).actionGet();
assertThat(
ds.getDataStreams().stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()),
contains(equalTo("ds"))
);
java.util.List<Index> backingIndices = ds.getDataStreams().get(0).getDataStream().getIndices();
assertThat(ds.getDataStreams().get(0).getDataStream().getIndices(), hasSize(1));
assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(equalTo(DS_BACKING_INDEX_NAME)));

// The backing index created as part of rollover should still exist (but just not part of the data stream)
assertThat(indexExists(DataStream.getDefaultBackingIndexName("ds", 2)), is(true));
// An additional rollover should create a new backing index (3th generation) and leave .ds-ds-...-2 index as is:
rolloverRequest = new RolloverRequest("ds", null);
rolloverResponse = client.admin().indices().rolloverIndex(rolloverRequest).actionGet();
assertThat(rolloverResponse.isRolledOver(), is(true));
assertThat(rolloverResponse.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("ds", 3)));
}

public void testSnapshotAndRestoreAll() throws Exception {
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
Expand All @@ -42,6 +44,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;

public final class TransportFreezeIndexAction extends
TransportMasterNodeAction<FreezeRequest, FreezeResponse> {
Expand Down Expand Up @@ -139,6 +142,20 @@ private void toggleFrozenSettings(final Index[] concreteIndices, final FreezeReq
})) {
@Override
public ClusterState execute(ClusterState currentState) {
List<String> writeIndices = new ArrayList<>();
SortedMap<String, IndexAbstraction> lookup = currentState.metadata().getIndicesLookup();
for (Index index : concreteIndices) {
IndexAbstraction ia = lookup.get(index.getName());
if (ia != null && ia.getParentDataStream() != null &&
ia.getParentDataStream().getWriteIndex().getIndex().equals(index)) {
writeIndices.add(index.getName());
}
}
if (writeIndices.size() > 0) {
throw new IllegalArgumentException("cannot freeze the following data stream write indices [" +
Strings.collectionToCommaDelimitedString(writeIndices) + "]");
}

final Metadata.Builder builder = Metadata.builder(currentState.metadata());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
for (Index index : concreteIndices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,39 +142,6 @@
name: simple-data-stream1
- is_true: acknowledged

---
"Close write index for data stream fails":
- skip:
version: " - 7.8.99"
reason: "data streams only supported in 7.9+"
features: allowed_warnings

- do:
allowed_warnings:
- "index template [my-template1] has index patterns [simple-data-stream1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation"
indices.put_index_template:
name: my-template1
body:
index_patterns: [simple-data-stream1]
data_stream: {}

- do:
indices.create_data_stream:
name: simple-data-stream1
- is_true: acknowledged

- do:
catch: bad_request
indices.close:
index: ".ds-simple-data-stream1-*000001"
allowed_warnings:
- "the default value for the ?wait_for_active_shards parameter will change from '0' to 'index-setting' in version 8; specify '?wait_for_active_shards=index-setting' to adopt the future default behaviour, or '?wait_for_active_shards=0' to preserve today's behaviour"

- do:
indices.delete_data_stream:
name: simple-data-stream1
- is_true: acknowledged

---
"Prohibit split on data stream's write index":
- skip:
Expand Down