Skip to content

Commit 04ce0e7

Browse files
authored
Avoid concurrent snapshot finalizations when deleting an INIT snapshot (#28078)
This commit removes the finalization of a snapshot by the snapshot deletion request. This way, the deletion marks the snapshot as ABORTED in cluster state and waits for the snapshot completion. It is the responsability of the snapshot execution to detect the abortion and terminates itself correctly. This avoids concurrent snapshot finalizations and also ordinates the operations: the deletion aborts the snapshot and waits for the snapshot completion, the creation detects the abortion and stops by itself and finalizes the snapshot, then the deletion resumes and continues the deletion process.
1 parent fd45a46 commit 04ce0e7

File tree

2 files changed

+48
-31
lines changed

2 files changed

+48
-31
lines changed

core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -372,26 +372,32 @@ private void beginSnapshot(final ClusterState clusterState,
372372
return;
373373
}
374374
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
375-
boolean accepted = false;
376-
SnapshotsInProgress.Entry updatedSnapshot;
375+
376+
SnapshotsInProgress.Entry endSnapshot;
377377
String failure = null;
378378

379379
@Override
380380
public ClusterState execute(ClusterState currentState) {
381381
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
382382
List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
383383
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
384-
if (entry.snapshot().equals(snapshot.snapshot()) && entry.state() != State.ABORTED) {
385-
// Replace the snapshot that was just created
384+
if (entry.snapshot().equals(snapshot.snapshot()) == false) {
385+
entries.add(entry);
386+
continue;
387+
}
388+
389+
if (entry.state() != State.ABORTED) {
390+
// Replace the snapshot that was just intialized
386391
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
387392
if (!partial) {
388393
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData());
389394
Set<String> missing = indicesWithMissingShards.v1();
390395
Set<String> closed = indicesWithMissingShards.v2();
391396
if (missing.isEmpty() == false || closed.isEmpty() == false) {
392-
StringBuilder failureMessage = new StringBuilder();
393-
updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
394-
entries.add(updatedSnapshot);
397+
endSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
398+
entries.add(endSnapshot);
399+
400+
final StringBuilder failureMessage = new StringBuilder();
395401
if (missing.isEmpty() == false) {
396402
failureMessage.append("Indices don't have primary shards ");
397403
failureMessage.append(missing);
@@ -407,13 +413,16 @@ public ClusterState execute(ClusterState currentState) {
407413
continue;
408414
}
409415
}
410-
updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
416+
SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
411417
entries.add(updatedSnapshot);
412-
if (!completed(shards.values())) {
413-
accepted = true;
418+
if (completed(shards.values())) {
419+
endSnapshot = updatedSnapshot;
414420
}
415421
} else {
416-
entries.add(entry);
422+
assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization";
423+
failure = "snapshot was aborted during initialization";
424+
endSnapshot = entry;
425+
entries.add(endSnapshot);
417426
}
418427
}
419428
return ClusterState.builder(currentState)
@@ -448,8 +457,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
448457
// We should end snapshot only if 1) we didn't accept it for processing (which happens when there
449458
// is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should
450459
// go ahead and continue working on this snapshot rather then end here.
451-
if (!accepted && updatedSnapshot != null) {
452-
endSnapshot(updatedSnapshot, failure);
460+
if (endSnapshot != null) {
461+
endSnapshot(endSnapshot, failure);
453462
}
454463
}
455464
});
@@ -750,6 +759,11 @@ public ClusterState execute(ClusterState currentState) throws Exception {
750759
}
751760
entries.add(updatedSnapshot);
752761
} else if (snapshot.state() == State.INIT && newMaster) {
762+
changed = true;
763+
// Mark the snapshot as aborted as it failed to start from the previous master
764+
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards());
765+
entries.add(updatedSnapshot);
766+
753767
// Clean up the snapshot that failed to start from the old master
754768
deleteSnapshot(snapshot.snapshot(), new DeleteSnapshotListener() {
755769
@Override
@@ -935,7 +949,7 @@ private Tuple<Set<String>, Set<String>> indicesWithMissingShards(ImmutableOpenMa
935949
*
936950
* @param entry snapshot
937951
*/
938-
void endSnapshot(SnapshotsInProgress.Entry entry) {
952+
void endSnapshot(final SnapshotsInProgress.Entry entry) {
939953
endSnapshot(entry, null);
940954
}
941955

@@ -1144,24 +1158,26 @@ public ClusterState execute(ClusterState currentState) throws Exception {
11441158
} else {
11451159
// This snapshot is currently running - stopping shards first
11461160
waitForSnapshot = true;
1147-
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
1148-
if (snapshotEntry.state() == State.STARTED && snapshotEntry.shards() != null) {
1149-
// snapshot is currently running - stop started shards
1150-
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
1161+
1162+
final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
1163+
1164+
final State state = snapshotEntry.state();
1165+
if (state == State.INIT) {
1166+
// snapshot is still initializing, mark it as aborted
1167+
shards = snapshotEntry.shards();
1168+
1169+
} else if (state == State.STARTED) {
1170+
// snapshot is started - mark every non completed shard as aborted
1171+
final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
11511172
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotEntry.shards()) {
11521173
ShardSnapshotStatus status = shardEntry.value;
1153-
if (!status.state().completed()) {
1154-
shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED,
1155-
"aborted by snapshot deletion"));
1156-
} else {
1157-
shardsBuilder.put(shardEntry.key, status);
1174+
if (status.state().completed() == false) {
1175+
status = new ShardSnapshotStatus(status.nodeId(), State.ABORTED, "aborted by snapshot deletion");
11581176
}
1177+
shardsBuilder.put(shardEntry.key, status);
11591178
}
11601179
shards = shardsBuilder.build();
1161-
} else if (snapshotEntry.state() == State.INIT) {
1162-
// snapshot hasn't started yet - end it
1163-
shards = snapshotEntry.shards();
1164-
endSnapshot(snapshotEntry);
1180+
11651181
} else {
11661182
boolean hasUncompletedShards = false;
11671183
// Cleanup in case a node gone missing and snapshot wasn't updated for some reason
@@ -1178,7 +1194,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
11781194
logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes");
11791195
return currentState;
11801196
} else {
1181-
// no shards to wait for - finish the snapshot
1197+
// no shards to wait for but a node is gone - this is the only case
1198+
// where we force to finish the snapshot
11821199
logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately");
11831200
shards = snapshotEntry.shards();
11841201
endSnapshot(snapshotEntry);

core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3151,7 +3151,7 @@ public void testSnapshottingWithMissingSequenceNumbers() {
31513151
assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(15L));
31523152
}
31533153

3154-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/27974")
3154+
@TestLogging("org.elasticsearch.snapshots:TRACE")
31553155
public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
31563156
final Client client = client();
31573157

@@ -3163,11 +3163,11 @@ public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
31633163
));
31643164

31653165
createIndex("test-idx");
3166-
final int nbDocs = scaledRandomIntBetween(1, 100);
3166+
final int nbDocs = scaledRandomIntBetween(100, 500);
31673167
for (int i = 0; i < nbDocs; i++) {
31683168
index("test-idx", "_doc", Integer.toString(i), "foo", "bar" + i);
31693169
}
3170-
refresh();
3170+
flushAndRefresh("test-idx");
31713171
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs));
31723172

31733173
// Create a snapshot

0 commit comments

Comments
 (0)