Skip to content

Commit 1f080e3

Browse files
Fix Snapshot State Machine Issues around Failed Clones (#76419)
With recent fixes, it is never correct to simply remove a snapshot from the cluster state without updating the other snapshot entries if the removed entry contains any successful shards, because other entries may depend on them. This change reproduces two issues that result from simply removing a snapshot without regard for other queued operations, and fixes them by having all removals of snapshots from the cluster state go through the same code path. Also, this change moves the tracking of a snapshot as "ending" up a few lines to fix an assertion about finishing snapshots that requires them to be in this collection.
1 parent e305a6b commit 1f080e3

File tree

4 files changed

+92
-39
lines changed

4 files changed

+92
-39
lines changed

server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.client.Client;
2020
import org.elasticsearch.cluster.SnapshotsInProgress;
2121
import org.elasticsearch.common.UUIDs;
22+
import org.elasticsearch.common.settings.Settings;
2223
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2324
import org.elasticsearch.core.TimeValue;
2425
import org.elasticsearch.index.IndexNotFoundException;
@@ -736,6 +737,66 @@ public void testManyConcurrentClonesStartOutOfOrder() throws Exception {
736737
assertAcked(clone2.get());
737738
}
738739

740+
public void testRemoveFailedCloneFromCSWithoutIO() throws Exception {
741+
final String masterNode = internalCluster().startMasterOnlyNode();
742+
internalCluster().startDataOnlyNode();
743+
final String repoName = "test-repo";
744+
createRepository(repoName, "mock");
745+
final String testIndex = "index-test";
746+
createIndexWithContent(testIndex);
747+
748+
final String sourceSnapshot = "source-snapshot";
749+
createFullSnapshot(repoName, sourceSnapshot);
750+
751+
final String targetSnapshot = "target-snapshot";
752+
blockAndFailMasterOnShardClone(repoName);
753+
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, testIndex);
754+
awaitNumberOfSnapshotsInProgress(1);
755+
waitForBlock(masterNode, repoName);
756+
unblockNode(repoName, masterNode);
757+
expectThrows(SnapshotException.class, cloneFuture::actionGet);
758+
awaitNoMoreRunningOperations();
759+
assertAllSnapshotsSuccessful(getRepositoryData(repoName), 1);
760+
assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
761+
}
762+
763+
public void testRemoveFailedCloneFromCSWithQueuedSnapshotInProgress() throws Exception {
764+
// single-threaded master snapshot pool so we can selectively fail part of a clone by letting it run shard by shard
765+
final String masterNode = internalCluster().startMasterOnlyNode(
766+
Settings.builder().put("thread_pool.snapshot.core", 1).put("thread_pool.snapshot.max", 1).build()
767+
);
768+
final String dataNode = internalCluster().startDataOnlyNode();
769+
final String repoName = "test-repo";
770+
createRepository(repoName, "mock");
771+
final String testIndex = "index-test";
772+
final String testIndex2 = "index-test-2";
773+
createIndexWithContent(testIndex);
774+
createIndexWithContent(testIndex2);
775+
776+
final String sourceSnapshot = "source-snapshot";
777+
createFullSnapshot(repoName, sourceSnapshot);
778+
779+
final String targetSnapshot = "target-snapshot";
780+
blockAndFailMasterOnShardClone(repoName);
781+
782+
createIndexWithContent("test-index-3");
783+
blockDataNode(repoName, dataNode);
784+
final ActionFuture<CreateSnapshotResponse> fullSnapshotFuture1 = startFullSnapshot(repoName, "full-snapshot-1");
785+
waitForBlock(dataNode, repoName);
786+
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, testIndex, testIndex2);
787+
awaitNumberOfSnapshotsInProgress(2);
788+
waitForBlock(masterNode, repoName);
789+
unblockNode(repoName, masterNode);
790+
final ActionFuture<CreateSnapshotResponse> fullSnapshotFuture2 = startFullSnapshot(repoName, "full-snapshot-2");
791+
expectThrows(SnapshotException.class, cloneFuture::actionGet);
792+
unblockNode(repoName, dataNode);
793+
awaitNoMoreRunningOperations();
794+
assertSuccessful(fullSnapshotFuture1);
795+
assertSuccessful(fullSnapshotFuture2);
796+
assertAllSnapshotsSuccessful(getRepositoryData(repoName), 3);
797+
assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
798+
}
799+
739800
private ActionFuture<AcknowledgedResponse> startCloneFromDataNode(
740801
String repoName,
741802
String sourceSnapshot,
@@ -772,6 +833,10 @@ private void blockMasterOnShardClone(String repoName) {
772833
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repoName).setBlockOnWriteShardLevelMeta();
773834
}
774835

