Skip to content

Commit aa1107b

Browse files
Refactor SnapshotsInProgress to Track Snapshots By Repository (#77984)
First step in making `SnapshotsInProgress` easier to work with by tracking snapshots per repository. This allows simplifying the concurrency logic in a couple of places and sets up a follow-up that would invert the current list of maps for snapshots that is very hard to reason about in the concurrency logic into a map of lists that maps repo-shard to snapshots to make the logic more obviously correct.
1 parent cdc1cc3 commit aa1107b

File tree

26 files changed

+728
-685
lines changed

26 files changed

+728
-685
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ public ClusterState.Builder remove(ClusterState.Builder builder, String name) {
703703
public ClusterState.Custom randomCreate(String name) {
704704
switch (randomIntBetween(0, 1)) {
705705
case 0:
706-
return SnapshotsInProgress.of(List.of(new SnapshotsInProgress.Entry(
706+
return SnapshotsInProgress.EMPTY.withAddedEntry(new SnapshotsInProgress.Entry(
707707
new Snapshot(randomName("repo"), new SnapshotId(randomName("snap"), UUIDs.randomBase64UUID())),
708708
randomBoolean(),
709709
randomBoolean(),
@@ -715,7 +715,7 @@ public ClusterState.Custom randomCreate(String name) {
715715
ImmutableOpenMap.of(),
716716
null,
717717
SnapshotInfoTestUtils.randomUserMetadata(),
718-
randomVersion(random()))));
718+
randomVersion(random())));
719719
case 1:
720720
return new RestoreInProgress.Builder().add(
721721
new RestoreInProgress.Entry(

server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ public void testDisruptionAfterFinalization() throws Exception {
6969

7070
createRandomIndex(idxName);
7171

72-
createRepository("test-repo", "fs");
72+
final String repoName = "test-repo";
73+
createRepository(repoName, "fs");
7374

7475
final String masterNode1 = internalCluster().getMasterName();
7576

@@ -82,12 +83,12 @@ public void testDisruptionAfterFinalization() throws Exception {
8283
@Override
8384
public void clusterChanged(ClusterChangedEvent event) {
8485
SnapshotsInProgress snapshots = event.state().custom(SnapshotsInProgress.TYPE);
85-
if (snapshots != null && snapshots.entries().size() > 0) {
86-
final SnapshotsInProgress.Entry snapshotEntry = snapshots.entries().get(0);
86+
if (snapshots != null && snapshots.isEmpty() == false) {
87+
final SnapshotsInProgress.Entry snapshotEntry = snapshots.forRepo(repoName).get(0);
8788
if (snapshotEntry.state() == SnapshotsInProgress.State.SUCCESS) {
8889
final RepositoriesMetadata repoMeta =
8990
event.state().metadata().custom(RepositoriesMetadata.TYPE);
90-
final RepositoryMetadata metadata = repoMeta.repository("test-repo");
91+
final RepositoryMetadata metadata = repoMeta.repository(repoName);
9192
if (metadata.pendingGeneration() > snapshotEntry.repositoryStateId()) {
9293
logger.info("--> starting disruption");
9394
networkDisruption.startDisrupting();

server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ public void testStartCloneWithSuccessfulShardSnapshotPendingFinalization() throw
644644
try {
645645
awaitClusterState(clusterState -> {
646646
final List<SnapshotsInProgress.Entry> entries = clusterState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
647-
.entries();
647+
.forRepo(repoName);
648648
return entries.size() == 2 && entries.get(1).shardsByRepoShardId().isEmpty() == false;
649649
});
650650
assertFalse(blockedSnapshot.isDone());
@@ -681,7 +681,7 @@ public void testStartCloneDuringRunningDelete() throws Exception {
681681
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, "target-snapshot", indexName);
682682
logger.info("--> waiting for snapshot clone to be fully initialized");
683683
awaitClusterState(state -> {
684-
for (SnapshotsInProgress.Entry entry : state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
684+
for (SnapshotsInProgress.Entry entry : state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).forRepo(repoName)) {
685685
if (entry.shardsByRepoShardId().isEmpty() == false) {
686686
assertEquals(sourceSnapshot, entry.source().getName());
687687
for (ObjectCursor<SnapshotsInProgress.ShardSnapshotStatus> value : entry.shardsByRepoShardId().values()) {
@@ -724,7 +724,7 @@ public void testManyConcurrentClonesStartOutOfOrder() throws Exception {
724724
awaitNumberOfSnapshotsInProgress(2);
725725
awaitClusterState(
726726
state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
727-
.entries()
727+
.forRepo(repoName)
728728
.stream()
729729
.anyMatch(entry -> entry.state().completed())
730730
);

server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ public void testAbortOneOfMultipleSnapshots() throws Exception {
320320
logger.info("--> wait for snapshot on second data node to finish");
321321
awaitClusterState(state -> {
322322
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
323-
return snapshotsInProgress.entries().size() == 2 && snapshotHasCompletedShard(secondSnapshot, snapshotsInProgress);
323+
return snapshotsInProgress.count() == 2 && snapshotHasCompletedShard(repoName, secondSnapshot, snapshotsInProgress);
324324
});
325325

326326
final ActionFuture<AcknowledgedResponse> deleteSnapshotsResponse = startDeleteSnapshot(repoName, firstSnapshot);
@@ -378,7 +378,7 @@ public void testCascadedAborts() throws Exception {
378378
logger.info("--> wait for snapshot on second data node to finish");
379379
awaitClusterState(state -> {
380380
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
381-
return snapshotsInProgress.entries().size() == 2 && snapshotHasCompletedShard(secondSnapshot, snapshotsInProgress);
381+
return snapshotsInProgress.count() == 2 && snapshotHasCompletedShard(repoName, secondSnapshot, snapshotsInProgress);
382382
});
383383

384384
final ActionFuture<AcknowledgedResponse> deleteSnapshotsResponse = startDeleteSnapshot(repoName, firstSnapshot);
@@ -398,7 +398,7 @@ public void testCascadedAborts() throws Exception {
398398
assertBusy(() -> {
399399
assertThat(currentSnapshots(repoName), hasSize(1));
400400
final SnapshotsInProgress snapshotsInProgress = clusterService().state().custom(SnapshotsInProgress.TYPE);
401-
assertThat(snapshotsInProgress.entries().get(0).state(), is(SnapshotsInProgress.State.ABORTED));
401+
assertThat(snapshotsInProgress.forRepo(repoName).get(0).state(), is(SnapshotsInProgress.State.ABORTED));
402402
}, 30L, TimeUnit.SECONDS);
403403

404404
unblockNode(repoName, dataNode);
@@ -441,7 +441,7 @@ public void testMasterFailOverWithQueuedDeletes() throws Exception {
441441
logger.info("--> wait for snapshot on second data node to finish");
442442
awaitClusterState(state -> {
443443
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
444-
return snapshotsInProgress.entries().size() == 2 && snapshotHasCompletedShard(secondSnapshot, snapshotsInProgress);
444+
return snapshotsInProgress.count() == 2 && snapshotHasCompletedShard(repoName, secondSnapshot, snapshotsInProgress);
445445
});
446446

447447
final ActionFuture<AcknowledgedResponse> firstDeleteFuture = startDeleteFromNonMasterClient(repoName, firstSnapshot);
@@ -469,7 +469,7 @@ public void testMasterFailOverWithQueuedDeletes() throws Exception {
469469
assertThat(currentSnapshots(repoName), hasSize(2));
470470
for (SnapshotsInProgress.Entry entry : clusterService().state()
471471
.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
472-
.entries()) {
472+
.forRepo(repoName)) {
473473
assertThat(entry.state(), is(SnapshotsInProgress.State.ABORTED));
474474
assertThat(entry.snapshot().getSnapshotId().getName(), not(secondSnapshot));
475475
}
@@ -1503,12 +1503,15 @@ public void testOutOfOrderAndConcurrentFinalization() throws Exception {
15031503
.execute();
15041504

15051505
awaitClusterState(state -> {
1506-
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
1507-
return snapshotsInProgress.entries().size() == 2 && snapshotsInProgress.entries().get(1).state().completed();
1506+
final List<SnapshotsInProgress.Entry> snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
1507+
.forRepo(repository);
1508+
return snapshotsInProgress.size() == 2 && snapshotsInProgress.get(1).state().completed();
15081509
});
15091510

15101511
unblockAllDataNodes(repository);
1511-
awaitClusterState(state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().get(0).state().completed());
1512+
awaitClusterState(
1513+
state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).forRepo(repository).get(0).state().completed()
1514+
);
15121515

15131516
unblockNode(repository, master);
15141517
assertSuccessful(snapshot2);
@@ -1990,8 +1993,8 @@ private void createIndexWithContent(String indexName, String nodeInclude, String
19901993
);
19911994
}
19921995

1993-
private static boolean snapshotHasCompletedShard(String snapshot, SnapshotsInProgress snapshotsInProgress) {
1994-
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
1996+
private static boolean snapshotHasCompletedShard(String repoName, String snapshot, SnapshotsInProgress snapshotsInProgress) {
1997+
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repoName)) {
19951998
if (entry.snapshot().getSnapshotId().getName().equals(snapshot)) {
19961999
for (ObjectCursor<SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards().values()) {
19972000
if (shard.value.state().completed()) {

server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1066,7 +1066,7 @@ public void onRequestSent(
10661066
logger,
10671067
otherDataNode,
10681068
state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
1069-
.entries()
1069+
.forRepo(repoName)
10701070
.stream()
10711071
.anyMatch(entry -> entry.state() == SnapshotsInProgress.State.ABORTED)
10721072
);

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ public ClusterState execute(ClusterState currentState) {
201201
);
202202
}
203203
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
204-
if (snapshots.entries().isEmpty() == false) {
204+
if (snapshots.isEmpty() == false) {
205205
throw new IllegalStateException(
206206
"Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]"
207207
);

0 commit comments

Comments
 (0)