Skip to content

Commit 4d29cb2

Browse files
Add shortcuts for getting snapshot customs from cluster state (#98906)
Drying up the logic for looking up the customs and falling back to the empty default. Hopefully, this improves readability somewhat by taking away that complexity and some null checks. I like it at least, saves a lot of clutter. Re-revert of #98896, only fixing a single null check in tests.
1 parent 18f960c commit 4d29cb2

File tree

50 files changed

+209
-357
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+209
-357
lines changed

qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/snapshots/RestGetSnapshotsIT.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -201,15 +201,14 @@ public void testSortAndPaginateWithInProgress() throws Exception {
201201
}
202202
AbstractSnapshotIntegTestCase.awaitNumberOfSnapshotsInProgress(logger, inProgressCount);
203203
AbstractSnapshotIntegTestCase.awaitClusterState(logger, state -> {
204-
boolean firstIndexSuccessfullySnapshot = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
205-
.asStream()
204+
final var snapshotsInProgress = SnapshotsInProgress.get(state);
205+
boolean firstIndexSuccessfullySnapshot = snapshotsInProgress.asStream()
206206
.flatMap(s -> s.shards().entrySet().stream())
207207
.allMatch(
208208
e -> e.getKey().getIndexName().equals("test-index-1") == false
209209
|| e.getValue().state() == SnapshotsInProgress.ShardState.SUCCESS
210210
);
211-
boolean secondIndexIsBlocked = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
212-
.asStream()
211+
boolean secondIndexIsBlocked = snapshotsInProgress.asStream()
213212
.flatMap(s -> s.shards().entrySet().stream())
214213
.filter(e -> e.getKey().getIndexName().equals("test-index-2"))
215214
.map(e -> e.getValue().state())

server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,7 @@ public void clusterChanged(ClusterChangedEvent event) {
8787
if (snapshots != null && snapshots.isEmpty() == false) {
8888
final SnapshotsInProgress.Entry snapshotEntry = snapshots.forRepo(repoName).get(0);
8989
if (snapshotEntry.state() == SnapshotsInProgress.State.SUCCESS) {
90-
final RepositoriesMetadata repoMeta = event.state().metadata().custom(RepositoriesMetadata.TYPE);
91-
final RepositoryMetadata metadata = repoMeta.repository(repoName);
90+
final RepositoryMetadata metadata = RepositoriesMetadata.get(event.state()).repository(repoName);
9291
if (metadata.pendingGeneration() > snapshotEntry.repositoryStateId()) {
9392
logger.info("--> starting disruption");
9493
networkDisruption.startDisrupting();

server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,7 @@ public void testMasterFailoverDuringCleanup() throws Exception {
4141
ensureStableCluster(nodeCount - 1);
4242

4343
logger.info("--> wait for cleanup to finish and disappear from cluster state");
44-
awaitClusterState(
45-
state -> state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress() == false
46-
);
44+
awaitClusterState(state -> RepositoryCleanupInProgress.get(state).hasCleanupInProgress() == false);
4745

4846
try {
4947
cleanupFuture.get();
@@ -70,9 +68,7 @@ public void testRepeatCleanupsDontRemove() throws Exception {
7068
unblockNode("test-repo", internalCluster().getMasterName());
7169

7270
logger.info("--> wait for cleanup to finish and disappear from cluster state");
73-
awaitClusterState(
74-
state -> state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress() == false
75-
);
71+
awaitClusterState(state -> RepositoryCleanupInProgress.get(state).hasCleanupInProgress() == false);
7672

7773
final ExecutionException e = expectThrows(ExecutionException.class, cleanupFuture::get);
7874
final Throwable ioe = ExceptionsHelper.unwrap(e, IOException.class);
@@ -119,9 +115,7 @@ private ActionFuture<CleanupRepositoryResponse> startBlockedCleanup(String repoN
119115

120116
final String masterNode = internalCluster().getMasterName();
121117
waitForBlock(masterNode, repoName);
122-
awaitClusterState(
123-
state -> state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress()
124-
);
118+
awaitClusterState(state -> RepositoryCleanupInProgress.get(state).hasCleanupInProgress());
125119
return future;
126120
}
127121

server/src/internalClusterTest/java/org/elasticsearch/snapshots/AbortedSnapshotIT.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,7 @@ public void run() {
6363
clusterAdmin().prepareCreateSnapshot(repoName, "snapshot-1").setWaitForCompletion(false).setPartial(true).get();
6464
// resulting cluster state has been applied on all nodes, which means the first task for the SNAPSHOT pool is queued up
6565

66-
final var snapshot = clusterService.state()
67-
.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
68-
.forRepo(repoName)
69-
.get(0)
70-
.snapshot();
66+
final var snapshot = SnapshotsInProgress.get(clusterService.state()).forRepo(repoName).get(0).snapshot();
7167
final var snapshotShardsService = internalCluster().getInstance(SnapshotShardsService.class, dataNode);
7268

7369
// Run up to 3 snapshot tasks, which are (in order):
@@ -105,7 +101,7 @@ public void run() {
105101
stopBlocking.set(true);
106102
safeAwait(barrier); // release snapshot thread
107103

108-
assertBusy(() -> assertTrue(clusterService.state().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).isEmpty()));
104+
assertBusy(() -> assertTrue(SnapshotsInProgress.get(clusterService.state()).isEmpty()));
109105
}
110106

111107
}

server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -642,8 +642,7 @@ public void testStartCloneWithSuccessfulShardSnapshotPendingFinalization() throw
642642
logger.info("--> wait for clone to start fully with shards assigned in the cluster state");
643643
try {
644644
awaitClusterState(clusterState -> {
645-
final List<SnapshotsInProgress.Entry> entries = clusterState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
646-
.forRepo(repoName);
645+
final List<SnapshotsInProgress.Entry> entries = SnapshotsInProgress.get(clusterState).forRepo(repoName);
647646
return entries.size() == 2 && entries.get(1).shardsByRepoShardId().isEmpty() == false;
648647
});
649648
assertFalse(blockedSnapshot.isDone());
@@ -680,7 +679,7 @@ public void testStartCloneDuringRunningDelete() throws Exception {
680679
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, "target-snapshot", indexName);
681680
logger.info("--> waiting for snapshot clone to be fully initialized");
682681
awaitClusterState(state -> {
683-
for (SnapshotsInProgress.Entry entry : state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).forRepo(repoName)) {
682+
for (SnapshotsInProgress.Entry entry : SnapshotsInProgress.get(state).forRepo(repoName)) {
684683
if (entry.shardsByRepoShardId().isEmpty() == false) {
685684
assertEquals(sourceSnapshot, entry.source().getName());
686685
for (SnapshotsInProgress.ShardSnapshotStatus value : entry.shardsByRepoShardId().values()) {
@@ -721,12 +720,7 @@ public void testManyConcurrentClonesStartOutOfOrder() throws Exception {
721720
final ActionFuture<AcknowledgedResponse> clone2 = startClone(repoName, sourceSnapshot, "target-snapshot-2", testIndex);
722721

723722
awaitNumberOfSnapshotsInProgress(2);
724-
awaitClusterState(
725-
state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
726-
.forRepo(repoName)
727-
.stream()
728-
.anyMatch(entry -> entry.state().completed())
729-
);
723+
awaitClusterState(state -> SnapshotsInProgress.get(state).forRepo(repoName).stream().anyMatch(entry -> entry.state().completed()));
730724
repo.unblock();
731725

732726
assertAcked(clone1.get());

server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ public void testAbortOneOfMultipleSnapshots() throws Exception {
366366

367367
logger.info("--> wait for snapshot on second data node to finish");
368368
awaitClusterState(state -> {
369-
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
369+
final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state);
370370
return snapshotsInProgress.count() == 2 && snapshotHasCompletedShard(repoName, secondSnapshot, snapshotsInProgress);
371371
});
372372

@@ -422,7 +422,7 @@ public void testCascadedAborts() throws Exception {
422422

423423
logger.info("--> wait for snapshot on second data node to finish");
424424
awaitClusterState(state -> {
425-
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
425+
final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state);
426426
return snapshotsInProgress.count() == 2 && snapshotHasCompletedShard(repoName, secondSnapshot, snapshotsInProgress);
427427
});
428428

@@ -442,8 +442,10 @@ public void testCascadedAborts() throws Exception {
442442
logger.info("--> waiting for second and third snapshot to finish");
443443
assertBusy(() -> {
444444
assertThat(currentSnapshots(repoName), hasSize(1));
445-
final SnapshotsInProgress snapshotsInProgress = clusterService().state().custom(SnapshotsInProgress.TYPE);
446-
assertThat(snapshotsInProgress.forRepo(repoName).get(0).state(), is(SnapshotsInProgress.State.ABORTED));
445+
assertThat(
446+
SnapshotsInProgress.get(clusterService().state()).forRepo(repoName).get(0).state(),
447+
is(SnapshotsInProgress.State.ABORTED)
448+
);
447449
}, 30L, TimeUnit.SECONDS);
448450

449451
unblockNode(repoName, dataNode);
@@ -485,7 +487,7 @@ public void testMasterFailOverWithQueuedDeletes() throws Exception {
485487

486488
logger.info("--> wait for snapshot on second data node to finish");
487489
awaitClusterState(state -> {
488-
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
490+
final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state);
489491
return snapshotsInProgress.count() == 2 && snapshotHasCompletedShard(repoName, secondSnapshot, snapshotsInProgress);
490492
});
491493

@@ -512,9 +514,7 @@ public void testMasterFailOverWithQueuedDeletes() throws Exception {
512514
logger.info("--> waiting for second snapshot to finish and the other two snapshots to become aborted");
513515
assertBusy(() -> {
514516
assertThat(currentSnapshots(repoName), hasSize(2));
515-
for (SnapshotsInProgress.Entry entry : clusterService().state()
516-
.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
517-
.forRepo(repoName)) {
517+
for (SnapshotsInProgress.Entry entry : SnapshotsInProgress.get(clusterService().state()).forRepo(repoName)) {
518518
assertThat(entry.state(), is(SnapshotsInProgress.State.ABORTED));
519519
assertThat(entry.snapshot().getSnapshotId().getName(), not(secondSnapshot));
520520
}
@@ -1583,15 +1583,12 @@ public void testOutOfOrderAndConcurrentFinalization() throws Exception {
15831583
.execute();
15841584

15851585
awaitClusterState(state -> {
1586-
final List<SnapshotsInProgress.Entry> snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
1587-
.forRepo(repository);
1586+
final List<SnapshotsInProgress.Entry> snapshotsInProgress = SnapshotsInProgress.get(state).forRepo(repository);
15881587
return snapshotsInProgress.size() == 2 && snapshotsInProgress.get(1).state().completed();
15891588
});
15901589

15911590
unblockAllDataNodes(repository);
1592-
awaitClusterState(
1593-
state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).forRepo(repository).get(0).state().completed()
1594-
);
1591+
awaitClusterState(state -> SnapshotsInProgress.get(state).forRepo(repository).get(0).state().completed());
15951592

15961593
unblockNode(repository, master);
15971594
assertSuccessful(snapshot2);

server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,7 @@ public void testFindDanglingLatestGeneration() throws Exception {
219219
Metadata.builder(currentState.getMetadata())
220220
.putCustom(
221221
RepositoriesMetadata.TYPE,
222-
currentState.metadata()
223-
.<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
222+
RepositoriesMetadata.get(currentState)
224223
.withUpdatedGeneration(repository.getMetadata().name(), beforeMoveGen, beforeMoveGen + 1)
225224
)
226225
.build()

server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1056,7 +1056,7 @@ public void onRequestSent(
10561056
awaitClusterState(
10571057
logger,
10581058
otherDataNode,
1059-
state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
1059+
state -> SnapshotsInProgress.get(state)
10601060
.forRepo(repoName)
10611061
.stream()
10621062
.anyMatch(entry -> entry.state() == SnapshotsInProgress.State.ABORTED)

server/src/internalClusterTest/java/org/elasticsearch/snapshots/GetSnapshotsIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ public void testSortAndPaginateWithInProgress() throws Exception {
182182
}
183183
awaitNumberOfSnapshotsInProgress(inProgressCount);
184184
awaitClusterState(
185-
state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
185+
state -> SnapshotsInProgress.get(state)
186186
.asStream()
187187
.flatMap(s -> s.shards().entrySet().stream())
188188
.allMatch(

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1102,7 +1102,7 @@ public void testSnapshotStatus() throws Exception {
11021102
waitForBlock(blockedNode, "test-repo");
11031103

11041104
awaitClusterState(state -> {
1105-
SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
1105+
SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state);
11061106
Set<Snapshot> snapshots = snapshotsInProgress.asStream().map(SnapshotsInProgress.Entry::snapshot).collect(Collectors.toSet());
11071107
if (snapshots.size() != 1) {
11081108
return false;

0 commit comments

Comments
 (0)