diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java index ebe62d1f34335..397c9def6411e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java @@ -61,7 +61,6 @@ import java.util.List; import java.util.Locale; import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; @@ -243,7 +242,7 @@ public void testMultipleReposAreIndependent2() throws Exception { logger.info("--> waiting for concurrent snapshot(s) to finish"); createNSnapshots(otherRepoName, randomIntBetween(1, 5)); - assertAcked(startDelete(otherRepoName, "*").get()); + assertAcked(startDeleteSnapshot(otherRepoName, "*").get()); unblockNode(blockedRepoName, dataNode); assertSuccessful(createSlowFuture); @@ -260,11 +259,11 @@ public void testMultipleReposAreIndependent3() throws Exception { createFullSnapshot( blockedRepoName, "blocked-snapshot"); blockNodeOnAnyFiles(blockedRepoName, masterNode); - final ActionFuture slowDeleteFuture = startDelete(blockedRepoName, "*"); + final ActionFuture slowDeleteFuture = startDeleteSnapshot(blockedRepoName, "*"); logger.info("--> waiting for concurrent snapshot(s) to finish"); createNSnapshots(otherRepoName, randomIntBetween(1, 5)); - assertAcked(startDelete(otherRepoName, "*").get()); + assertAcked(startDeleteSnapshot(otherRepoName, "*").get()); unblockNode(blockedRepoName, masterNode); assertAcked(slowDeleteFuture.actionGet()); @@ -283,7 +282,7 @@ public void testSnapshotRunsAfterInProgressDelete() throws Exception { createFullSnapshot(repoName, firstSnapshot); blockMasterFromFinalizingSnapshotOnIndexFile(repoName); - final ActionFuture deleteFuture = startDelete(repoName, firstSnapshot); + final ActionFuture deleteFuture = startDeleteSnapshot(repoName, firstSnapshot); waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); final ActionFuture snapshotFuture = startFullSnapshot(repoName, "second-snapshot"); @@ -321,7 +320,7 @@ public void testAbortOneOfMultipleSnapshots() throws Exception { return snapshotsInProgress.entries().size() == 2 && snapshotHasCompletedShard(secondSnapshot, snapshotsInProgress); }); - final ActionFuture deleteSnapshotsResponse = startDelete(repoName, firstSnapshot); + final ActionFuture deleteSnapshotsResponse = startDeleteSnapshot(repoName, firstSnapshot); awaitNDeletionsInProgress(1); logger.info("--> start third snapshot"); @@ -370,7 +369,7 @@ public void testCascadedAborts() throws Exception { return snapshotsInProgress.entries().size() == 2 && snapshotHasCompletedShard(secondSnapshot, snapshotsInProgress); }); - final ActionFuture deleteSnapshotsResponse = startDelete(repoName, firstSnapshot); + final ActionFuture deleteSnapshotsResponse = startDeleteSnapshot(repoName, firstSnapshot); awaitNDeletionsInProgress(1); final ActionFuture thirdSnapshotResponse = startFullSnapshot(repoName, "snapshot-three"); @@ -381,7 +380,7 @@ public void testCascadedAborts() throws Exception { logger.info("--> waiting for all three snapshots to show up as in-progress"); assertBusy(() -> assertThat(currentSnapshots(repoName), hasSize(3)), 30L, TimeUnit.SECONDS); - final ActionFuture allDeletedResponse = startDelete(repoName, "*"); + final ActionFuture allDeletedResponse = startDeleteSnapshot(repoName, "*"); logger.info("--> waiting for second and third snapshot to finish"); assertBusy(() -> { @@ -533,13 +532,13 @@ public void testQueuedDeletesWithFailures() throws Exception { createNSnapshots(repoName, randomIntBetween(2, 5)); blockMasterFromFinalizingSnapshotOnIndexFile(repoName); - final ActionFuture firstDeleteFuture = startDelete(repoName, "*"); + final ActionFuture firstDeleteFuture = startDeleteSnapshot(repoName, "*"); waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); final ActionFuture snapshotFuture = startFullSnapshot(repoName, "snapshot-queued"); - awaitNSnapshotsInProgress(1); + awaitNumberOfSnapshotsInProgress(1); - final ActionFuture secondDeleteFuture = startDelete(repoName, "*"); + final ActionFuture secondDeleteFuture = startDeleteSnapshot(repoName, "*"); awaitNDeletionsInProgress(2); unblockNode(repoName, masterNode); @@ -564,9 +563,9 @@ public void testQueuedDeletesWithOverlap() throws Exception { final ActionFuture firstDeleteFuture = startAndBlockOnDeleteSnapshot(repoName, "*"); final ActionFuture snapshotFuture = startFullSnapshot(repoName, "snapshot-queued"); - awaitNSnapshotsInProgress(1); + awaitNumberOfSnapshotsInProgress(1); - final ActionFuture secondDeleteFuture = startDelete(repoName, "*"); + final ActionFuture secondDeleteFuture = startDeleteSnapshot(repoName, "*"); awaitNDeletionsInProgress(2); unblockNode(repoName, masterNode); @@ -593,7 +592,7 @@ public void testQueuedOperationsOnMasterRestart() throws Exception { client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-three").setWaitForCompletion(false).get(); - startDelete(repoName, "*"); + startDeleteSnapshot(repoName, "*"); awaitNDeletionsInProgress(2); internalCluster().stopCurrentMasterNode(); @@ -621,7 +620,7 @@ public void testQueuedOperationsOnMasterDisconnect() throws Exception { final ActionFuture createThirdSnapshot = client(masterNode).admin().cluster() .prepareCreateSnapshot(repoName, "snapshot-three").setWaitForCompletion(true).execute(); - awaitNSnapshotsInProgress(1); + awaitNumberOfSnapshotsInProgress(1); final ActionFuture secondDeleteFuture = client(masterNode).admin().cluster().prepareDeleteSnapshot(repoName, "*").execute(); @@ -658,7 +657,7 @@ public void testQueuedOperationsOnMasterDisconnectAndRepoFailure() throws Except waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); final ActionFuture secondFailedSnapshotFuture = startFullSnapshotFromMasterClient(repoName, "failing-snapshot-2"); - awaitNSnapshotsInProgress(2); + awaitNumberOfSnapshotsInProgress(2); final ActionFuture failedDeleteFuture = client(masterNode).admin().cluster().prepareDeleteSnapshot(repoName, "*").execute(); @@ -754,7 +753,7 @@ public void testQueuedSnapshotOperationsAndBrokenRepoOnMasterFailOver2() throws corruptIndexN(repoPath, generation); final ActionFuture snapshotFour = startFullSnapshotFromNonMasterClient(repoName, "snapshot-four"); - awaitNSnapshotsInProgress(2); + awaitNumberOfSnapshotsInProgress(2); final NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.DISCONNECT); internalCluster().setDisruptionScheme(networkDisruption); @@ -789,18 +788,18 @@ public void testQueuedSnapshotOperationsAndBrokenRepoOnMasterFailOverMultipleRep awaitNDeletionsInProgress(1); final ActionFuture createBlockedSnapshot = startFullSnapshotFromNonMasterClient(blockedRepoName, "queued-snapshot"); - awaitNSnapshotsInProgress(1); + awaitNumberOfSnapshotsInProgress(1); final long generation = getRepositoryData(repoName).getGenId(); blockNodeOnAnyFiles(repoName, masterNode); final ActionFuture snapshotThree = startFullSnapshotFromNonMasterClient(repoName, "snapshot-three"); waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); - awaitNSnapshotsInProgress(2); + awaitNumberOfSnapshotsInProgress(2); corruptIndexN(repoPath, generation); final ActionFuture snapshotFour = startFullSnapshotFromNonMasterClient(repoName, "snapshot-four"); - awaitNSnapshotsInProgress(3); + awaitNumberOfSnapshotsInProgress(3); internalCluster().stopCurrentMasterNode(); ensureStableCluster(3); @@ -842,7 +841,7 @@ public void testMultiplePartialSnapshotsQueuedAfterDelete() throws Exception { final ActionFuture deleteFuture = startAndBlockOnDeleteSnapshot(repoName, "*"); final ActionFuture snapshotThree = startFullSnapshot(repoName, "snapshot-three", true); final ActionFuture snapshotFour = startFullSnapshot(repoName, "snapshot-four", true); - awaitNSnapshotsInProgress(2); + awaitNumberOfSnapshotsInProgress(2); assertAcked(client().admin().indices().prepareDelete("index-two")); unblockNode(repoName, masterNode); @@ -908,7 +907,7 @@ public void testBackToBackQueuedDeletes() throws Exception { final String snapshotTwo = snapshots.get(1); final ActionFuture deleteSnapshotOne = startAndBlockOnDeleteSnapshot(repoName, snapshotOne); - final ActionFuture deleteSnapshotTwo = startDelete(repoName, snapshotTwo); + final ActionFuture deleteSnapshotTwo = startDeleteSnapshot(repoName, snapshotTwo); awaitNDeletionsInProgress(2); unblockNode(repoName, masterName); @@ -935,7 +934,7 @@ public void testQueuedOperationsAfterFinalizationFailure() throws Exception { final String masterName = internalCluster().getMasterName(); final String snapshotOne = snapshotNames.get(0); - final ActionFuture deleteSnapshotOne = startDelete(repoName, snapshotOne); + final ActionFuture deleteSnapshotOne = startDeleteSnapshot(repoName, snapshotOne); awaitNDeletionsInProgress(1); unblockNode(repoName, masterName); @@ -955,7 +954,7 @@ public void testStartDeleteDuringFinalizationCleanup() throws Exception { blockMasterFromDeletingIndexNFile(repoName); final ActionFuture snapshotFuture = startFullSnapshot(repoName, snapshotName); waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); - final ActionFuture deleteFuture = startDelete(repoName, snapshotName); + final ActionFuture deleteFuture = startDeleteSnapshot(repoName, snapshotName); awaitNDeletionsInProgress(1); unblockNode(repoName, masterName); assertSuccessful(snapshotFuture); @@ -1004,7 +1003,7 @@ public void testMasterFailoverOnFinalizationLoop() throws Exception { waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); final String snapshotOne = snapshotNames.get(0); - final ActionFuture deleteSnapshotOne = startDelete(repoName, snapshotOne); + final ActionFuture deleteSnapshotOne = startDeleteSnapshot(repoName, snapshotOne); awaitNDeletionsInProgress(1); networkDisruption.startDisrupting(); ensureStableCluster(3, dataNode); @@ -1039,14 +1038,14 @@ public void testStatusMultipleSnapshotsMultipleRepos() throws Exception { startFullSnapshotBlockedOnDataNode("blocked-snapshot-2", blockedRepoName, dataNode); final ActionFuture createSlowFuture3 = startFullSnapshotBlockedOnDataNode("other-blocked-snapshot", otherBlockedRepoName, dataNode); - awaitNSnapshotsInProgress(3); + awaitNumberOfSnapshotsInProgress(3); assertSnapshotStatusCountOnRepo("_all", 3); assertSnapshotStatusCountOnRepo(blockedRepoName, 2); assertSnapshotStatusCountOnRepo(otherBlockedRepoName, 1); unblockNode(blockedRepoName, dataNode); - awaitNSnapshotsInProgress(1); + awaitNumberOfSnapshotsInProgress(1); assertSnapshotStatusCountOnRepo("_all", 1); assertSnapshotStatusCountOnRepo(blockedRepoName, 0); assertSnapshotStatusCountOnRepo(otherBlockedRepoName, 1); @@ -1074,7 +1073,7 @@ public void testInterleavedAcrossMultipleRepos() throws Exception { startFullSnapshotBlockedOnDataNode("blocked-snapshot-2", blockedRepoName, dataNode); final ActionFuture createSlowFuture3 = startFullSnapshotBlockedOnDataNode("other-blocked-snapshot", otherBlockedRepoName, dataNode); - awaitNSnapshotsInProgress(3); + awaitNumberOfSnapshotsInProgress(3); unblockNode(blockedRepoName, dataNode); unblockNode(otherBlockedRepoName, dataNode); @@ -1108,7 +1107,7 @@ public void testMasterFailoverAndMultipleQueuedUpSnapshotsAcrossTwoRepos() throw client().admin().cluster().prepareCreateSnapshot(otherRepoName, "snapshot-other-blocked-1").setWaitForCompletion(false).get(); client().admin().cluster().prepareCreateSnapshot(otherRepoName, "snapshot-other-blocked-2").setWaitForCompletion(false).get(); - awaitNSnapshotsInProgress(4); + awaitNumberOfSnapshotsInProgress(4); final String initialMaster = internalCluster().getMasterName(); waitForBlock(initialMaster, repoName, TimeValue.timeValueSeconds(30L)); waitForBlock(initialMaster, otherRepoName, TimeValue.timeValueSeconds(30L)); @@ -1144,10 +1143,10 @@ public void testConcurrentOperationsLimit() throws Exception { ++blockedSnapshots; } else { blockedDelete = true; - deleteFuture = startDelete(repoName, randomFrom(snapshotNames)); + deleteFuture = startDeleteSnapshot(repoName, randomFrom(snapshotNames)); } } - awaitNSnapshotsInProgress(blockedSnapshots); + awaitNumberOfSnapshotsInProgress(blockedSnapshots); if (blockedDelete) { awaitNDeletionsInProgress(1); } @@ -1213,7 +1212,7 @@ public void testQueuedDeleteAfterFinalizationFailure() throws Exception { final String snapshotName = "snap-1"; final ActionFuture snapshotFuture = startFullSnapshot(repoName, snapshotName); waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L)); - final ActionFuture deleteFuture = startDelete(repoName, snapshotName); + final ActionFuture deleteFuture = startDeleteSnapshot(repoName, snapshotName); awaitNDeletionsInProgress(1); unblockNode(repoName, masterNode); assertAcked(deleteFuture.get()); @@ -1234,9 +1233,9 @@ public void testAbortNotStartedSnapshotWithoutIO() throws Exception { final String snapshotTwo = "second-snapshot"; final ActionFuture createSnapshot2Future = startFullSnapshot(repoName, snapshotTwo); - awaitNSnapshotsInProgress(2); + awaitNumberOfSnapshotsInProgress(2); - assertAcked(startDelete(repoName, snapshotTwo).get()); + assertAcked(startDeleteSnapshot(repoName, snapshotTwo).get()); final SnapshotException sne = expectThrows(SnapshotException.class, createSnapshot2Future::actionGet); assertFalse(createSnapshot1Future.isDone()); @@ -1277,11 +1276,6 @@ private ActionFuture startDeleteFromNonMasterClient(String return internalCluster().nonMasterClient().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(); } - private ActionFuture startDelete(String repoName, String snapshotName) { - logger.info("--> deleting snapshot [{}] from repo [{}]", snapshotName, repoName); - return client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(); - } - private ActionFuture startFullSnapshotFromNonMasterClient(String repoName, String snapshotName) { logger.info("--> creating full snapshot [{}] to repo [{}] from non master client", snapshotName, repoName); return internalCluster().nonMasterClient().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) @@ -1294,44 +1288,17 @@ private ActionFuture startFullSnapshotFromMasterClient(S .setWaitForCompletion(true).execute(); } - private ActionFuture startFullSnapshot(String repoName, String snapshotName) { - return startFullSnapshot(repoName, snapshotName, false); - } - - private ActionFuture startFullSnapshot(String repoName, String snapshotName, boolean partial) { - logger.info("--> creating full snapshot [{}] to repo [{}]", snapshotName, repoName); - return client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true) - .setPartial(partial).execute(); - } - - private void awaitClusterState(Predicate statePredicate) throws Exception { - awaitClusterState(internalCluster().getMasterName(), statePredicate); - } - // Large snapshot pool settings to set up nodes for tests involving multiple repositories that need to have enough // threads so that blocking some threads on one repository doesn't block other repositories from doing work private static final Settings LARGE_SNAPSHOT_POOL_SETTINGS = Settings.builder() .put("thread_pool.snapshot.core", 5).put("thread_pool.snapshot.max", 5).build(); - private static final Settings SINGLE_SHARD_NO_REPLICA = indexSettingsNoReplicas(1).build(); - - private void createIndexWithContent(String indexName) { - createIndexWithContent(indexName, SINGLE_SHARD_NO_REPLICA); - } - private void createIndexWithContent(String indexName, String nodeInclude, String nodeExclude) { createIndexWithContent(indexName, indexSettingsNoReplicas(1) .put("index.routing.allocation.include._name", nodeInclude) .put("index.routing.allocation.exclude._name", nodeExclude).build()); } - private void createIndexWithContent(String indexName, Settings indexSettings) { - logger.info("--> creating index [{}]", indexName); - createIndex(indexName, indexSettings); - ensureGreen(indexName); - indexDoc(indexName, "some_id", "foo", "bar"); - } - private static boolean snapshotHasCompletedShard(String snapshot, SnapshotsInProgress snapshotsInProgress) { for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { if (entry.snapshot().getSnapshotId().getName().equals(snapshot)) { @@ -1345,12 +1312,6 @@ private static boolean snapshotHasCompletedShard(String snapshot, SnapshotsInPro return false; } - private static SnapshotInfo assertSuccessful(ActionFuture future) throws Exception { - final SnapshotInfo snapshotInfo = future.get().getSnapshotInfo(); - assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS)); - return snapshotInfo; - } - private void corruptIndexN(Path repoPath, long generation) throws IOException { logger.info("--> corrupting [index-{}] in [{}]", generation, repoPath); Path indexNBlob = repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + generation); @@ -1364,12 +1325,6 @@ private void awaitNDeletionsInProgress(int count) throws Exception { state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).getEntries().size() == count); } - private void awaitNSnapshotsInProgress(int count) throws Exception { - logger.info("--> wait for [{}] snapshots to show up in the cluster state", count); - awaitClusterState(state -> - state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().size() == count); - } - private static List currentSnapshots(String repoName) { return client().admin().cluster().prepareGetSnapshots(repoName).setSnapshots(GetSnapshotsRequest.CURRENT_SNAPSHOT) .get().getSnapshots(repoName); @@ -1379,7 +1334,7 @@ private ActionFuture startAndBlockOnDeleteSnapshot(String throws InterruptedException { final String masterName = internalCluster().getMasterName(); blockNodeOnAnyFiles(repoName, masterName); - final ActionFuture fut = startDelete(repoName, snapshotName); + final ActionFuture fut = startDeleteSnapshot(repoName, snapshotName); waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L)); return fut; } @@ -1391,12 +1346,4 @@ private ActionFuture startAndBlockFailingFullSnapshot(St waitForBlock(internalCluster().getMasterName(), blockedRepoName, TimeValue.timeValueSeconds(30L)); return fut; } - - private ActionFuture startFullSnapshotBlockedOnDataNode(String snapshotName, String repoName, String dataNode) - throws InterruptedException { - blockDataNode(repoName, dataNode); - final ActionFuture fut = startFullSnapshot(repoName, snapshotName); - waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L)); - return fut; - } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java index e0c00fb8079fc..16d6ba09bbd11 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java @@ -49,6 +49,7 @@ import java.util.Collections; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; @@ -111,8 +112,7 @@ public void testConcurrentlyChangeRepositoryContents() throws Exception { .put("compress", false) .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); - logger.info("--> delete snapshot"); - client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get(); + startDeleteSnapshot(repoName, snapshot).get(); logger.info("--> make sure snapshot doesn't exist"); expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName) @@ -180,8 +180,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS logger.info("--> verify index-N blob is found at the new location"); assertThat(getRepositoryData(repoName).getGenId(), is(beforeMoveGen + 1)); - logger.info("--> delete snapshot"); - client().admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get(); + startDeleteSnapshot(repoName, snapshot).get(); logger.info("--> verify index-N blob is found at the expected location"); assertThat(getRepositoryData(repoName).getGenId(), is(beforeMoveGen + 2)); @@ -241,7 +240,7 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception { is(SnapshotsService.OLD_SNAPSHOT_FORMAT)); logger.info("--> verify that snapshot with missing root level metadata can be deleted"); - assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotToCorrupt.getName()).get()); + assertAcked(startDeleteSnapshot(repoName, snapshotToCorrupt.getName()).get()); logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot"); assertThat(PlainActionFuture.get(f -> threadPool.generic().execute( @@ -291,7 +290,7 @@ public void testMountCorruptedRepositoryData() throws Exception { expectThrows(RepositoryException.class, () -> getRepositoryData(otherRepo)); } - public void testHandleSnapshotErrorWithBwCFormat() throws IOException { + public void testHandleSnapshotErrorWithBwCFormat() throws IOException, ExecutionException, InterruptedException { final String repoName = "test-repo"; final Path repoPath = randomRepoPath(); createRepository(repoName, "fs", repoPath); @@ -315,13 +314,12 @@ public void testHandleSnapshotErrorWithBwCFormat() throws IOException { assertFileExists(initialShardMetaPath); Files.move(initialShardMetaPath, shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "1")); - logger.info("--> delete old version snapshot"); - client().admin().cluster().prepareDeleteSnapshot(repoName, oldVersionSnapshot).get(); + startDeleteSnapshot(repoName, oldVersionSnapshot).get(); createFullSnapshot(repoName, "snapshot-2"); } - public void testRepairBrokenShardGenerations() throws IOException { + public void testRepairBrokenShardGenerations() throws Exception { final String repoName = "test-repo"; final Path repoPath = randomRepoPath(); createRepository(repoName, "fs", repoPath); @@ -336,8 +334,7 @@ public void testRepairBrokenShardGenerations() throws IOException { createFullSnapshot(repoName, "snapshot-1"); - logger.info("--> delete old version snapshot"); - client().admin().cluster().prepareDeleteSnapshot(repoName, oldVersionSnapshot).get(); + startDeleteSnapshot(repoName, oldVersionSnapshot).get(); logger.info("--> move shard level metadata to new generation and make RepositoryData point at an older generation"); final IndexId indexId = getRepositoryData(repoName).resolveIndexId(indexName); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index d72931d2d5d77..85b84c7e889b8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -32,14 +32,11 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.ActiveShardCount; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.SnapshotsInProgress; @@ -111,6 +108,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Function; import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.test.NodeRoles.nonMasterNode; @@ -340,13 +338,13 @@ public void testRestoreCustomMetadata() throws Exception { equalTo("before_snapshot_s_gw_noapi")); } - private void updateClusterState(final ClusterStateUpdater updater) throws InterruptedException { + private void updateClusterState(final Function updater) throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); final ClusterService clusterService = internalCluster().getInstance(ClusterService.class); clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() { @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return updater.execute(currentState); + public ClusterState execute(ClusterState currentState) { + return updater.apply(currentState); } @Override @@ -362,10 +360,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS countDownLatch.await(); } - private interface ClusterStateUpdater { - ClusterState execute(ClusterState currentState) throws Exception; - } - public void testSnapshotDuringNodeShutdown() throws Exception { logger.info("--> start 2 nodes"); Client client = client(); @@ -857,10 +851,9 @@ public void testRestoreShrinkIndex() throws Exception { assertAcked(client.admin().indices().prepareResizeIndex(sourceIdx, shrunkIdx).get()); logger.info("--> snapshot the shrunk index"); - CreateSnapshotResponse createResponse = client.admin().cluster() + assertSuccessful(client.admin().cluster() .prepareCreateSnapshot(repo, snapshot) - .setWaitForCompletion(true).setIndices(shrunkIdx).get(); - assertEquals(SnapshotState.SUCCESS, createResponse.getSnapshotInfo().state()); + .setWaitForCompletion(true).setIndices(shrunkIdx).execute()); logger.info("--> delete index and stop the data node"); assertAcked(client.admin().indices().prepareDelete(sourceIdx).get()); @@ -912,7 +905,7 @@ public void testSnapshotWithDateMath() { assertThat(snapshots.get(0).getState().completed(), equalTo(true)); } - public void testSnapshotTotalAndIncrementalSizes() throws IOException { + public void testSnapshotTotalAndIncrementalSizes() throws Exception { Client client = client(); final String indexName = "test-blocks-1"; final String repositoryName = "repo-" + indexName; @@ -966,7 +959,7 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException { createFullSnapshot(repositoryName, snapshot1); // drop 1st one to avoid miscalculation as snapshot reuses some files of prev snapshot - assertAcked(client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot0).get()); + assertAcked(startDeleteSnapshot(repositoryName, snapshot0).get()); response = client.admin().cluster().prepareSnapshotStatus(repositoryName) .setSnapshots(snapshot1) @@ -1209,24 +1202,9 @@ public void testAbortWaitsOnDataNode() throws Exception { createRepository(repoName, "mock"); blockAllDataNodes(repoName); final String snapshotName = "test-snap"; - final ActionFuture snapshotResponse = - client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(); + final ActionFuture snapshotResponse = startFullSnapshot(repoName, snapshotName); waitForBlock(dataNodeName, repoName, TimeValue.timeValueSeconds(30L)); - final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, otherDataNode); - final PlainActionFuture abortVisibleFuture = PlainActionFuture.newFuture(); - clusterService.addListener(new ClusterStateListener() { - @Override - public void clusterChanged(ClusterChangedEvent event) { - final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); - if (snapshotsInProgress != null && snapshotsInProgress.entries().stream() - .anyMatch(entry -> entry.state() == SnapshotsInProgress.State.ABORTED)) { - abortVisibleFuture.onResponse(null); - clusterService.removeListener(this); - } - } - }); - final AtomicBoolean blocked = new AtomicBoolean(true); final TransportService transportService = internalCluster().getInstance(TransportService.class, otherDataNode); @@ -1241,10 +1219,10 @@ public void onRequestSent(DiscoveryNode node, long requestId, String action, Tra }); logger.info("--> abort snapshot"); - final ActionFuture deleteResponse = - client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(); + final ActionFuture deleteResponse = startDeleteSnapshot(repoName, snapshotName); - abortVisibleFuture.get(30L, TimeUnit.SECONDS); + awaitClusterState(otherDataNode, state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY) + .entries().stream().anyMatch(entry -> entry.state() == SnapshotsInProgress.State.ABORTED)); assertFalse("delete should not be able to finish until data node is unblocked", deleteResponse.isDone()); blocked.set(false); @@ -1261,8 +1239,7 @@ public void testPartialSnapshotAllShardsMissing() throws Exception { createIndex("some-index"); stopNode(dataNode); ensureStableCluster(1); - final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap") - .setPartial(true).setWaitForCompletion(true).get(); + final CreateSnapshotResponse createSnapshotResponse = startFullSnapshot(repoName, "test-snap", true).get(); assertThat(createSnapshotResponse.getSnapshotInfo().state(), is(SnapshotState.PARTIAL)); } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index b9ddad7a8322e..a76587deaf576 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -20,7 +20,6 @@ package org.elasticsearch.snapshots; import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; @@ -101,6 +100,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Predicate; @@ -140,7 +140,6 @@ import static org.hamcrest.Matchers.nullValue; // The tests in here do a lot of state updates and other writes to disk and are slowed down too much by WindowsFS -@LuceneTestCase.SuppressFileSystems(value = "WindowsFS") public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase { @Override @@ -423,8 +422,7 @@ public void testEmptySnapshot() throws Exception { createRepository("test-repo", "fs"); logger.info("--> snapshot"); - CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") - .setWaitForCompletion(true).get(); + CreateSnapshotResponse createSnapshotResponse = startFullSnapshot("test-repo", "test-snap").get(); assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(0)); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0)); @@ -1186,8 +1184,7 @@ public void testDeleteSnapshot() throws Exception { assertDocCount("test-idx", 10L * numberOfSnapshots); - logger.info("--> delete the last snapshot"); - client.admin().cluster().prepareDeleteSnapshot("test-repo", lastSnapshot).get(); + startDeleteSnapshot("test-repo", lastSnapshot).get(); logger.info("--> make sure that number of files is back to what it was when the first snapshot was made"); assertFileCount(repo, numberOfFiles[0]); } @@ -1318,8 +1315,7 @@ public void testDeleteSnapshotWithMissingIndexAndShardMetadata() throws Exceptio Files.delete(shardZero.resolve("snap-" + snapshotInfo.snapshotId().getUUID() + ".dat")); } - logger.info("--> delete snapshot"); - client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-1").get(); + startDeleteSnapshot("test-repo", "test-snap-1").get(); logger.info("--> make sure snapshot doesn't exist"); @@ -1354,8 +1350,7 @@ public void testDeleteSnapshotWithMissingMetadata() throws Exception { Path metadata = repo.resolve("meta-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat"); Files.delete(metadata); - logger.info("--> delete snapshot"); - client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-1").get(); + startDeleteSnapshot("test-repo", "test-snap-1").get(); logger.info("--> make sure snapshot doesn't exist"); expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots("test-repo") @@ -1388,8 +1383,7 @@ public void testDeleteSnapshotWithCorruptedSnapshotFile() throws Exception { try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) { outChan.truncate(randomInt(10)); } - logger.info("--> delete snapshot"); - client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-1").get(); + startDeleteSnapshot("test-repo", "test-snap-1").get(); logger.info("--> make sure snapshot doesn't exist"); expectThrows(SnapshotMissingException.class, @@ -1442,7 +1436,7 @@ public void testDeleteSnapshotWithCorruptedGlobalState() throws Exception { assertThat(snapshotStatusResponse.getSnapshots(), hasSize(1)); assertThat(snapshotStatusResponse.getSnapshots().get(0).getSnapshot().getSnapshotId().getName(), equalTo("test-snap")); - assertAcked(client().admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").get()); + assertAcked(startDeleteSnapshot("test-repo", "test-snap").get()); expectThrows(SnapshotMissingException.class, () -> client().admin().cluster() .prepareGetSnapshots("test-repo").addSnapshots("test-snap").get().getSnapshots("test-repo")); assertRequestBuilderThrows(client().admin().cluster().prepareSnapshotStatus("test-repo").addSnapshots("test-snap"), @@ -2637,7 +2631,7 @@ public void testRestoreSnapshotWithCorruptedIndexMetadata() throws Exception { } } - assertAcked(client().admin().cluster().prepareDeleteSnapshot("test-repo", snapshotInfo.snapshotId().getName()).get()); + assertAcked(startDeleteSnapshot("test-repo", snapshotInfo.snapshotId().getName()).get()); } /** @@ -2756,8 +2750,7 @@ public void testCannotCreateSnapshotsWithSameName() throws Exception { assertThat(e.getMessage(), containsString("snapshot with the same name already exists")); } - logger.info("--> delete the first snapshot"); - client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotName).get(); + startDeleteSnapshot(repositoryName, snapshotName).get(); logger.info("--> try creating a snapshot with the same name, now it should work because the first one was deleted"); createSnapshotResponse = client.admin() @@ -2816,7 +2809,7 @@ public void testGetSnapshotsRequest() throws Exception { assertEquals(1, getSnapshotsResponse.getSnapshots("test-repo").size()); assertEquals("snap-on-empty-repo", getSnapshotsResponse.getSnapshots("test-repo").get(0).snapshotId().getName()); unblockNode(repositoryName, initialBlockedNode); // unblock node - client.admin().cluster().prepareDeleteSnapshot(repositoryName, "snap-on-empty-repo").get(); + startDeleteSnapshot(repositoryName, "snap-on-empty-repo").get(); final int numSnapshots = randomIntBetween(1, 3) + 1; logger.info("--> take {} snapshot(s)", numSnapshots - 1); @@ -3319,7 +3312,7 @@ public void testRestoreIncreasesPrimaryTerms() { assertThat(restoredIndexMetadata.getSettings().get(IndexMetadata.SETTING_HISTORY_UUID), notNullValue()); } - public void testSnapshotDifferentIndicesBySameName() throws InterruptedException { + public void testSnapshotDifferentIndicesBySameName() throws InterruptedException, ExecutionException { String indexName = "testindex"; String repoName = "test-repo"; Path absolutePath = randomRepoPath().toAbsolutePath(); @@ -3376,8 +3369,7 @@ public void testSnapshotDifferentIndicesBySameName() throws InterruptedException snapshotToRestore = "snap-1"; expectedCount = docCount; } - logger.info("--> deleting snapshot [{}]", snapshotToDelete); - assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotToDelete).get()); + assertAcked(startDeleteSnapshot(repoName, snapshotToDelete).get()); logger.info("--> restoring snapshot [{}]", snapshotToRestore); client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotToRestore).setIndices(indexName).setRenamePattern(indexName) .setRenameReplacement("restored-3").setWaitForCompletion(true).get(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java index c175b679ba5ac..03cea2203b659 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java @@ -101,16 +101,9 @@ public void testStatusAPICallInProgressSnapshot() throws Exception { logger.info("--> wait for data nodes to get blocked"); waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1)); - - assertBusy(() -> { - try { - assertEquals(SnapshotsInProgress.State.STARTED, client.admin().cluster().snapshotsStatus( - new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).actionGet().getSnapshots().get(0) - .getState()); - } catch (SnapshotMissingException sme) { - throw new AssertionError(sme); - } - }, 1L, TimeUnit.MINUTES); + awaitNumberOfSnapshotsInProgress(1); + assertEquals(SnapshotsInProgress.State.STARTED, client.admin().cluster().prepareSnapshotStatus("test-repo") + .setSnapshots("test-snap").get().getSnapshots().get(0).getState()); logger.info("--> unblock all data nodes"); unblockAllDataNodes("test-repo"); @@ -160,15 +153,12 @@ public void testExceptionOnMissingShardLevelSnapBlob() throws IOException { .prepareSnapshotStatus("test-repo").setSnapshots("test-snap").execute().actionGet()); } - public void testGetSnapshotsWithoutIndices() { + public void testGetSnapshotsWithoutIndices() throws Exception { createRepository("test-repo", "fs"); logger.info("--> snapshot"); - final SnapshotInfo snapshotInfo = - client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") - .setIndices().setWaitForCompletion(true).get().getSnapshotInfo(); - - assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS)); + final SnapshotInfo snapshotInfo = assertSuccessful(client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") + .setIndices().setWaitForCompletion(true).execute()); assertThat(snapshotInfo.totalShards(), is(0)); logger.info("--> verify that snapshot without index shows up in non-verbose listing"); diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index 217209198fc74..3929cf4170686 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -19,11 +19,13 @@ package org.elasticsearch.snapshots; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; @@ -440,6 +442,10 @@ protected void awaitNoMoreRunningOperations(String viaNode) throws Exception { state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).hasDeletionsInProgress() == false); } + protected void awaitClusterState(Predicate statePredicate) throws Exception { + awaitClusterState(internalCluster().getMasterName(), statePredicate); + } + protected void awaitClusterState(String viaNode, Predicate statePredicate) throws Exception { final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, viaNode); final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, viaNode); @@ -465,4 +471,52 @@ public void onTimeout(TimeValue timeout) { future.get(30L, TimeUnit.SECONDS); } } + + protected ActionFuture startFullSnapshotBlockedOnDataNode(String snapshotName, String repoName, + String dataNode) throws InterruptedException { + blockDataNode(repoName, dataNode); + final ActionFuture fut = startFullSnapshot(repoName, snapshotName); + waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L)); + return fut; + } + + protected ActionFuture startFullSnapshot(String repoName, String snapshotName) { + return startFullSnapshot(repoName, snapshotName, false); + } + + protected ActionFuture startFullSnapshot(String repoName, String snapshotName, boolean partial) { + logger.info("--> creating full snapshot [{}] to repo [{}]", snapshotName, repoName); + return client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true) + .setPartial(partial).execute(); + } + + protected void awaitNumberOfSnapshotsInProgress(int count) throws Exception { + logger.info("--> wait for [{}] snapshots to show up in the cluster state", count); + awaitClusterState(state -> + state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().size() == count); + } + + protected static SnapshotInfo assertSuccessful(ActionFuture future) throws Exception { + final SnapshotInfo snapshotInfo = future.get().getSnapshotInfo(); + assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS)); + return snapshotInfo; + } + + private static final Settings SINGLE_SHARD_NO_REPLICA = indexSettingsNoReplicas(1).build(); + + protected void createIndexWithContent(String indexName) { + createIndexWithContent(indexName, SINGLE_SHARD_NO_REPLICA); + } + + protected void createIndexWithContent(String indexName, Settings indexSettings) { + logger.info("--> creating index [{}]", indexName); + createIndex(indexName, indexSettings); + ensureGreen(indexName); + indexDoc(indexName, "some_id", "foo", "bar"); + } + + protected ActionFuture startDeleteSnapshot(String repoName, String snapshotName) { + logger.info("--> deleting snapshot [{}] from repo [{}]", snapshotName, repoName); + return client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(); + } }