diff --git a/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java b/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java index a8cb897f0d369..e7c8e995dd61c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java @@ -101,7 +101,7 @@ private Entry(StreamInput in) throws IOException { repositoryStateId = in.readLong(); } - private Entry(String repository, long repositoryStateId) { + public Entry(String repository, long repositoryStateId) { this.repository = repository; this.repositoryStateId = repositoryStateId; } diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java index 8e702fbdceea8..2ac12d3e93922 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java @@ -44,7 +44,7 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable i // the list of snapshot deletion request entries private final List entries; - private SnapshotDeletionsInProgress(List entries) { + public SnapshotDeletionsInProgress(List entries) { this.entries = Collections.unmodifiableList(entries); } diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java index 49e426ce18eba..a25a90bf4199f 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java @@ -49,6 +49,7 @@ import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.DELETE_OPERATION; import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore.SLM_HISTORY_INDEX_PREFIX; import static org.elasticsearch.xpack.ilm.TimeSeriesLifecycleActionsIT.getStepKeyForIndex; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -383,7 +384,11 @@ public void testSnapshotInProgress() throws Exception { }); // Cancel the snapshot since it is not going to complete quickly - assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + repoId + "/" + snapshotName))); + try { + client().performRequest(new Request("DELETE", "/_snapshot/" + repoId + "/" + snapshotName)); + } catch (Exception e) { + // ignore + } } } @@ -403,9 +408,9 @@ public void testRetentionWhileSnapshotInProgress() throws Exception { initializeRepo(slowRepo, "1b"); initializeRepo(fastRepo, "10mb"); - createSnapshotPolicy(slowPolicy, "snap", "1 2 3 4 5 ?", slowRepo, indexName, true, + createSnapshotPolicy(slowPolicy, "slow-snap", "1 2 3 4 5 ?", slowRepo, indexName, true, new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null)); - createSnapshotPolicy(fastPolicy, "snap", "1 2 3 4 5 ?", fastRepo, indexName, true, + createSnapshotPolicy(fastPolicy, "fast-snap", "1 2 3 4 5 ?", fastRepo, indexName, true, new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null)); // Create a snapshot and wait for it to be complete (need something that can be deleted) @@ -419,6 +424,19 @@ public void testRetentionWhileSnapshotInProgress() throws Exception { logger.info("--> waiting for snapshot {} to be successful, got: {}", completedSnapshotName, snaps); List> snaps2 = (List>) snaps.get("snapshots"); assertThat(snaps2.get(0).get("state"), equalTo("SUCCESS")); + + // Check that no in_progress snapshots show up + Response response = client().performRequest(new Request("GET", "/_slm/policy")); + Map policyResponseMap; + try (InputStream content2 = response.getEntity().getContent()) { + policyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), content2, true); + } + assertThat(policyResponseMap.size(), greaterThan(0)); + Optional> inProgress = Optional.ofNullable((Map) policyResponseMap.get(slowPolicy)) + .map(policy -> (Map) policy.get("in_progress")); + + // Ensure no snapshots are running + assertFalse("expected no in progress snapshots but got " + inProgress.orElse(null), inProgress.isPresent()); } } catch (NullPointerException | ResponseException e) { fail("unable to retrieve completed snapshot: " + e); @@ -431,11 +449,12 @@ public void testRetentionWhileSnapshotInProgress() throws Exception { // Check that the executed snapshot shows up in the SLM output as in_progress assertBusy(() -> { try { - Response response = client().performRequest(new Request("GET", "/_slm/policy" + (randomBoolean() ? "" : "?human"))); + Response response = client().performRequest(new Request("GET", "/_slm/policy")); Map policyResponseMap; try (InputStream content = response.getEntity().getContent()) { policyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true); } + logger.info("--> checking for 'slow-*' snapshot to show up in policy response, got: " + policyResponseMap); assertThat(policyResponseMap.size(), greaterThan(0)); Optional> inProgress = Optional.ofNullable((Map) policyResponseMap.get(slowPolicy)) .map(policy -> (Map) policy.get("in_progress")); @@ -444,7 +463,7 @@ public void testRetentionWhileSnapshotInProgress() throws Exception { Map inProgressMap = inProgress.get(); assertThat(inProgressMap.get("name"), equalTo(slowSnapshotName)); assertNotNull(inProgressMap.get("uuid")); - assertThat(inProgressMap.get("state"), equalTo("STARTED")); + assertThat(inProgressMap.get("state"), anyOf(equalTo("STARTED"), equalTo("INIT"))); assertThat((long) inProgressMap.get("start_time_millis"), greaterThan(0L)); assertNull(inProgressMap.get("failure")); } else { @@ -481,9 +500,10 @@ public void testRetentionWhileSnapshotInProgress() throws Exception { assertBusy(() -> { // We expect a failed response because the snapshot should not exist try { - logger.info("--> checking to see if snapshot has been deleted..."); Response response = client().performRequest(new Request("GET", "/_snapshot/" + slowRepo + "/" + completedSnapshotName)); - assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception")); + String resp = EntityUtils.toString(response.getEntity()); + logger.info("--> checking to see if snapshot has been deleted, got: " + resp); + assertThat(resp, containsString("snapshot_missing_exception")); } catch (ResponseException e) { assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception")); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java index 50e79441fdbac..36a60ffdf9365 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java @@ -37,6 +37,7 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea private final SchedulerEngine scheduler; private volatile String slmRetentionSchedule; + private volatile boolean isMaster = false; public SnapshotRetentionService(Settings settings, Supplier taskSupplier, @@ -63,17 +64,19 @@ SchedulerEngine getScheduler() { @Override public void onMaster() { + this.isMaster = true; rescheduleRetentionJob(); } @Override public void offMaster() { + this.isMaster = false; cancelRetentionJob(); } private void rescheduleRetentionJob() { final String schedule = this.slmRetentionSchedule; - if (Strings.hasText(schedule)) { + if (this.isMaster && Strings.hasText(schedule)) { final SchedulerEngine.Job retentionJob = new SchedulerEngine.Job(SLM_RETENTION_JOB_ID, new CronSchedule(schedule)); logger.debug("scheduling SLM retention job for [{}]", schedule); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index 780eecb35db33..27c40fbe15ca9 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -18,6 +18,9 @@ import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.RepositoryCleanupInProgress; +import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -280,19 +283,21 @@ private void maybeDeleteSnapshots(Map> snapshotsToDel } ClusterState state = clusterService.state(); - if (snapshotInProgress(state)) { + if (okayToDeleteSnapshots(state)) { + deleteSnapshots(snapshotsToDelete, maximumTime, slmStats); + } else { logger.debug("a snapshot is currently running, rescheduling SLM retention for after snapshot has completed"); ClusterStateObserver observer = new ClusterStateObserver(clusterService, maximumTime, logger, threadPool.getThreadContext()); CountDownLatch latch = new CountDownLatch(1); observer.waitForNextChange( new NoSnapshotRunningListener(observer, newState -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> { - try { - deleteSnapshots(snapshotsToDelete, maximumTime, slmStats); - } finally { - latch.countDown(); - } - }), + try { + deleteSnapshots(snapshotsToDelete, maximumTime, slmStats); + } finally { + latch.countDown(); + } + }), e -> { latch.countDown(); throw new ElasticsearchException(e); @@ -302,8 +307,6 @@ private void maybeDeleteSnapshots(Map> snapshotsToDel } catch (InterruptedException e) { throw new ElasticsearchException(e); } - } else { - deleteSnapshots(snapshotsToDelete, maximumTime, slmStats); } } @@ -412,14 +415,32 @@ void updateStateWithStats(SnapshotLifecycleStats newStats) { clusterService.submitStateUpdateTask("update_slm_stats", new UpdateSnapshotLifecycleStatsTask(newStats)); } - public static boolean snapshotInProgress(ClusterState state) { - SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE); - if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) { - // No snapshots are running, new state is acceptable to proceed + public static boolean okayToDeleteSnapshots(ClusterState state) { + // Cannot delete during a snapshot + final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE); + if (snapshotsInProgress != null && snapshotsInProgress.entries().size() > 0) { + return false; + } + + // Cannot delete during an existing delete + final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { return false; } - // There are snapshots + // Cannot delete while a repository is being cleaned + final RepositoryCleanupInProgress repositoryCleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE); + if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) { + return false; + } + + // Cannot delete during a restore + final RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE); + if (restoreInProgress != null) { + return false; + } + + // It's okay to delete snapshots return true; } @@ -445,11 +466,11 @@ class NoSnapshotRunningListener implements ClusterStateObserver.Listener { @Override public void onNewClusterState(ClusterState state) { try { - if (snapshotInProgress(state)) { - observer.waitForNextChange(this); - } else { - logger.debug("retrying SLM snapshot retention deletion after snapshot has completed"); + if (okayToDeleteSnapshots(state)) { + logger.debug("retrying SLM snapshot retention deletion after snapshot operation has completed"); reRun.accept(state); + } else { + observer.waitForNextChange(this); } } catch (Exception e) { exceptionConsumer.accept(e); @@ -464,7 +485,7 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { exceptionConsumer.accept( - new IllegalStateException("slm retention snapshot deletion out while waiting for ongoing snapshots to complete")); + new IllegalStateException("slm retention snapshot deletion out while waiting for ongoing snapshot operations to complete")); } } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java index 8c96055302e5e..972f0d57db453 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java @@ -51,6 +51,7 @@ public void testJobsAreScheduled() { FakeRetentionTask::new, clusterService, clock)) { assertThat(service.getScheduler().jobCount(), equalTo(0)); + service.onMaster(); service.setUpdateSchedule(SnapshotLifecycleServiceTests.randomSchedule()); assertThat(service.getScheduler().scheduledJobIds(), containsInAnyOrder(SnapshotRetentionService.SLM_RETENTION_JOB_ID)); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index ce4ea03dd94c5..53c85c5e23027 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -11,10 +11,18 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.RepositoryCleanupInProgress; +import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; +import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.ClusterServiceUtils; @@ -57,6 +65,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.not; +import static org.mockito.Mockito.mock; public class SnapshotRetentionTaskTests extends ESTestCase { @@ -317,6 +326,43 @@ private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception } } + public void testOkToDeleteSnapshots() { + final Snapshot snapshot = new Snapshot("repo", new SnapshotId("name", "uuid")); + + SnapshotsInProgress inProgress = new SnapshotsInProgress( + new SnapshotsInProgress.Entry( + snapshot, true, false, SnapshotsInProgress.State.INIT, + Collections.singletonList(new IndexId("name", "id")), 0, 0, + ImmutableOpenMap.builder().build(), Collections.emptyMap())); + ClusterState state = ClusterState.builder(new ClusterName("cluster")) + .putCustom(SnapshotsInProgress.TYPE, inProgress) + .build(); + + assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(state), equalTo(false)); + + SnapshotDeletionsInProgress delInProgress = new SnapshotDeletionsInProgress( + Collections.singletonList(new SnapshotDeletionsInProgress.Entry(snapshot, 0, 0))); + state = ClusterState.builder(new ClusterName("cluster")) + .putCustom(SnapshotDeletionsInProgress.TYPE, delInProgress) + .build(); + + assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(state), equalTo(false)); + + RepositoryCleanupInProgress cleanupInProgress = new RepositoryCleanupInProgress(new RepositoryCleanupInProgress.Entry("repo", 0)); + state = ClusterState.builder(new ClusterName("cluster")) + .putCustom(RepositoryCleanupInProgress.TYPE, cleanupInProgress) + .build(); + + assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(state), equalTo(false)); + + RestoreInProgress restoreInProgress = mock(RestoreInProgress.class); + state = ClusterState.builder(new ClusterName("cluster")) + .putCustom(RestoreInProgress.TYPE, restoreInProgress) + .build(); + + assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(state), equalTo(false)); + } + public void testSkipWhileStopping() throws Exception { doTestSkipDuringMode(OperationMode.STOPPING); }