836+
private void blockAndFailMasterOnShardClone(String repoName) {
837+
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repoName).setBlockAndFailOnWriteShardLevelMeta();
838+
}
839+
775840
/**
776841
* Assert that given {@link RepositoryData} contains exactly the given number of snapshots and all of them are successful.
777842
*/

server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public Metadata clusterMetadata() {
7979
}
8080

8181
public ClusterState updatedClusterState(ClusterState state) {
82-
return SnapshotsService.stateWithoutSuccessfulSnapshot(state, snapshotInfo.snapshot());
82+
return SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot());
8383
}
8484

8585
@Override

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 9 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1376,12 +1376,14 @@ private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsIn
13761376
*/
13771377
private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nullable RepositoryData repositoryData) {
13781378
final Snapshot snapshot = entry.snapshot();
1379+
final boolean newFinalization = endingSnapshots.add(snapshot);
13791380
if (entry.isClone() && entry.state() == State.FAILED) {
13801381
logger.debug("Removing failed snapshot clone [{}] from cluster state", entry);
1381-
removeFailedSnapshotFromClusterState(snapshot, new SnapshotException(snapshot, entry.failure()), null);
1382+
if (newFinalization) {
1383+
removeFailedSnapshotFromClusterState(snapshot, new SnapshotException(snapshot, entry.failure()), null);
1384+
}
13821385
return;
13831386
}
1384-
final boolean newFinalization = endingSnapshots.add(snapshot);
13851387
final String repoName = snapshot.getRepository();
13861388
if (tryEnterRepoLoop(repoName)) {
13871389
if (repositoryData == null) {
@@ -1741,15 +1743,15 @@ private static Tuple<ClusterState, List<SnapshotDeletionsInProgress.Entry>> read
17411743
}
17421744

17431745
/**
1744-
* Computes the cluster state resulting from removing a given snapshot create operation that was finalized in the repository from the
1745-
* given state. This method will update the shard generations of snapshots that the given snapshot depended on so that finalizing them
1746-
* will not cause rolling back to an outdated shard generation.
1746+
* Computes the cluster state resulting from removing a given snapshot create operation from the given state. This method will update
1747+
* the shard generations of snapshots that the given snapshot depended on so that finalizing them will not cause rolling back to an
1748+
* outdated shard generation.
17471749
*
17481750
* @param state current cluster state
17491751
* @param snapshot snapshot for which to remove the snapshot operation
17501752
* @return updated cluster state
17511753
*/
1752-
public static ClusterState stateWithoutSuccessfulSnapshot(ClusterState state, Snapshot snapshot) {
1754+
public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) {
17531755
// TODO: updating snapshots here leaks their outdated generation files, we should add logic to clean those up and enhance
17541756
// BlobStoreTestUtil to catch this leak
17551757
SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
@@ -1904,32 +1906,6 @@ private static <T> ImmutableOpenMap.Builder<T, ShardSnapshotStatus> maybeAddUpda
19041906
return updatedShardAssignments;
19051907
}
19061908

1907-
/**
1908-
* Computes the cluster state resulting from removing a given snapshot create operation from the given state after it has failed at
1909-
* any point before being finalized in the repository.
1910-
*
1911-
* @param state current cluster state
1912-
* @param snapshot snapshot for which to remove the snapshot operation
1913-
* @return updated cluster state
1914-
*/
1915-
private static ClusterState stateWithoutFailedSnapshot(ClusterState state, Snapshot snapshot) {
1916-
SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
1917-
ClusterState result = state;
1918-
boolean changed = false;
1919-
ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
1920-
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
1921-
if (entry.snapshot().equals(snapshot)) {
1922-
changed = true;
1923-
} else {
1924-
entries.add(entry);
1925-
}
1926-
}
1927-
if (changed) {
1928-
result = ClusterState.builder(state).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build();
1929-
}
1930-
return readyDeletions(result).v1();
1931-
}
1932-
19331909
/**
19341910
* Removes record of running snapshot from cluster state and notifies the listener when this action is complete. This method is only
19351911
* used when the snapshot fails for some reason. During normal operation the snapshot repository will remove the
@@ -1946,7 +1922,7 @@ private void removeFailedSnapshotFromClusterState(Snapshot snapshot, Exception f
19461922

19471923
@Override
19481924
public ClusterState execute(ClusterState currentState) {
1949-
final ClusterState updatedState = stateWithoutFailedSnapshot(currentState, snapshot);
1925+
final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot);
19501926
assert updatedState == currentState || endingSnapshots.contains(snapshot)
19511927
: "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state";
19521928
// now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them

test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ public long getFailureCount() {
135135

136136
private volatile boolean blockOnWriteShardLevelMeta;
137137

138+
private volatile boolean blockAndFailOnWriteShardLevelMeta;
139+
138140
private volatile boolean blockOnReadIndexMeta;
139141

140142
private final AtomicBoolean blockOnceOnReadSnapshotInfo = new AtomicBoolean(false);
@@ -224,6 +226,7 @@ public synchronized void unblock() {
224226
blockedIndexId = null;
225227
blockOnDeleteIndexN = false;
226228
blockOnWriteShardLevelMeta = false;
229+
blockAndFailOnWriteShardLevelMeta = false;
227230
blockOnReadIndexMeta = false;
228231
blockOnceOnReadSnapshotInfo.set(false);
229232
blockAndFailOnReadSnapFile = false;
@@ -268,9 +271,15 @@ public void setBlockOnDeleteIndexFile() {
268271
}
269272

270273
public void setBlockOnWriteShardLevelMeta() {
274+
assert blockAndFailOnWriteShardLevelMeta == false : "Either fail or wait after blocking on shard level metadata, not both";
271275
blockOnWriteShardLevelMeta = true;
272276
}
273277

278+
public void setBlockAndFailOnWriteShardLevelMeta() {
279+
assert blockOnWriteShardLevelMeta == false : "Either fail or wait after blocking on shard level metadata, not both";
280+
blockAndFailOnWriteShardLevelMeta = true;
281+
}
282+
274283
public void setBlockOnReadIndexMeta() {
275284
blockOnReadIndexMeta = true;
276285
}
@@ -310,8 +319,8 @@ private synchronized boolean blockExecution() {
310319
boolean wasBlocked = false;
311320
try {
312321
while (blockAndFailOnDataFiles || blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile
313-
|| blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta
314-
|| blockAndFailOnReadSnapFile || blockAndFailOnReadIndexFile || blockedIndexId != null) {
322+
|| blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockAndFailOnWriteShardLevelMeta
323+
|| blockOnReadIndexMeta || blockAndFailOnReadSnapFile || blockAndFailOnReadIndexFile || blockedIndexId != null) {
315324
blocked = true;
316325
this.wait();
317326
wasBlocked = true;
@@ -555,9 +564,12 @@ public void writeBlob(String blobName,
555564

556565
private void beforeWrite(String blobName) throws IOException {
557566
maybeIOExceptionOrBlock(blobName);
558-
if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)
559-
&& path().equals(basePath()) == false) {
560-
blockExecutionAndMaybeWait(blobName);
567+
if (blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX) && path().equals(basePath()) == false) {
568+
if (blockOnWriteShardLevelMeta) {
569+
blockExecutionAndMaybeWait(blobName);
570+
} else if (blockAndFailOnWriteShardLevelMeta) {
571+
blockExecutionAndFail(blobName);
572+
}
561573
}
562574
}
563575

0 commit comments

Comments
 (0)