Skip to content

Commit a5d9939

Browse files
authored
Check again on-going snapshots/restores of indices before closing (#43873)
Today we prevent any index that is actively snapshotted or restored to be closed. This verification is done during the execution of the first phase of index closing (ie before blocking the indices). We should also do this verification again in the last phase of index closing (ie after the shard sanity checks and right before actually changing the index state and the routing table) because a snapshot/restore could sneak in while the shards are verified-before-close.
1 parent 65cbe51 commit a5d9939

File tree

5 files changed

+119
-76
lines changed

5 files changed

+119
-76
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexService.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.common.util.set.Sets;
3939
import org.elasticsearch.index.Index;
4040
import org.elasticsearch.snapshots.RestoreService;
41+
import org.elasticsearch.snapshots.SnapshotInProgressException;
4142
import org.elasticsearch.snapshots.SnapshotsService;
4243

4344
import java.util.Arrays;
@@ -90,9 +91,15 @@ public ClusterState execute(final ClusterState currentState) {
9091
*/
9192
public ClusterState deleteIndices(ClusterState currentState, Set<Index> indices) {
9293
final MetaData meta = currentState.metaData();
93-
final Set<IndexMetaData> metaDatas = indices.stream().map(i -> meta.getIndexSafe(i)).collect(toSet());
94+
final Set<Index> indicesToDelete = indices.stream().map(i -> meta.getIndexSafe(i).getIndex()).collect(toSet());
95+
9496
// Check if index deletion conflicts with any running snapshots
95-
SnapshotsService.checkIndexDeletion(currentState, metaDatas);
97+
Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentState, indicesToDelete);
98+
if (snapshottingIndices.isEmpty() == false) {
99+
throw new SnapshotInProgressException("Cannot delete indices that are being snapshotted: " + snapshottingIndices +
100+
". Try again after snapshot finishes or cancel the currently running snapshot.");
101+
}
102+
96103
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable());
97104
MetaData.Builder metaDataBuilder = MetaData.builder(meta);
98105
ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(currentState.blocks());

server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.elasticsearch.indices.IndicesService;
6969
import org.elasticsearch.rest.RestStatus;
7070
import org.elasticsearch.snapshots.RestoreService;
71+
import org.elasticsearch.snapshots.SnapshotInProgressException;
7172
import org.elasticsearch.snapshots.SnapshotsService;
7273
import org.elasticsearch.tasks.TaskId;
7374
import org.elasticsearch.threadpool.ThreadPool;
@@ -86,6 +87,7 @@
8687
import java.util.function.Consumer;
8788
import java.util.stream.Collectors;
8889

90+
import static java.util.Collections.singleton;
8991
import static java.util.Collections.unmodifiableMap;
9092

9193
/**
@@ -230,11 +232,11 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final Map<Index,
230232
final ClusterState currentState) {
231233
final MetaData.Builder metadata = MetaData.builder(currentState.metaData());
232234

233-
final Set<IndexMetaData> indicesToClose = new HashSet<>();
235+
final Set<Index> indicesToClose = new HashSet<>();
234236
for (Index index : indices) {
235237
final IndexMetaData indexMetaData = metadata.getSafe(index);
236238
if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
237-
indicesToClose.add(indexMetaData);
239+
indicesToClose.add(index);
238240
} else {
239241
logger.debug("index {} is already closed, ignoring", index);
240242
assert currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
@@ -246,16 +248,22 @@ static ClusterState addIndexClosedBlocks(final Index[] indices, final Map<Index,
246248
}
247249

248250
// Check if index closing conflicts with any running restores
249-
RestoreService.checkIndexClosing(currentState, indicesToClose);
251+
Set<Index> restoringIndices = RestoreService.restoringIndices(currentState, indicesToClose);
252+
if (restoringIndices.isEmpty() == false) {
253+
throw new IllegalArgumentException("Cannot close indices that are being restored: " + restoringIndices);
254+
}
255+
250256
// Check if index closing conflicts with any running snapshots
251-
SnapshotsService.checkIndexClosing(currentState, indicesToClose);
257+
Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentState, indicesToClose);
258+
if (snapshottingIndices.isEmpty() == false) {
259+
throw new SnapshotInProgressException("Cannot close indices that are being snapshotted: " + snapshottingIndices +
260+
". Try again after snapshot finishes or cancel the currently running snapshot.");
261+
}
252262

253263
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
254264
final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
255265

256-
for (IndexMetaData indexToClose : indicesToClose) {
257-
final Index index = indexToClose.getIndex();
258-
266+
for (Index index : indicesToClose) {
259267
ClusterBlock indexBlock = null;
260268
final Set<ClusterBlock> clusterBlocks = currentState.blocks().indices().get(index.getName());
261269
if (clusterBlocks != null) {
@@ -453,6 +461,24 @@ static Tuple<ClusterState, Collection<IndexResult>> closeRoutingTable(final Clus
453461
continue;
454462
}
455463

464+
// Check if index closing conflicts with any running restores
465+
Set<Index> restoringIndices = RestoreService.restoringIndices(currentState, singleton(index));
466+
if (restoringIndices.isEmpty() == false) {
467+
closingResults.put(result.getKey(), new IndexResult(result.getKey(), new IllegalStateException(
468+
"verification of shards before closing " + index + " succeeded but index is being restored in the meantime")));
469+
logger.debug("verification of shards before closing {} succeeded but index is being restored in the meantime", index);
470+
continue;
471+
}
472+
473+
// Check if index closing conflicts with any running snapshots
474+
Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentState, singleton(index));
475+
if (snapshottingIndices.isEmpty() == false) {
476+
closingResults.put(result.getKey(), new IndexResult(result.getKey(), new IllegalStateException(
477+
"verification of shards before closing " + index + " succeeded but index is being snapshot in the meantime")));
478+
logger.debug("verification of shards before closing {} succeeded but index is being snapshot in the meantime", index);
479+
continue;
480+
}
481+
456482
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
457483
blocks.addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
458484
final IndexMetaData.Builder updatedMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE);

server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import java.util.function.Predicate;
8484
import java.util.stream.Collectors;
8585

86+
import static java.util.Collections.emptySet;
8687
import static java.util.Collections.unmodifiableSet;
8788
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
8889
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
@@ -857,30 +858,26 @@ private static boolean failed(SnapshotInfo snapshot, String index) {
857858
}
858859

859860
/**
860-
* Check if any of the indices to be closed are currently being restored from a snapshot and fail closing if such an index
861-
* is found as closing an index that is being restored makes the index unusable (it cannot be recovered).
861+
* Returns the indices that are currently being restored and that are contained in the indices-to-check set.
862862
*/
863-
public static void checkIndexClosing(ClusterState currentState, Set<IndexMetaData> indices) {
864-
RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE);
865-
if (restore != null) {
866-
Set<Index> indicesToFail = null;
867-
for (RestoreInProgress.Entry entry : restore) {
868-
for (ObjectObjectCursor<ShardId, RestoreInProgress.ShardRestoreStatus> shard : entry.shards()) {
869-
if (!shard.value.state().completed()) {
870-
IndexMetaData indexMetaData = currentState.metaData().index(shard.key.getIndex());
871-
if (indexMetaData != null && indices.contains(indexMetaData)) {
872-
if (indicesToFail == null) {
873-
indicesToFail = new HashSet<>();
874-
}
875-
indicesToFail.add(shard.key.getIndex());
876-
}
877-
}
863+
public static Set<Index> restoringIndices(final ClusterState currentState, final Set<Index> indicesToCheck) {
864+
final RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE);
865+
if (restore == null) {
866+
return emptySet();
867+
}
868+
869+
final Set<Index> indices = new HashSet<>();
870+
for (RestoreInProgress.Entry entry : restore) {
871+
for (ObjectObjectCursor<ShardId, RestoreInProgress.ShardRestoreStatus> shard : entry.shards()) {
872+
Index index = shard.key.getIndex();
873+
if (indicesToCheck.contains(index)
874+
&& shard.value.state().completed() == false
875+
&& currentState.getMetaData().index(index) != null) {
876+
indices.add(index);
878877
}
879878
}
880-
if (indicesToFail != null) {
881-
throw new IllegalArgumentException("Cannot close indices that are being restored: " + indicesToFail);
882-
}
883879
}
880+
return indices;
884881
}
885882

886883
@Override

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 23 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import java.util.stream.Collectors;
8787
import java.util.stream.StreamSupport;
8888

89+
import static java.util.Collections.emptySet;
8990
import static java.util.Collections.unmodifiableList;
9091
import static java.util.Collections.unmodifiableMap;
9192
import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
@@ -1418,62 +1419,37 @@ private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus
14181419
}
14191420

14201421
/**
1421-
* Check if any of the indices to be deleted are currently being snapshotted. Fail as deleting an index that is being
1422-
* snapshotted (with partial == false) makes the snapshot fail.
1422+
* Returns the indices that are currently being snapshotted (with partial == false) and that are contained in the indices-to-check set.
14231423
*/
1424-
public static void checkIndexDeletion(ClusterState currentState, Set<IndexMetaData> indices) {
1425-
Set<Index> indicesToFail = indicesToFailForCloseOrDeletion(currentState, indices);
1426-
if (indicesToFail != null) {
1427-
throw new SnapshotInProgressException("Cannot delete indices that are being snapshotted: " + indicesToFail +
1428-
". Try again after snapshot finishes or cancel the currently running snapshot.");
1424+
public static Set<Index> snapshottingIndices(final ClusterState currentState, final Set<Index> indicesToCheck) {
1425+
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
1426+
if (snapshots == null) {
1427+
return emptySet();
14291428
}
1430-
}
1431-
1432-
/**
1433-
* Check if any of the indices to be closed are currently being snapshotted. Fail as closing an index that is being
1434-
* snapshotted (with partial == false) makes the snapshot fail.
1435-
*/
1436-
public static void checkIndexClosing(ClusterState currentState, Set<IndexMetaData> indices) {
1437-
Set<Index> indicesToFail = indicesToFailForCloseOrDeletion(currentState, indices);
1438-
if (indicesToFail != null) {
1439-
throw new SnapshotInProgressException("Cannot close indices that are being snapshotted: " + indicesToFail +
1440-
". Try again after snapshot finishes or cancel the currently running snapshot.");
1441-
}
1442-
}
14431429

1444-
private static Set<Index> indicesToFailForCloseOrDeletion(ClusterState currentState, Set<IndexMetaData> indices) {
1445-
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
1446-
Set<Index> indicesToFail = null;
1447-
if (snapshots != null) {
1448-
for (final SnapshotsInProgress.Entry entry : snapshots.entries()) {
1449-
if (entry.partial() == false) {
1450-
if (entry.state() == State.INIT) {
1451-
for (IndexId index : entry.indices()) {
1452-
IndexMetaData indexMetaData = currentState.metaData().index(index.getName());
1453-
if (indexMetaData != null && indices.contains(indexMetaData)) {
1454-
if (indicesToFail == null) {
1455-
indicesToFail = new HashSet<>();
1456-
}
1457-
indicesToFail.add(indexMetaData.getIndex());
1458-
}
1430+
final Set<Index> indices = new HashSet<>();
1431+
for (final SnapshotsInProgress.Entry entry : snapshots.entries()) {
1432+
if (entry.partial() == false) {
1433+
if (entry.state() == State.INIT) {
1434+
for (IndexId index : entry.indices()) {
1435+
IndexMetaData indexMetaData = currentState.metaData().index(index.getName());
1436+
if (indexMetaData != null && indicesToCheck.contains(indexMetaData.getIndex())) {
1437+
indices.add(indexMetaData.getIndex());
14591438
}
1460-
} else {
1461-
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards()) {
1462-
if (!shard.value.state().completed()) {
1463-
IndexMetaData indexMetaData = currentState.metaData().index(shard.key.getIndex());
1464-
if (indexMetaData != null && indices.contains(indexMetaData)) {
1465-
if (indicesToFail == null) {
1466-
indicesToFail = new HashSet<>();
1467-
}
1468-
indicesToFail.add(shard.key.getIndex());
1469-
}
1470-
}
1439+
}
1440+
} else {
1441+
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards()) {
1442+
Index index = shard.key.getIndex();
1443+
if (indicesToCheck.contains(index)
1444+
&& shard.value.state().completed() == false
1445+
&& currentState.getMetaData().index(index) != null) {
1446+
indices.add(index);
14711447
}
14721448
}
14731449
}
14741450
}
14751451
}
1476-
return indicesToFail;
1452+
return indices;
14771453
}
14781454

14791455
/**

server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363

6464
import static java.util.Collections.emptyMap;
6565
import static java.util.Collections.emptySet;
66+
import static java.util.Collections.singletonMap;
6667
import static java.util.Collections.unmodifiableMap;
6768
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
6869
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
@@ -124,6 +125,42 @@ public void testCloseRoutingTable() {
124125
}
125126
}
126127

128+
public void testCloseRoutingTableWithRestoredIndex() {
129+
ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTableWithRestoredIndex")).build();
130+
131+
String indexName = "restored-index";
132+
ClusterBlock block = MetaDataIndexStateService.createIndexClosingBlock();
133+
state = addRestoredIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state);
134+
state = ClusterState.builder(state)
135+
.blocks(ClusterBlocks.builder().blocks(state.blocks()).addIndexBlock(indexName, block))
136+
.build();
137+
138+
final Index index = state.metaData().index(indexName).getIndex();
139+
final ClusterState updatedState =
140+
MetaDataIndexStateService.closeRoutingTable(state, singletonMap(index, block), singletonMap(index, new IndexResult(index)))
141+
.v1();
142+
assertIsOpened(index.getName(), updatedState);
143+
assertThat(updatedState.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID), is(true));
144+
}
145+
146+
public void testCloseRoutingTableWithSnapshottedIndex() {
147+
ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTableWithSnapshottedIndex")).build();
148+
149+
String indexName = "snapshotted-index";
150+
ClusterBlock block = MetaDataIndexStateService.createIndexClosingBlock();
151+
state = addSnapshotIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state);
152+
state = ClusterState.builder(state)
153+
.blocks(ClusterBlocks.builder().blocks(state.blocks()).addIndexBlock(indexName, block))
154+
.build();
155+
156+
final Index index = state.metaData().index(indexName).getIndex();
157+
final ClusterState updatedState =
158+
MetaDataIndexStateService.closeRoutingTable(state, singletonMap(index, block), singletonMap(index, new IndexResult(index)))
159+
.v1();
160+
assertIsOpened(index.getName(), updatedState);
161+
assertThat(updatedState.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID), is(true));
162+
}
163+
127164
public void testCloseRoutingTableRemovesRoutingTable() {
128165
final Set<Index> nonBlockedIndices = new HashSet<>();
129166
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();

0 commit comments

Comments
 (0)