Skip to content

Commit ed19090

Browse files
committed
Avoid concurrent snapshot finalizations when deleting an INIT snapshot (#28078)
This commit removes the finalization of a snapshot by the snapshot deletion request. This way, the deletion marks the snapshot as ABORTED in cluster state and waits for the snapshot completion. It is the responsability of the snapshot execution to detect the abortion and terminates itself correctly. This avoids concurrent snapshot finalizations and also ordinates the operations: the deletion aborts the snapshot and waits for the snapshot completion, the creation detects the abortion and stops by itself and finalizes the snapshot, then the deletion resumes and continues the deletion process.
1 parent 84503a1 commit ed19090

File tree

2 files changed

+47
-29
lines changed

2 files changed

+47
-29
lines changed

core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -370,26 +370,32 @@ private void beginSnapshot(final ClusterState clusterState,
370370
return;
371371
}
372372
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
373-
boolean accepted = false;
374-
SnapshotsInProgress.Entry updatedSnapshot;
373+
374+
SnapshotsInProgress.Entry endSnapshot;
375375
String failure = null;
376376

377377
@Override
378378
public ClusterState execute(ClusterState currentState) {
379379
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
380380
List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
381381
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
382-
if (entry.snapshot().equals(snapshot.snapshot()) && entry.state() != State.ABORTED) {
383-
// Replace the snapshot that was just created
382+
if (entry.snapshot().equals(snapshot.snapshot()) == false) {
383+
entries.add(entry);
384+
continue;
385+
}
386+
387+
if (entry.state() != State.ABORTED) {
388+
// Replace the snapshot that was just intialized
384389
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
385390
if (!partial) {
386391
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData());
387392
Set<String> missing = indicesWithMissingShards.v1();
388393
Set<String> closed = indicesWithMissingShards.v2();
389394
if (missing.isEmpty() == false || closed.isEmpty() == false) {
390-
StringBuilder failureMessage = new StringBuilder();
391-
updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
392-
entries.add(updatedSnapshot);
395+
endSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
396+
entries.add(endSnapshot);
397+
398+
final StringBuilder failureMessage = new StringBuilder();
393399
if (missing.isEmpty() == false) {
394400
failureMessage.append("Indices don't have primary shards ");
395401
failureMessage.append(missing);
@@ -405,13 +411,16 @@ public ClusterState execute(ClusterState currentState) {
405411
continue;
406412
}
407413
}
408-
updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
414+
SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
409415
entries.add(updatedSnapshot);
410-
if (!completed(shards.values())) {
411-
accepted = true;
416+
if (completed(shards.values())) {
417+
endSnapshot = updatedSnapshot;
412418
}
413419
} else {
414-
entries.add(entry);
420+
assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization";
421+
failure = "snapshot was aborted during initialization";
422+
endSnapshot = entry;
423+
entries.add(endSnapshot);
415424
}
416425
}
417426
return ClusterState.builder(currentState)
@@ -446,8 +455,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
446455
// We should end snapshot only if 1) we didn't accept it for processing (which happens when there
447456
// is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should
448457
// go ahead and continue working on this snapshot rather then end here.
449-
if (!accepted && updatedSnapshot != null) {
450-
endSnapshot(updatedSnapshot, failure);
458+
if (endSnapshot != null) {
459+
endSnapshot(endSnapshot, failure);
451460
}
452461
}
453462
});
@@ -747,6 +756,11 @@ public ClusterState execute(ClusterState currentState) throws Exception {
747756
}
748757
entries.add(updatedSnapshot);
749758
} else if (snapshot.state() == State.INIT && newMaster) {
759+
changed = true;
760+
// Mark the snapshot as aborted as it failed to start from the previous master
761+
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards());
762+
entries.add(updatedSnapshot);
763+
750764
// Clean up the snapshot that failed to start from the old master
751765
deleteSnapshot(snapshot.snapshot(), new DeleteSnapshotListener() {
752766
@Override
@@ -932,7 +946,7 @@ private Tuple<Set<String>, Set<String>> indicesWithMissingShards(ImmutableOpenMa
932946
*
933947
* @param entry snapshot
934948
*/
935-
void endSnapshot(SnapshotsInProgress.Entry entry) {
949+
void endSnapshot(final SnapshotsInProgress.Entry entry) {
936950
endSnapshot(entry, null);
937951
}
938952

@@ -1142,24 +1156,26 @@ public ClusterState execute(ClusterState currentState) throws Exception {
11421156
} else {
11431157
// This snapshot is currently running - stopping shards first
11441158
waitForSnapshot = true;
1145-
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
1146-
if (snapshotEntry.state() == State.STARTED && snapshotEntry.shards() != null) {
1147-
// snapshot is currently running - stop started shards
1148-
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
1159+
1160+
final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
1161+
1162+
final State state = snapshotEntry.state();
1163+
if (state == State.INIT) {
1164+
// snapshot is still initializing, mark it as aborted
1165+
shards = snapshotEntry.shards();
1166+
1167+
} else if (state == State.STARTED) {
1168+
// snapshot is started - mark every non completed shard as aborted
1169+
final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
11491170
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotEntry.shards()) {
11501171
ShardSnapshotStatus status = shardEntry.value;
1151-
if (!status.state().completed()) {
1152-
shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED,
1153-
"aborted by snapshot deletion"));
1154-
} else {
1155-
shardsBuilder.put(shardEntry.key, status);
1172+
if (status.state().completed() == false) {
1173+
status = new ShardSnapshotStatus(status.nodeId(), State.ABORTED, "aborted by snapshot deletion");
11561174
}
1175+
shardsBuilder.put(shardEntry.key, status);
11571176
}
11581177
shards = shardsBuilder.build();
1159-
} else if (snapshotEntry.state() == State.INIT) {
1160-
// snapshot hasn't started yet - end it
1161-
shards = snapshotEntry.shards();
1162-
endSnapshot(snapshotEntry);
1178+
11631179
} else {
11641180
boolean hasUncompletedShards = false;
11651181
// Cleanup in case a node gone missing and snapshot wasn't updated for some reason
@@ -1176,7 +1192,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
11761192
logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes");
11771193
return currentState;
11781194
} else {
1179-
// no shards to wait for - finish the snapshot
1195+
// no shards to wait for but a node is gone - this is the only case
1196+
// where we force to finish the snapshot
11801197
logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately");
11811198
shards = snapshotEntry.shards();
11821199
endSnapshot(snapshotEntry);

core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3122,6 +3122,7 @@ public void testGetSnapshotsFromIndexBlobOnly() throws Exception {
31223122
}
31233123
}
31243124

3125+
@TestLogging("org.elasticsearch.snapshots:TRACE")
31253126
public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
31263127
final Client client = client();
31273128

@@ -3137,7 +3138,7 @@ public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
31373138
for (int i = 0; i < nbDocs; i++) {
31383139
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
31393140
}
3140-
refresh();
3141+
flushAndRefresh("test-idx");
31413142
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs));
31423143

31433144
// Create a snapshot

0 commit comments

Comments
 (0)