diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index ae26e958b7aee..a334479671b18 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -226,7 +226,8 @@ public ClusterState execute(ClusterState currentState) { RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE); // Check if the snapshot to restore is currently being deleted SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); - if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + if (deletionsInProgress != null + && deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshot().equals(snapshot))) { throw new ConcurrentSnapshotExecutionException(snapshot, "cannot restore a snapshot while a snapshot deletion is in-progress [" + deletionsInProgress.getEntries().get(0).getSnapshot() + "]"); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index ad3f99733b427..22abd8dd705aa 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1279,9 +1279,12 @@ public ClusterState execute(ClusterState currentState) { // don't allow snapshot deletions while a restore is taking place, // otherwise we could end up deleting a snapshot that is being restored // and the files the restore depends on would all be gone - if (restoreInProgress.isEmpty() == false) { - throw new ConcurrentSnapshotExecutionException(snapshot, - "cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]"); + + for (RestoreInProgress.Entry entry : restoreInProgress) { + if (entry.snapshot().equals(snapshot)) { + throw new ConcurrentSnapshotExecutionException(snapshot, + "cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]"); + } } } ClusterState.Builder clusterStateBuilder = ClusterState.builder(currentState); diff --git a/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java index e93bb00d3d09e..9f437cdf70d35 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java @@ -152,58 +152,4 @@ public void testSnapshottingWithInProgressDeletionNotAllowed() throws Exception client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get(); assertEquals(1, client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots(repo).size()); } - - public void testRestoreWithInProgressDeletionsNotAllowed() throws Exception { - logger.info("--> creating repository"); - final String repo = "test-repo"; - assertAcked(client().admin().cluster().preparePutRepository(repo).setType("mock").setSettings( - Settings.builder() - .put("location", randomRepoPath()) - .put("random", randomAlphaOfLength(10)) - .put("wait_after_unblock", 200)).get()); - - logger.info("--> snapshot"); - final String index = "test-idx"; - assertAcked(prepareCreate(index, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0))); - for (int i = 0; i < 10; i++) { - indexDoc(index, Integer.toString(i), "foo", "bar" + i); - } - refresh(); - final String snapshot1 = "test-snap1"; - client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setWaitForCompletion(true).get(); - final String index2 = "test-idx2"; - assertAcked(prepareCreate(index2, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0))); - for (int i = 0; i < 10; i++) { - indexDoc(index2, Integer.toString(i), "foo", "bar" + i); - } - refresh(); - final String snapshot2 = "test-snap2"; - client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get(); - client().admin().indices().prepareClose(index, index2).get(); - - String blockedNode = internalCluster().getMasterName(); - ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true); - logger.info("--> start deletion of snapshot"); - ActionFuture future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute(); - logger.info("--> waiting for block to kick in on node [{}]", blockedNode); - waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10)); - - logger.info("--> try restoring the other snapshot, should fail because the deletion is in progress"); - try { - client().admin().cluster().prepareRestoreSnapshot(repo, snapshot1).setWaitForCompletion(true).get(); - fail("should not be able to restore a snapshot while another is being deleted"); - } catch (ConcurrentSnapshotExecutionException e) { - assertThat(e.getMessage(), containsString("cannot restore a snapshot while a snapshot deletion is in-progress")); - } - - logger.info("--> unblocking blocked node [{}]", blockedNode); - unblockNode(repo, blockedNode); - - logger.info("--> wait until snapshot deletion is finished"); - assertAcked(future.actionGet()); - - logger.info("--> restoring snapshot, which should now work"); - client().admin().cluster().prepareRestoreSnapshot(repo, snapshot1).setWaitForCompletion(true).get(); - assertEquals(1, client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots(repo).size()); - } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index d5f6c98711cf4..d74f99b9b9326 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -2712,13 +2712,6 @@ public void testDeleteSnapshotWhileRestoringFails() throws Exception { assertEquals(repoName, e.getRepositoryName()); assertEquals(snapshotName, e.getSnapshotName()); assertThat(e.getMessage(), containsString("cannot delete snapshot during a restore")); - - logger.info("-- try deleting another snapshot while the restore is in progress (should throw an error)"); - e = expectThrows(ConcurrentSnapshotExecutionException.class, () -> - client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName2).get()); - assertEquals(repoName, e.getRepositoryName()); - assertEquals(snapshotName2, e.getSnapshotName()); - assertThat(e.getMessage(), containsString("cannot delete snapshot during a restore")); } finally { // unblock even if the try block fails otherwise we will get bogus failures when we delete all indices in test teardown. logger.info("--> unblocking all data nodes"); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 2278ed75a7f84..2c3c276d8d2d9 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -207,6 +207,7 @@ import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener; import static org.elasticsearch.env.Environment.PATH_HOME_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.empty; @@ -517,6 +518,92 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { } } + public void testConcurrentSnapshotRestoreAndDeleteOther() { + setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); + + String repoName = "repo"; + String snapshotName = "snapshot"; + final String index = "test"; + final int shards = randomIntBetween(1, 10); + + TestClusterNodes.TestClusterNode masterNode = + testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); + + final StepListener createSnapshotResponseStepListener = new StepListener<>(); + + final int documentsFirstSnapshot = randomIntBetween(0, 100); + + continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> indexNDocuments( + documentsFirstSnapshot, index, () -> client().admin().cluster() + .prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(createSnapshotResponseStepListener))); + + final int documentsSecondSnapshot = randomIntBetween(0, 100); + + final StepListener createOtherSnapshotResponseStepListener = new StepListener<>(); + + final String secondSnapshotName = "snapshot-2"; + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> indexNDocuments( + documentsSecondSnapshot, index, () -> client().admin().cluster().prepareCreateSnapshot(repoName, secondSnapshotName) + .setWaitForCompletion(true).execute(createOtherSnapshotResponseStepListener))); + + final StepListener deleteSnapshotStepListener = new StepListener<>(); + final StepListener restoreSnapshotResponseListener = new StepListener<>(); + + continueOrDie(createOtherSnapshotResponseStepListener, + createSnapshotResponse -> { + scheduleNow( + () -> client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(deleteSnapshotStepListener)); + scheduleNow(() -> client().admin().cluster().restoreSnapshot( + new RestoreSnapshotRequest(repoName, secondSnapshotName).waitForCompletion(true) + .renamePattern("(.+)").renameReplacement("restored_$1"), + restoreSnapshotResponseListener)); + }); + + final StepListener searchResponseListener = new StepListener<>(); + continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> { + assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards()); + client().search(new SearchRequest("restored_" + index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)), + searchResponseListener); + }); + + deterministicTaskQueue.runAllRunnableTasks(); + + assertEquals(documentsFirstSnapshot + documentsSecondSnapshot, + Objects.requireNonNull(searchResponseListener.result().getHits().getTotalHits()).value); + assertThat(deleteSnapshotStepListener.result().isAcknowledged(), is(true)); + assertThat(restoreSnapshotResponseListener.result().getRestoreInfo().failedShards(), is(0)); + + final Repository repository = masterNode.repositoriesService.repository(repoName); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); + assertThat(snapshotIds, contains(createOtherSnapshotResponseStepListener.result().getSnapshotInfo().snapshotId())); + + for (SnapshotId snapshotId : snapshotIds) { + final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertThat(snapshotInfo.indices(), containsInAnyOrder(index)); + assertEquals(shards, snapshotInfo.successfulShards()); + assertEquals(0, snapshotInfo.failedShards()); + } + } + + private void indexNDocuments(int documents, String index, Runnable afterIndexing) { + if (documents == 0) { + afterIndexing.run(); + return; + } + final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < documents; ++i) { + bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i))); + } + final StepListener bulkResponseStepListener = new StepListener<>(); + client().bulk(bulkRequest, bulkResponseStepListener); + continueOrDie(bulkResponseStepListener, bulkResponse -> { + assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); + assertEquals(documents, bulkResponse.getItems().length); + afterIndexing.run(); + }); + } + public void testConcurrentSnapshotDeleteAndDeleteIndex() throws IOException { setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));