From 68299a42f9b937484550b0ad4e237fa438958cec Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 29 Jan 2020 13:54:05 +0100 Subject: [PATCH 1/4] Allow Parallel Snapshot Restore And Delete There is no reason not to allow deletes in parallel to restores if they're dealing with different snapshots. A delete will not remove any files related to the snapshot that is being restored if it is different from the deleted snapshot because those files will still be referenced by the restoring snapshot. Loading RepositoryData concurrently to modifying it is concurrency safe nowadays as well since the repo generation is tracked in the cluster state. Closes #41463 --- .../snapshots/RestoreService.java | 3 +- .../snapshots/SnapshotsService.java | 9 ++- .../MinThreadsSnapshotRestoreIT.java | 54 --------------- .../SharedClusterSnapshotRestoreIT.java | 67 ------------------- .../snapshots/SnapshotResiliencyTests.java | 57 ++++++++++++++++ 5 files changed, 65 insertions(+), 125 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index ae26e958b7aee..a334479671b18 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -226,7 +226,8 @@ public ClusterState execute(ClusterState currentState) { RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE); // Check if the snapshot to restore is currently being deleted SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); - if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + if (deletionsInProgress != null + && deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshot().equals(snapshot))) { throw new ConcurrentSnapshotExecutionException(snapshot, "cannot restore a snapshot while a snapshot deletion is in-progress [" + deletionsInProgress.getEntries().get(0).getSnapshot() + "]"); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index ad3f99733b427..22abd8dd705aa 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1279,9 +1279,12 @@ public ClusterState execute(ClusterState currentState) { // don't allow snapshot deletions while a restore is taking place, // otherwise we could end up deleting a snapshot that is being restored // and the files the restore depends on would all be gone - if (restoreInProgress.isEmpty() == false) { - throw new ConcurrentSnapshotExecutionException(snapshot, - "cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]"); + + for (RestoreInProgress.Entry entry : restoreInProgress) { + if (entry.snapshot().equals(snapshot)) { + throw new ConcurrentSnapshotExecutionException(snapshot, + "cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]"); + } } } ClusterState.Builder clusterStateBuilder = ClusterState.builder(currentState); diff --git a/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java index e93bb00d3d09e..9f437cdf70d35 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java @@ -152,58 +152,4 @@ public void testSnapshottingWithInProgressDeletionNotAllowed() throws Exception client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get(); assertEquals(1, client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots(repo).size()); } - - public void testRestoreWithInProgressDeletionsNotAllowed() throws Exception { - logger.info("--> creating repository"); - final String repo = "test-repo"; - assertAcked(client().admin().cluster().preparePutRepository(repo).setType("mock").setSettings( - Settings.builder() - .put("location", randomRepoPath()) - .put("random", randomAlphaOfLength(10)) - .put("wait_after_unblock", 200)).get()); - - logger.info("--> snapshot"); - final String index = "test-idx"; - assertAcked(prepareCreate(index, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0))); - for (int i = 0; i < 10; i++) { - indexDoc(index, Integer.toString(i), "foo", "bar" + i); - } - refresh(); - final String snapshot1 = "test-snap1"; - client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setWaitForCompletion(true).get(); - final String index2 = "test-idx2"; - assertAcked(prepareCreate(index2, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0))); - for (int i = 0; i < 10; i++) { - indexDoc(index2, Integer.toString(i), "foo", "bar" + i); - } - refresh(); - final String snapshot2 = "test-snap2"; - client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get(); - client().admin().indices().prepareClose(index, index2).get(); - - String blockedNode = internalCluster().getMasterName(); - ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true); - logger.info("--> start deletion of snapshot"); - ActionFuture future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute(); - logger.info("--> waiting for block to kick in on node [{}]", blockedNode); - waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10)); - - logger.info("--> try restoring the other snapshot, should fail because the deletion is in progress"); - try { - client().admin().cluster().prepareRestoreSnapshot(repo, snapshot1).setWaitForCompletion(true).get(); - fail("should not be able to restore a snapshot while another is being deleted"); - } catch (ConcurrentSnapshotExecutionException e) { - assertThat(e.getMessage(), containsString("cannot restore a snapshot while a snapshot deletion is in-progress")); - } - - logger.info("--> unblocking blocked node [{}]", blockedNode); - unblockNode(repo, blockedNode); - - logger.info("--> wait until snapshot deletion is finished"); - assertAcked(future.actionGet()); - - logger.info("--> restoring snapshot, which should now work"); - client().admin().cluster().prepareRestoreSnapshot(repo, snapshot1).setWaitForCompletion(true).get(); - assertEquals(1, client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots(repo).size()); - } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index d5f6c98711cf4..fe6eb309208af 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -2662,73 +2662,6 @@ public void testCloseIndexDuringRestore() throws Exception { assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0)); } - public void testDeleteSnapshotWhileRestoringFails() throws Exception { - Client client = client(); - - logger.info("--> creating repository"); - final String repoName = "test-repo"; - assertAcked(client.admin().cluster().preparePutRepository(repoName) - .setType("mock") - .setSettings(Settings.builder().put("location", randomRepoPath()))); - - logger.info("--> creating index"); - final String indexName = "test-idx"; - assertAcked(prepareCreate(indexName).setWaitForActiveShards(ActiveShardCount.ALL)); - - logger.info("--> indexing some data"); - for (int i = 0; i < 100; i++) { - indexDoc(indexName, Integer.toString(i), "foo", "bar" + i); - } - refresh(); - assertThat(client.prepareSearch(indexName).setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); - - logger.info("--> take snapshots"); - final String snapshotName = "test-snap"; - assertThat(client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) - .setIndices(indexName).setWaitForCompletion(true).get().getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); - final String snapshotName2 = "test-snap-2"; - assertThat(client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName2) - .setIndices(indexName).setWaitForCompletion(true).get().getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); - - logger.info("--> delete index before restoring"); - assertAcked(client.admin().indices().prepareDelete(indexName).get()); - - logger.info("--> execution will be blocked on all data nodes"); - blockAllDataNodes(repoName); - - final ActionFuture restoreFut; - try { - logger.info("--> start restore"); - restoreFut = client.admin().cluster().prepareRestoreSnapshot(repoName, snapshotName) - .setWaitForCompletion(true) - .execute(); - - logger.info("--> waiting for block to kick in"); - waitForBlockOnAnyDataNode(repoName, TimeValue.timeValueMinutes(1)); - - logger.info("--> try deleting the snapshot while the restore is in progress (should throw an error)"); - ConcurrentSnapshotExecutionException e = expectThrows(ConcurrentSnapshotExecutionException.class, () -> - client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).get()); - assertEquals(repoName, e.getRepositoryName()); - assertEquals(snapshotName, e.getSnapshotName()); - assertThat(e.getMessage(), containsString("cannot delete snapshot during a restore")); - - logger.info("-- try deleting another snapshot while the restore is in progress (should throw an error)"); - e = expectThrows(ConcurrentSnapshotExecutionException.class, () -> - client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName2).get()); - assertEquals(repoName, e.getRepositoryName()); - assertEquals(snapshotName2, e.getSnapshotName()); - assertThat(e.getMessage(), containsString("cannot delete snapshot during a restore")); - } finally { - // unblock even if the try block fails otherwise we will get bogus failures when we delete all indices in test teardown. - logger.info("--> unblocking all data nodes"); - unblockAllDataNodes(repoName); - } - - logger.info("--> wait for restore to finish"); - restoreFut.get(); - } - private void waitForIndex(final String index, TimeValue timeout) throws Exception { assertBusy( () -> assertTrue("Expected index [" + index + "] to exist", indexExists(index)), diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 2278ed75a7f84..3750eaf26c390 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -517,6 +517,63 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { } } + public void testConcurrentSnapshotRestoreAndDeleteOther() { + setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); + + String repoName = "repo"; + String snapshotName = "snapshot"; + final String index = "test"; + final int shards = randomIntBetween(1, 10); + + TestClusterNodes.TestClusterNode masterNode = + testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); + + final StepListener createSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie(createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true).execute(createSnapshotResponseStepListener)); + + final StepListener createOtherSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie(createSnapshotResponseStepListener, + createSnapshotResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2") + .setWaitForCompletion(true) + .execute(createOtherSnapshotResponseStepListener)); + + final StepListener deleteSnapshotStepListener = new StepListener<>(); + final StepListener restoreSnapshotResponseListener = new StepListener<>(); + + continueOrDie(createOtherSnapshotResponseStepListener, + createSnapshotResponse -> { + scheduleNow( + () -> client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(deleteSnapshotStepListener)); + scheduleNow(() -> client().admin().cluster().restoreSnapshot( + new RestoreSnapshotRequest(repoName, "snapshot-2").waitForCompletion(true) + .renamePattern("(.+)").renameReplacement("restored_$1"), + restoreSnapshotResponseListener)); + }); + + deterministicTaskQueue.runAllRunnableTasks(); + + assertThat(deleteSnapshotStepListener.result().isAcknowledged(), is(true)); + assertThat(restoreSnapshotResponseListener.result().getRestoreInfo().failedShards(), is(0)); + + SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); + assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); + final Repository repository = masterNode.repositoriesService.repository(repoName); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); + assertThat(snapshotIds, hasSize(1)); + + for (SnapshotId snapshotId : snapshotIds) { + final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertThat(snapshotInfo.indices(), containsInAnyOrder(index)); + assertEquals(shards, snapshotInfo.successfulShards()); + assertEquals(0, snapshotInfo.failedShards()); + } + } + public void testConcurrentSnapshotDeleteAndDeleteIndex() throws IOException { setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); From 52bbaf4308e07725e36da688edcc0c4ecd875406 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 29 Jan 2020 14:46:21 +0100 Subject: [PATCH 2/4] this test still makes sense ... --- .../SharedClusterSnapshotRestoreIT.java | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index fe6eb309208af..d74f99b9b9326 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -2662,6 +2662,66 @@ public void testCloseIndexDuringRestore() throws Exception { assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0)); } + public void testDeleteSnapshotWhileRestoringFails() throws Exception { + Client client = client(); + + logger.info("--> creating repository"); + final String repoName = "test-repo"; + assertAcked(client.admin().cluster().preparePutRepository(repoName) + .setType("mock") + .setSettings(Settings.builder().put("location", randomRepoPath()))); + + logger.info("--> creating index"); + final String indexName = "test-idx"; + assertAcked(prepareCreate(indexName).setWaitForActiveShards(ActiveShardCount.ALL)); + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + indexDoc(indexName, Integer.toString(i), "foo", "bar" + i); + } + refresh(); + assertThat(client.prepareSearch(indexName).setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); + + logger.info("--> take snapshots"); + final String snapshotName = "test-snap"; + assertThat(client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .setIndices(indexName).setWaitForCompletion(true).get().getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + final String snapshotName2 = "test-snap-2"; + assertThat(client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName2) + .setIndices(indexName).setWaitForCompletion(true).get().getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + + logger.info("--> delete index before restoring"); + assertAcked(client.admin().indices().prepareDelete(indexName).get()); + + logger.info("--> execution will be blocked on all data nodes"); + blockAllDataNodes(repoName); + + final ActionFuture restoreFut; + try { + logger.info("--> start restore"); + restoreFut = client.admin().cluster().prepareRestoreSnapshot(repoName, snapshotName) + .setWaitForCompletion(true) + .execute(); + + logger.info("--> waiting for block to kick in"); + waitForBlockOnAnyDataNode(repoName, TimeValue.timeValueMinutes(1)); + + logger.info("--> try deleting the snapshot while the restore is in progress (should throw an error)"); + ConcurrentSnapshotExecutionException e = expectThrows(ConcurrentSnapshotExecutionException.class, () -> + client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).get()); + assertEquals(repoName, e.getRepositoryName()); + assertEquals(snapshotName, e.getSnapshotName()); + assertThat(e.getMessage(), containsString("cannot delete snapshot during a restore")); + } finally { + // unblock even if the try block fails otherwise we will get bogus failures when we delete all indices in test teardown. + logger.info("--> unblocking all data nodes"); + unblockAllDataNodes(repoName); + } + + logger.info("--> wait for restore to finish"); + restoreFut.get(); + } + private void waitForIndex(final String index, TimeValue timeout) throws Exception { assertBusy( () -> assertTrue("Expected index [" + index + "] to exist", indexExists(index)), From ece5a807f3ec17b1667ce8fad0a5bceb95e858fb Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 30 Jan 2020 10:42:21 +0100 Subject: [PATCH 3/4] more accurate test --- .../org/elasticsearch/snapshots/SnapshotResiliencyTests.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 3750eaf26c390..787cae85f333c 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -207,6 +207,7 @@ import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener; import static org.elasticsearch.env.Environment.PATH_HOME_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.empty; @@ -559,11 +560,9 @@ public void testConcurrentSnapshotRestoreAndDeleteOther() { assertThat(deleteSnapshotStepListener.result().isAcknowledged(), is(true)); assertThat(restoreSnapshotResponseListener.result().getRestoreInfo().failedShards(), is(0)); - SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); - assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = masterNode.repositoriesService.repository(repoName); Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); - assertThat(snapshotIds, hasSize(1)); + assertThat(snapshotIds, contains(createOtherSnapshotResponseStepListener.result().getSnapshotInfo().snapshotId())); for (SnapshotId snapshotId : snapshotIds) { final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); From e8fe17ecd035527bc1863f6d7f32e6c78e4999d5 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 30 Jan 2020 11:25:57 +0100 Subject: [PATCH 4/4] index some docs --- .../snapshots/SnapshotResiliencyTests.java | 47 +++++++++++++++---- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 787cae85f333c..2c3c276d8d2d9 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -531,16 +531,20 @@ public void testConcurrentSnapshotRestoreAndDeleteOther() { final StepListener createSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(createRepoAndIndex(repoName, index, shards), - createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) - .setWaitForCompletion(true).execute(createSnapshotResponseStepListener)); + final int documentsFirstSnapshot = randomIntBetween(0, 100); + + continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> indexNDocuments( + documentsFirstSnapshot, index, () -> client().admin().cluster() + .prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(createSnapshotResponseStepListener))); + + final int documentsSecondSnapshot = randomIntBetween(0, 100); final StepListener createOtherSnapshotResponseStepListener = new StepListener<>(); - continueOrDie(createSnapshotResponseStepListener, - createSnapshotResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2") - .setWaitForCompletion(true) - .execute(createOtherSnapshotResponseStepListener)); + final String secondSnapshotName = "snapshot-2"; + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> indexNDocuments( + documentsSecondSnapshot, index, () -> client().admin().cluster().prepareCreateSnapshot(repoName, secondSnapshotName) + .setWaitForCompletion(true).execute(createOtherSnapshotResponseStepListener))); final StepListener deleteSnapshotStepListener = new StepListener<>(); final StepListener restoreSnapshotResponseListener = new StepListener<>(); @@ -550,13 +554,22 @@ public void testConcurrentSnapshotRestoreAndDeleteOther() { scheduleNow( () -> client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(deleteSnapshotStepListener)); scheduleNow(() -> client().admin().cluster().restoreSnapshot( - new RestoreSnapshotRequest(repoName, "snapshot-2").waitForCompletion(true) + new RestoreSnapshotRequest(repoName, secondSnapshotName).waitForCompletion(true) .renamePattern("(.+)").renameReplacement("restored_$1"), restoreSnapshotResponseListener)); }); + final StepListener searchResponseListener = new StepListener<>(); + continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> { + assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards()); + client().search(new SearchRequest("restored_" + index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)), + searchResponseListener); + }); + deterministicTaskQueue.runAllRunnableTasks(); + assertEquals(documentsFirstSnapshot + documentsSecondSnapshot, + Objects.requireNonNull(searchResponseListener.result().getHits().getTotalHits()).value); assertThat(deleteSnapshotStepListener.result().isAcknowledged(), is(true)); assertThat(restoreSnapshotResponseListener.result().getRestoreInfo().failedShards(), is(0)); @@ -573,6 +586,24 @@ public void testConcurrentSnapshotRestoreAndDeleteOther() { } } + private void indexNDocuments(int documents, String index, Runnable afterIndexing) { + if (documents == 0) { + afterIndexing.run(); + return; + } + final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < documents; ++i) { + bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i))); + } + final StepListener bulkResponseStepListener = new StepListener<>(); + client().bulk(bulkRequest, bulkResponseStepListener); + continueOrDie(bulkResponseStepListener, bulkResponse -> { + assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); + assertEquals(documents, bulkResponse.getItems().length); + afterIndexing.run(); + }); + } + public void testConcurrentSnapshotDeleteAndDeleteIndex() throws IOException { setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));