diff --git a/docs/changelog/79156.yaml b/docs/changelog/79156.yaml new file mode 100644 index 0000000000000..69fa260060814 --- /dev/null +++ b/docs/changelog/79156.yaml @@ -0,0 +1,6 @@ +pr: 79156 +summary: Add snapshots pending deletion in cluster state to delete snapshot once index + is deleted +area: Snapshot/Restore +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java index 28f797dfa5ec6..d081f80be921c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java @@ -287,7 +287,7 @@ public void testDeletePreventsClone() throws Exception { ConcurrentSnapshotExecutionException.class, () -> startClone(repoName, sourceSnapshot, targetSnapshot, indexName).actionGet() ); - assertThat(ex.getMessage(), containsString("cannot clone from snapshot that is being deleted")); + assertThat(ex.getMessage(), containsString("cannot clone a snapshot while a snapshot deletion is in-progress")); unblockNode(repoName, masterName); assertAcked(deleteFuture.get()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java index 32fdb285cb808..6de81eba3acdb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java @@ -64,6 +64,6 @@ protected void masterOperation( ClusterState state, final ActionListener listener ) { - snapshotsService.deleteSnapshots(request, listener.map(v -> AcknowledgedResponse.TRUE)); + 
snapshotsService.deleteSnapshotsByName(request, listener.map(v -> AcknowledgedResponse.TRUE)); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 5aa458c0a5ca1..57a08b92091a5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -142,6 +142,12 @@ public static List getNamedWriteables() { RepositoryCleanupInProgress::new, RepositoryCleanupInProgress::readDiffFrom ); + registerClusterCustom( + entries, + SnapshotDeletionsPending.TYPE, + SnapshotDeletionsPending::new, + SnapshotDeletionsPending::readDiffFrom + ); // Metadata registerMetadataCustom(entries, RepositoriesMetadata.TYPE, RepositoriesMetadata::new, RepositoriesMetadata::readDiffFrom); registerMetadataCustom(entries, IngestMetadata.TYPE, IngestMetadata::new, IngestMetadata::readDiffFrom); diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java index 36e97fcf6dd7d..ecbae26179cd9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java @@ -114,6 +114,15 @@ public boolean hasExecutingDeletion(String repository) { return false; } + /** + * Checks if the current {@link SnapshotDeletionsInProgress} contains the given {@link SnapshotId} + * + * @param snapshotId the snapshot id + */ + public boolean contains(SnapshotId snapshotId) { + return getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(snapshotId)); + } + /** * Returns {@code true} if there are snapshot deletions in progress in the cluster, * returns {@code false} otherwise. 
diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsPending.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsPending.java new file mode 100644 index 0000000000000..bd7d7d27e4864 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsPending.java @@ -0,0 +1,281 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState.Custom; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; + +import static java.util.Collections.unmodifiableList; + +/** + * Represents snapshots marked as to be deleted and pending deletion. + * + * Snapshots pending deletion are added to the cluster state when searchable snapshots indices with a specific setting are deleted (see + * MetadataDeleteIndexService#updateSnapshotDeletionsPending()). 
Because deleting snapshots requires a consistent view of the repository + * they belong to it is not possible to delete searchable snapshots indices and their backing snapshots in the same cluster state update. + * + * Hence we keep in cluster state the snapshot that should be deleted from repositories. To be able to delete them we capture the snapshot + * id, the snapshot name, the repository name and the repository id (if it exists) once, along with the time at which the snapshot was added + * to the pending deletion, in a {@link SnapshotDeletionsPending} entry. + * + * When cluster state is updated with such entries the {@link org.elasticsearch.snapshots.SnapshotsService} executes corresponding snapshot + * delete requests to effectively delete the snapshot from the repository. It is possible that the deletion of a snapshot failed for various + * reason (ex: conflicting snapshot operation, repository removed etc). In such cases the snapshot pending deletion is kept in the cluster + * state and the deletion will be retried on the next cluster state update. To avoid too many snapshots pending deletion stored in cluster + * state the number is limited to 5000 and configurable through the {@link #MAX_PENDING_DELETIONS_SETTING} setting. + */ +public class SnapshotDeletionsPending extends AbstractNamedDiffable implements Custom { + + public static final SnapshotDeletionsPending EMPTY = new SnapshotDeletionsPending(List.of()); + public static final String TYPE = "snapshot_deletions_pending"; + + /** + * Version from which a snapshot can be marked as to be deleted after an index is deleted. + */ + public static final Version SNAPSHOT_DELETIONS_PENDING_VERSION = Version.V_8_2_0; + + /** + * Setting for the maximum number of snapshots pending deletion allowed in the cluster state. + *

+ * This setting is here to prevent the cluster state from growing too large. In the case that the number of snapshots pending deletion exceeds + * the value of this setting the oldest entries are removed from the cluster state. Snapshots that are discarded are removed before + * they can be deleted from their repository and are therefore considered as "leaking" and should be logged as warnings. + *

+ * This setting is a non-dynamic, node-level only setting that is only used on the elected master node. + */ + public static final Setting MAX_PENDING_DELETIONS_SETTING = Setting.intSetting( + "cluster.snapshot.snapshot_deletions_pending.size", + 5_000, + Setting.Property.NodeScope + ); + + /** + * A list of snapshots to delete, in the order deletions were requested. + */ + private final List entries; + + private SnapshotDeletionsPending(List entries) { + this.entries = unmodifiableList(Objects.requireNonNull(entries)); + } + + public SnapshotDeletionsPending(StreamInput in) throws IOException { + this(in.readList(Entry::new)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(entries); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + public boolean isEmpty() { + return entries.isEmpty(); + } + + public boolean contains(SnapshotId snapshotId) { + return entries.stream().anyMatch(entry -> Objects.equals(entry.getSnapshotId(), snapshotId)); + } + + public List entries() { + return entries; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray(TYPE); + for (Entry entry : entries) { + entry.toXContent(builder, params); + } + builder.endArray(); + return builder; + } + + @Override + public Version getMinimalSupportedVersion() { + return SNAPSHOT_DELETIONS_PENDING_VERSION; + } + + public SnapshotDeletionsPending withRemovedSnapshots(List snapshotIds) { + if (snapshotIds == null || snapshotIds.isEmpty()) { + return this; + } + boolean changed = false; + final List updatedEntries = new ArrayList<>(); + final Set removedSnapshotIds = new HashSet<>(snapshotIds); + for (Entry entry : entries) { + if (removedSnapshotIds.contains(entry.snapshotId)) { + changed = true; + continue; + } + updatedEntries.add(entry); + } + if (changed == false) { + return this; + } else if (updatedEntries.isEmpty()) { + return EMPTY; + } else { 
+ return new SnapshotDeletionsPending(updatedEntries); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SnapshotDeletionsPending that = (SnapshotDeletionsPending) o; + return Objects.equals(entries, that.entries); + } + + @Override + public int hashCode() { + return Objects.hash(entries); + } + + @Override + public String toString() { + return "SnapshotDeletionsPending[" + entries + ']'; + } + + public static class Entry implements Writeable, ToXContentObject { + + private final String repositoryName; + private final String repositoryUuid; + private final SnapshotId snapshotId; + private final long indexDeletionTime; + + public Entry(String repositoryName, String repositoryUuid, SnapshotId snapshotId, long indexDeletionTime) { + this.repositoryName = Objects.requireNonNull(repositoryName); + this.repositoryUuid = Objects.requireNonNullElse(repositoryUuid, RepositoryData.MISSING_UUID); + this.snapshotId = Objects.requireNonNull(snapshotId); + this.indexDeletionTime = indexDeletionTime; + } + + private Entry(StreamInput in) throws IOException { + this.repositoryName = in.readString(); + this.repositoryUuid = in.readString(); + this.snapshotId = new SnapshotId(in); + this.indexDeletionTime = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(repositoryName); + out.writeString(repositoryUuid); + snapshotId.writeTo(out); + out.writeVLong(indexDeletionTime); + } + + public String getRepositoryName() { + return repositoryName; + } + + public String getRepositoryUuid() { + return repositoryUuid; + } + + public SnapshotId getSnapshotId() { + return snapshotId; + } + + public long getIndexDeletionTime() { + return indexDeletionTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Entry entry = (Entry) o; + return 
indexDeletionTime == entry.indexDeletionTime
+                && Objects.equals(repositoryName, entry.repositoryName)
+                && Objects.equals(repositoryUuid, entry.repositoryUuid)
+                && Objects.equals(snapshotId, entry.snapshotId);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(repositoryName, repositoryUuid, snapshotId, indexDeletionTime);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            {
+                builder.field("repository_name", repositoryName);
+                builder.field("repository_uuid", repositoryUuid);
+                builder.timeField("creation_time_millis", "creation_time", indexDeletionTime);
+                builder.field("snapshot", snapshotId);
+            }
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public String toString() {
+            return '[' + repositoryName + '/' + repositoryUuid + ',' + snapshotId + ',' + indexDeletionTime + ']';
+        }
+    }
+
+    /**
+     * Mutable builder that starts from the entries of an existing {@link SnapshotDeletionsPending}, appends new entries in the
+     * order {@link #add} is called and, when {@link #build} is invoked, evicts the oldest entries so that the resulting list does
+     * not exceed the configured maximum.
+     */
+    public static final class Builder {
+
+        private final List entries;
+        private final Consumer consumer;
+
+        /**
+         * @param snapshotDeletionsPending the instance whose entries are copied as the initial content of this builder
+         * @param onLimitExceeded          callback invoked with every entry that is evicted because the limit is exceeded; may be null
+         */
+        public Builder(SnapshotDeletionsPending snapshotDeletionsPending, Consumer onLimitExceeded) {
+            this.entries = new ArrayList<>(snapshotDeletionsPending.entries);
+            this.consumer = onLimitExceeded;
+        }
+
+        /**
+         * Removes entries from the head of the list (the oldest ones) until at most {@code maxPendingDeletions} entries remain,
+         * notifying the consumer (if any) of each evicted entry.
+         *
+         * @param maxPendingDeletions the maximum number of entries to keep; expected to be non-negative
+         */
+        private void ensureLimit(final int maxPendingDeletions) {
+            // This runs after all add() calls, so only evict when the limit is actually exceeded: using ">=" here would cap the
+            // list at (max - 1) entries and would call remove(0) on an empty list when the limit is 0.
+            while (entries.size() > maxPendingDeletions) {
+                final Entry removed = entries.remove(0);
+                if (consumer != null) {
+                    consumer.accept(removed);
+                }
+            }
+        }
+
+        /**
+         * Appends a new entry at the end of the list of snapshots pending deletion.
+         *
+         * @return this builder
+         */
+        public Builder add(String repositoryName, String repositoryUuid, SnapshotId snapshotId, long creationTime) {
+            entries.add(new Entry(repositoryName, repositoryUuid, snapshotId, creationTime));
+            return this;
+        }
+
+        /**
+         * Builds the {@link SnapshotDeletionsPending}, evicting the oldest entries if their number exceeds the value of
+         * {@link #MAX_PENDING_DELETIONS_SETTING} read from the given settings.
+         *
+         * @param settings the settings from which the maximum number of pending deletions is read
+         * @return the resulting instance, or {@link #EMPTY} if no entry remains
+         */
+        public SnapshotDeletionsPending build(Settings settings) {
+            // the setting declares no minimum: treat a zero or negative value as "keep nothing" instead of failing in ensureLimit
+            final int maxPendingDeletions = Math.max(0, MAX_PENDING_DELETIONS_SETTING.get(settings));
+            ensureLimit(maxPendingDeletions);
+            assert entries.size() <= maxPendingDeletions : entries.size() + " > " + maxPendingDeletions;
+            return entries.isEmpty() == false ?
new SnapshotDeletionsPending(entries) : EMPTY; + } + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(Custom.class, TYPE, in); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java index 87e04b8d1bb94..8fbc4979d1189 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.delete.DeleteIndexClusterStateUpdateRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -18,6 +19,7 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsPending; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -28,16 +30,31 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.snapshots.SearchableSnapshotsSettings; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInProgressException; import org.elasticsearch.snapshots.SnapshotsService; +import java.time.Instant; +import java.time.ZoneOffset; import 
java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; + +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_SNAPSHOT_UUID_SETTING_KEY; +import static org.elasticsearch.snapshots.SnapshotsService.findRepositoryForPendingDeletion; /** * Deletes indices. @@ -48,7 +65,6 @@ public class MetadataDeleteIndexService { private final Settings settings; private final ClusterService clusterService; - private final AllocationService allocationService; @Inject @@ -143,27 +159,135 @@ public ClusterState deleteIndices(ClusterState currentState, Set indices) currentGraveyard.getTombstones().size() ); - Metadata newMetadata = metadataBuilder.build(); - ClusterBlocks blocks = clusterBlocksBuilder.build(); + final ClusterState.Builder builder = ClusterState.builder(currentState) + .routingTable(routingTableBuilder.build()) + .blocks(clusterBlocksBuilder.build()) + .metadata(metadataBuilder.build()); + + ImmutableOpenMap.Builder customBuilder = null; // update snapshot restore entries - ImmutableOpenMap customs = currentState.getCustoms(); final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY); RestoreInProgress updatedRestoreInProgress = RestoreService.updateRestoreStateWithDeletedIndices(restoreInProgress, indices); if (updatedRestoreInProgress != 
restoreInProgress) { - ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(customs); - builder.put(RestoreInProgress.TYPE, updatedRestoreInProgress); - customs = builder.build(); + customBuilder = ImmutableOpenMap.builder(currentState.getCustoms()); + customBuilder.put(RestoreInProgress.TYPE, updatedRestoreInProgress); + } + + // update snapshot(s) marked as to delete + final SnapshotDeletionsPending deletionsInPending = currentState.custom( + SnapshotDeletionsPending.TYPE, + SnapshotDeletionsPending.EMPTY + ); + final SnapshotDeletionsPending updatedPendingDeletes = updateSnapshotDeletionsPending( + deletionsInPending, + indicesToDelete, + currentState + ); + if (updatedPendingDeletes != deletionsInPending) { + if (customBuilder == null) { + customBuilder = ImmutableOpenMap.builder(currentState.getCustoms()); + } + customBuilder.put(SnapshotDeletionsPending.TYPE, updatedPendingDeletes); + } + if (customBuilder != null) { + builder.customs(customBuilder.build()); + } + return allocationService.reroute(builder.build(), "deleted indices [" + indices + "]"); + } + + /** + * This method updates the list of snapshots marked as to be deleted if one or more searchable snapshots are deleted. + * + * The snapshots cannot be deleted at the same time of the searchable snapshots indices because deleting one or more snapshot requires a + * consistent view of their repositories data, and getting the consistent views cannot be done in the same cluster state update. It is + * also possible than one (or more) snapshot cannot be deleted immediately because the snapshot is involved in another restore or + * cloning or the repository might not be writeable etc. To address those conflicting situations this method only captures the snapshot + * information that are required to later delete the snapshot and stores them in a {@link SnapshotDeletionsPending.Entry} in cluster + * state. Once a snapshot is pending deletion it cannot be restored, mounted or cloned. 
If the snapshot pending deletion is involved in + * a snapshot operation at the time it is deleted then the deletion will happen once the conflicting operation is terminated. + */ + private SnapshotDeletionsPending updateSnapshotDeletionsPending( + final SnapshotDeletionsPending pendingDeletions, + final Set indicesToDelete, + final ClusterState state + ) { + final List deletedIndicesSettings = indicesToDelete.stream() + .map(index -> state.metadata().getIndexSafe(index)) + .filter(IndexMetadata::isSearchableSnapshot) + .map(IndexMetadata::getSettings) + .filter(indexSettings -> indexSettings.getAsBoolean(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, false)) + .collect(Collectors.toList()); + if (deletedIndicesSettings.isEmpty()) { + return pendingDeletions; } - return allocationService.reroute( - ClusterState.builder(currentState) - .routingTable(routingTableBuilder.build()) - .metadata(newMetadata) - .blocks(blocks) - .customs(customs) - .build(), - "deleted indices [" + indices + "]" + final Set activeSearchableSnapshots = state.metadata() + .indices() + .values() + .stream() + .filter(index -> index.isSearchableSnapshot() && indicesToDelete.contains(index.getIndex()) == false) + .map(index -> MetadataDeleteIndexService.toSnapshotId(index.getSettings())) + .collect(Collectors.toUnmodifiableSet()); + + final RepositoriesMetadata repositories = state.metadata().custom(RepositoriesMetadata.TYPE); + // also used to deduplicate snapshots that were used by multiple deleted indices + final Map> snapshotsWithRepository = new HashMap<>(); + // also used to log a warning for snapshots with unknown repository + final Map> snapshotsWithoutRepository = new HashMap<>(); + + for (Settings deletedIndexSettings : deletedIndicesSettings) { + SnapshotId snapshotId = toSnapshotId(deletedIndexSettings); + if (activeSearchableSnapshots.contains(snapshotId) == false) { + String repositoryUuid = deletedIndexSettings.get(SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY); + 
String repositoryName = deletedIndexSettings.get(SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY); + Optional repository = findRepositoryForPendingDeletion(repositories, repositoryName, repositoryUuid); + if (repository.isPresent()) { + snapshotsWithRepository.putIfAbsent(snapshotId, Tuple.tuple(repositoryName, repositoryUuid)); + } else { + snapshotsWithoutRepository.putIfAbsent(snapshotId, Tuple.tuple(repositoryName, repositoryUuid)); + } + } + } + + final int maxPendingDeletions = SnapshotDeletionsPending.MAX_PENDING_DELETIONS_SETTING.get(settings); + final SnapshotDeletionsPending.Builder builder = new SnapshotDeletionsPending.Builder( + pendingDeletions, + evicted -> logger.warn( + () -> new ParameterizedMessage( + "maximum number of snapshots [{}] awaiting deletion has been reached in " + + "cluster state before snapshot [{}] deleted on [{}] in repository [{}/{}] could be deleted", + maxPendingDeletions, + evicted.getSnapshotId(), + Instant.ofEpochMilli(evicted.getIndexDeletionTime()).atZone(ZoneOffset.UTC), + evicted.getRepositoryName(), + evicted.getRepositoryUuid() + ) + ) + ); + + final long timestamp = Instant.now().toEpochMilli(); + for (Map.Entry> entry : snapshotsWithRepository.entrySet()) { + logger.debug("snapshot [{}:{}] added to the list of snapshots pending deletion", entry.getValue().v1(), entry.getKey()); + builder.add(entry.getValue().v1(), entry.getValue().v2(), entry.getKey(), timestamp); + } + for (Map.Entry> entry : snapshotsWithoutRepository.entrySet()) { + logger.warn( + "snapshot [{}] added to the list of snapshots pending deletion but refers to an unregistered repository [{}/{}]", + entry.getKey(), + entry.getValue().v1(), + entry.getValue().v2() + ); + builder.add(entry.getValue().v1(), entry.getValue().v2(), entry.getKey(), timestamp); + } + return builder.build(settings); + } + + private static SnapshotId toSnapshotId(final Settings indexSettings) { + assert SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSettings); + 
return new SnapshotId( + indexSettings.get(SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY), + indexSettings.get(SEARCHABLE_SNAPSHOTS_SNAPSHOT_UUID_SETTING_KEY) ); } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index f82bb516d14d3..577c2f40015eb 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -100,6 +100,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter; import org.elasticsearch.snapshots.InternalSnapshotsInfoService; import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.snapshots.SnapshotDeletionsPendingService; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ProxyConnectionStrategy; @@ -503,6 +504,8 @@ public void apply(Settings value, Settings current, Settings previous) { HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING, HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING, SnapshotsService.MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, + SnapshotDeletionsPendingService.PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING, + SnapshotDeletionsPendingService.PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING, RestoreService.REFRESH_REPO_UUID_ON_RESTORE_SETTING, FsHealthService.ENABLED_SETTING, FsHealthService.REFRESH_INTERVAL_SETTING, diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 76079c4eee61c..182ad90c54de2 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -365,6 +365,8 @@ public ClusterState 
execute(ClusterState currentState) { if (Regex.simpleMatch(request.name(), repositoryMetadata.name())) { ensureRepositoryNotInUse(currentState, repositoryMetadata.name()); ensureNoSearchableSnapshotsIndicesInUse(currentState, repositoryMetadata); + // TODO should we prevent the deletion of repositories that have snapshots pending deletions or should + // we just log a warning and rely on SnapshotDeletionsPending to catch up if the repo comes back deletedRepositories.add(repositoryMetadata.name()); changed = true; } else { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 0a238e4f03b0a..e68dacd9acdbd 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -2531,7 +2531,7 @@ private ClusterState updateRepositoryGenerationsIfNecessary(ClusterState state, } } updatedDeletionsInProgress = changedDeletions ? 
SnapshotDeletionsInProgress.of(deletionEntries) : null; - return SnapshotsService.updateWithSnapshots(state, updatedSnapshotsInProgress, updatedDeletionsInProgress); + return SnapshotsService.updateWithSnapshots(state, updatedSnapshotsInProgress, updatedDeletionsInProgress, null); } private RepositoryMetadata getRepoMetadata(ClusterState state) { diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 906c280a27818..23d3bfa71dc0f 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -23,7 +23,6 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.RestoreInProgress.ShardRestoreStatus; -import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.DataStream; @@ -100,11 +99,11 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; import static org.elasticsearch.common.util.set.Sets.newHashSet; -import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION; import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY; import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY; import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_SNAPSHOT_UUID_SETTING_KEY; +import static 
org.elasticsearch.snapshots.SnapshotUtils.ensureSnapshotNotDeletedOrPendingDeletion; import static org.elasticsearch.snapshots.SnapshotUtils.filterIndices; import static org.elasticsearch.snapshots.SnapshotsService.NO_FEATURE_STATES_VALUE; @@ -1117,7 +1116,7 @@ private static IndexMetadata updateIndexSettings( "cannot disable setting [" + IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey() + "] on restore" ); } - if ("snapshot".equals(INDEX_STORE_TYPE_SETTING.get(settings))) { + if (SearchableSnapshotsSettings.isSearchableSnapshotStore(settings)) { final Boolean changed = changeSettings.getAsBoolean(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, null); if (changed != null) { final Boolean previous = settings.getAsBoolean(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, null); @@ -1251,7 +1250,7 @@ private final class RestoreSnapshotStateTask extends ClusterStateUpdateTask { @Override public ClusterState execute(ClusterState currentState) { // Check if the snapshot to restore is currently being deleted - ensureSnapshotNotDeleted(currentState); + ensureSnapshotNotDeletedOrPendingDeletion(currentState, snapshot.getRepository(), snapshot.getSnapshotId(), "restore"); // Clear out all existing indices which fall within a system index pattern being restored currentState = metadataDeleteIndexService.deleteIndices( @@ -1359,7 +1358,7 @@ && isSystemIndex(snapshotIndexMetadata) == false) { ); } - if ("snapshot".equals(INDEX_STORE_TYPE_SETTING.get(updatedIndexMetadata.getSettings()))) { + if (SearchableSnapshotsSettings.isSearchableSnapshotStore(updatedIndexMetadata.getSettings())) { searchableSnapshotsIndices.add(updatedIndexMetadata.getIndex()); } } @@ -1424,19 +1423,6 @@ private void applyDataStreamRestores(ClusterState currentState, Metadata.Builder mdBuilder.dataStreams(updatedDataStreams, updatedDataStreamAliases); } - private void ensureSnapshotNotDeleted(ClusterState currentState) { - SnapshotDeletionsInProgress deletionsInProgress = currentState.custom( 
- SnapshotDeletionsInProgress.TYPE, - SnapshotDeletionsInProgress.EMPTY - ); - if (deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(snapshot.getSnapshotId()))) { - throw new ConcurrentSnapshotExecutionException( - snapshot, - "cannot restore a snapshot while a snapshot deletion is in-progress [" + deletionsInProgress.getEntries().get(0) + "]" - ); - } - } - private void applyGlobalStateRestore(ClusterState currentState, Metadata.Builder mdBuilder) { if (metadata.persistentSettings() != null) { Settings settings = metadata.persistentSettings(); @@ -1660,7 +1646,7 @@ private static void ensureSearchableSnapshotsRestorable( final Metadata metadata = currentState.metadata(); for (Index index : indices) { final Settings indexSettings = metadata.getIndexSafe(index).getSettings(); - assert "snapshot".equals(INDEX_STORE_TYPE_SETTING.get(indexSettings)) : "not a snapshot backed index: " + index; + assert SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSettings) : "not a snapshot backed index: " + index; final String repositoryUuid = indexSettings.get(SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY); final String repositoryName = indexSettings.get(SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY); @@ -1688,10 +1674,10 @@ private static void ensureSearchableSnapshotsRestorable( if (other.getIndex().equals(index)) { continue; // do not check the searchable snapshot index against itself } - final Settings otherSettings = other.getSettings(); - if ("snapshot".equals(INDEX_STORE_TYPE_SETTING.get(otherSettings)) == false) { + if (other.isSearchableSnapshot() == false) { continue; // other index is not a searchable snapshot index, skip } + final Settings otherSettings = other.getSettings(); final String otherSnapshotUuid = otherSettings.get(SEARCHABLE_SNAPSHOTS_SNAPSHOT_UUID_SETTING_KEY); if (Objects.equals(snapshotUuid, otherSnapshotUuid) == false) { continue; // other index is backed by a different snapshot, skip diff --git 
a/server/src/main/java/org/elasticsearch/snapshots/SnapshotDeletionsPendingService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotDeletionsPendingService.java new file mode 100644 index 0000000000000..105d88b68e4c1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotDeletionsPendingService.java @@ -0,0 +1,493 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.snapshots; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.RepositoryCleanupInProgress; +import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsPending; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import 
org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.threadpool.ThreadPool; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY; +import static org.elasticsearch.snapshots.SnapshotUtils.cloneSources; +import static org.elasticsearch.snapshots.SnapshotUtils.deletionsSources; +import static org.elasticsearch.snapshots.SnapshotUtils.restoreSources; +import static org.elasticsearch.snapshots.SnapshotsService.findRepositoryForPendingDeletion; + +public class SnapshotDeletionsPendingService { + + private static final Logger logger = LogManager.getLogger(SnapshotDeletionsPendingService.class); + + public static final Setting PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING = Setting.timeSetting( + "snapshot.snapshot_deletions_pending.retry_interval", + TimeValue.timeValueSeconds(30L), + TimeValue.ZERO, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public static final Setting PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING = Setting.timeSetting( + "snapshot.snapshot_deletions_pending.expiration_interval", + TimeValue.timeValueHours(12L), + TimeValue.ZERO, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Set of pending snapshots deletions whose deletion is already triggered + */ + private final Set triggered = ConcurrentCollections.newConcurrentSet(); + + /** + * Map of pending snapshots deletions whose deletion is conflicting with on-going restores/clones/repository clean up or repository + * missing or read-only. 
Those sets are used to identify the cluster state updates to process in case they resolve some conflict. + */ + private final Map conflicting = new HashMap<>(); + + enum ConflictType { + RESTORING, + CLONING, + REPO_MISSING, + REPO_READONLY, + REPO_CLEANUP + } + + private final SnapshotsService snapshotsService; + private final ClusterService clusterService; + private final ThreadPool threadPool; + private final ClusterStateTaskExecutor removePendingDeletionsExecutor; + + private volatile TimeValue pendingDeletionsRetryInterval; + private volatile TimeValue pendingDeletionsExpirationInterval; + + SnapshotDeletionsPendingService( + SnapshotsService snapshotsService, + ClusterService clusterService, + ThreadPool threadPool, + Settings settings + ) { + this.snapshotsService = Objects.requireNonNull(snapshotsService); + this.clusterService = Objects.requireNonNull(clusterService); + this.threadPool = Objects.requireNonNull(threadPool); + this.removePendingDeletionsExecutor = new RemoveSnapshotDeletionsPendingExecutor(); + pendingDeletionsRetryInterval = PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING.get(settings); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING, t -> pendingDeletionsRetryInterval = t); + pendingDeletionsExpirationInterval = PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING.get(settings); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING, t -> pendingDeletionsExpirationInterval = t); + } + + /** + * Finds snapshots to delete in the cluster state and triggers explicit snapshot delete requests. This method attempts to detect + * conflicting situations where triggering the snapshot deletion would likely fail due to a concurrent snapshot operation.
In such + * cases the snapshot deletion is not triggered as it should be triggered by subsequent cluster state updates once the conflicting + * situation is resolved. + * + * The repository name and uuid information are extracted from the {@link SnapshotDeletionsPending} entries in order to find the + * repository to execute the snapshot delete request against. If the repo uuid was known at the time the snapshot was added to + * {@link SnapshotDeletionsPending} we try to find the corresponding repository, or a repository with a missing uuid but the same + * name. If the repo uuid was not known at the time the snapshot was added to {@link SnapshotDeletionsPending}, we try to find a + * repository with the same name. + * + * @param state the current {@link ClusterState} + * @param previousState the previous {@link ClusterState} + */ + public synchronized void processPendingDeletions(ClusterState state, ClusterState previousState) { + assert Thread.currentThread().getName().contains('[' + ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME + ']') + || Thread.currentThread().getName().startsWith("TEST-") : Thread.currentThread().getName(); + + final SnapshotDeletionsPending snapshotDeletionsPending = state.custom(SnapshotDeletionsPending.TYPE); + if (state.nodes().isLocalNodeElectedMaster() == false || snapshotDeletionsPending == null || snapshotDeletionsPending.isEmpty()) { + clearConflicts(); + return; + } + + if (pendingDeletionsChanged(state, previousState) || pendingDeletionsWithConflictsChanged(state, previousState)) { + final RepositoriesMetadata repositories = state.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + final RepositoryCleanupInProgress cleanUps = state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY); + + final Set currentDeletions = deletionsSources(state); + final Set currentRestores = restoreSources(state); + final Set currentClones = cloneSources(state); + + // the snapshots to trigger deletion 
for, per repository + final Map> snapshotsToDelete = new HashMap<>(); + + for (SnapshotDeletionsPending.Entry snapshot : snapshotDeletionsPending.entries()) { + final SnapshotId snapshotId = snapshot.getSnapshotId(); + + if (currentRestores.contains(snapshotId)) { + logger.trace("snapshot to delete [{}] is being restored, waiting for restore to complete", snapshotId); + conflicting.put(snapshotId, ConflictType.RESTORING); + continue; + } + if (currentClones.contains(snapshotId)) { + logger.trace("snapshot to delete [{}] is being cloned, waiting for cloning to complete", snapshotId); + conflicting.put(snapshotId, ConflictType.CLONING); + continue; + } + if (cleanUps.hasCleanupInProgress()) { + if (conflicting.put(snapshotId, ConflictType.REPO_CLEANUP) != ConflictType.REPO_CLEANUP) { + logger.debug( + "a repository clean-up is in progress, cannot delete pending snapshot [{}] created at {}", + snapshotId, + Instant.ofEpochMilli(snapshot.getIndexDeletionTime()).atZone(ZoneOffset.UTC) + ); + } + continue; + } + if (currentDeletions.contains(snapshotId)) { + logger.trace("snapshot to delete [{}] is already queued", snapshotId); + conflicting.remove(snapshotId); + continue; + } + + final Optional optionalRepository = findRepositoryForPendingDeletion( + repositories, + snapshot.getRepositoryName(), + snapshot.getRepositoryUuid() + ); + if (optionalRepository.isEmpty()) { + if (conflicting.put(snapshotId, ConflictType.REPO_MISSING) != ConflictType.REPO_MISSING) { + logger.debug( + "repository [{}/{}] not found, cannot delete pending snapshot [{}] created at {}", + snapshot.getRepositoryName(), + snapshot.getRepositoryUuid(), + snapshotId, + Instant.ofEpochMilli(snapshot.getIndexDeletionTime()).atZone(ZoneOffset.UTC) + ); + } + continue; + } + final RepositoryMetadata repository = optionalRepository.get(); + if (repository.settings().getAsBoolean(READONLY_SETTING_KEY, false)) { + if (conflicting.put(snapshotId, ConflictType.REPO_READONLY) != ConflictType.REPO_READONLY) { + 
logger.debug( + "repository [{}/{}] is read-only, cannot delete pending snapshot [{}] created at {}", + repository.name(), + repository.uuid(), + snapshotId, + Instant.ofEpochMilli(snapshot.getIndexDeletionTime()).atZone(ZoneOffset.UTC) + ); + } + continue; + } + conflicting.remove(snapshotId); + + if (triggered.add(snapshotId)) { + logger.info("triggering snapshot deletion for [{}]", snapshotId); + final Long previous = snapshotsToDelete.computeIfAbsent(repository, r -> new HashMap<>()) + .put(snapshotId, snapshot.getIndexDeletionTime()); + assert previous == null : snapshotId; + } + } + + assert snapshotDeletionsPending.entries() + .stream() + .map(SnapshotDeletionsPending.Entry::getSnapshotId) + .allMatch(snapId -> triggered.contains(snapId) || conflicting.containsKey(snapId) || currentDeletions.contains(snapId)); + + snapshotsToDelete.forEach( + (repo, snapshots) -> threadPool.generic().execute(new SnapshotsToDeleteRunnable(repo.name(), repo.uuid(), snapshots)) + ); + } + assert Sets.intersection(triggered, conflicting.keySet()).isEmpty() + : "pending snapshot deletion cannot be both triggered and in conflict: " + triggered + " vs " + conflicting.keySet(); + assert conflicting.keySet().stream().allMatch(snapshotDeletionsPending::contains); + } + + // only used in tests + boolean isTriggered(SnapshotId snapshotId) { + return triggered.contains(snapshotId); + } + + // only used in tests + synchronized ConflictType getConflict(SnapshotId snapshotId) { + return conflicting.get(snapshotId); + } + + synchronized void clearConflicts() { + if (conflicting.isEmpty() == false) { + conflicting.clear(); + } + } + + // package-private for testing + synchronized boolean pendingDeletionsChanged(ClusterState state, ClusterState previousState) { + SnapshotDeletionsPending previous = previousState.custom(SnapshotDeletionsPending.TYPE, SnapshotDeletionsPending.EMPTY); + SnapshotDeletionsPending current = state.custom(SnapshotDeletionsPending.TYPE, SnapshotDeletionsPending.EMPTY); 
+ return Objects.equals(previous, current) == false; + } + + // package-private for testing + synchronized boolean pendingDeletionsWithConflictsChanged(ClusterState state, ClusterState previousState) { + if (conflicting.values().stream().anyMatch(c -> c == ConflictType.RESTORING)) { + RestoreInProgress previous = previousState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY); + RestoreInProgress current = state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY); + if (Objects.equals(previous, current) == false) { + return true; + } + } + if (conflicting.values().stream().anyMatch(c -> c == ConflictType.CLONING)) { + Set previous = previousState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY) + .asStream() + .filter(SnapshotsInProgress.Entry::isClone) + .collect(Collectors.toSet()); + Set current = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY) + .asStream() + .filter(SnapshotsInProgress.Entry::isClone) + .collect(Collectors.toSet()); + if (Objects.equals(previous, current) == false) { + return true; + } + } + if (conflicting.values().stream().anyMatch(c -> c == ConflictType.REPO_MISSING || c == ConflictType.REPO_READONLY)) { + RepositoriesMetadata previous = previousState.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + RepositoriesMetadata current = state.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + if (previous.equals(current) == false) { + return true; + } + } + if (conflicting.values().stream().anyMatch(c -> c == ConflictType.REPO_CLEANUP)) { + boolean previousCleanUp = previousState.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY) + .hasCleanupInProgress(); + boolean currentCleanUp = state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY) + .hasCleanupInProgress(); + return previousCleanUp != currentCleanUp; + } + return false; + } + + private boolean isExpiredPendingDeletion(long deletionTimeEpochMillis) { + 
return Instant.ofEpochMilli(deletionTimeEpochMillis) + .plusMillis(pendingDeletionsExpirationInterval.getMillis()) + .isBefore(Instant.now()); + } + + /** + * A {@link Runnable} used to process the deletion of snapshots marked as to delete for a given repository. + */ + private class SnapshotsToDeleteRunnable extends AbstractRunnable { + + private final Map snapshots; + private final String repositoryName; + private final String repositoryUuid; + private final boolean missingUuid; + + SnapshotsToDeleteRunnable(String repositoryName, String repositoryUuid, Map snapshots) { + this.repositoryName = Objects.requireNonNull(repositoryName); + this.repositoryUuid = Objects.requireNonNull(repositoryUuid); + this.snapshots = Objects.requireNonNull(snapshots); + this.missingUuid = RepositoryData.MISSING_UUID.equals(repositoryUuid); + } + + @Override + protected void doRun() throws Exception { + final Set pendingDeletionsToRemove = ConcurrentCollections.newConcurrentSet(); + final CountDown countDown = new CountDown(snapshots.size()); + + for (Map.Entry snapshot : snapshots.entrySet()) { + final SnapshotId snapshotId = snapshot.getKey(); + final ActionListener listener = new ActionListener() { + @Override + public void onResponse(Void unused) { + logger.debug( + "snapshot marked as to delete [{}] successfully deleted from repository [{}/{}]", + snapshotId, + repositoryName, + repositoryUuid + ); + pendingDeletionsToRemove.add(snapshotId); + finish(); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof SnapshotMissingException && missingUuid == false) { + pendingDeletionsToRemove.add(snapshotId); + logger.debug( + () -> new ParameterizedMessage( + "snapshot marked as to delete [{}] is missing in repository [{}/{}], removing from pending deletions", + snapshotId, + repositoryName, + repositoryUuid + ), + e + ); + } else if (isExpiredPendingDeletion(snapshot.getValue())) { + pendingDeletionsToRemove.add(snapshotId); + logger.warn( + () -> new 
ParameterizedMessage( + "snapshot marked as to delete [{}] failed to be deleted within [{}]. The pending snapshot " + + "expired before the snapshot could be deleted from the repository and as such might still " + + "exist in the original repository [{}/{}]. This snapshot will now be removed from the list of " + + "pending deletions.", + snapshotId, + pendingDeletionsExpirationInterval, + repositoryName, + repositoryUuid + ), + e + ); + } else { + logger.debug( + () -> new ParameterizedMessage( + "[{}/{}] attempt to delete snapshot marked as to delete [{}] failed; deletion will be retried in [{}]", + repositoryName, + repositoryUuid, + snapshotId, + pendingDeletionsRetryInterval + ), + e + ); + } + finish(); + } + + void finish() { + if (countDown.countDown()) { + final Map retryables = snapshots.entrySet() + .stream() + .filter(snap -> pendingDeletionsToRemove.contains(snap.getKey()) == false) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + if (retryables.isEmpty() == false) { + // TODO maybe re-resolve repository here if the uuid is missing? 
+ threadPool.scheduleUnlessShuttingDown( + pendingDeletionsRetryInterval, + ThreadPool.Names.GENERIC, + new SnapshotsToDeleteRunnable(repositoryName, repositoryUuid, retryables) + ); + } + if (pendingDeletionsToRemove.isEmpty() == false) { + clusterService.submitStateUpdateTask( + "remove-snapshot-deletions-in-pending", + new RemoveSnapshotDeletionsPendingTask(List.copyOf(pendingDeletionsToRemove)), + ClusterStateTaskConfig.build(Priority.NORMAL), + removePendingDeletionsExecutor + ); + } + } + } + }; + + try { + snapshotsService.deleteSnapshotsByUuid(repositoryName, new String[] { snapshotId.getUUID() }, listener); + } catch (Exception e) { + logger.warn( + () -> new ParameterizedMessage("[{}] failed to trigger deletion of snapshot [{}]", repositoryName, snapshotId), + e + ); + listener.onFailure(e); + } + } + } + + @Override + public void onFailure(Exception e) { + triggered.removeAll(snapshots.keySet()); + logger.warn( + () -> new ParameterizedMessage("[{}] failed to trigger deletion of snapshots {}", repositoryName, snapshots.keySet()), + e + ); + } + } + + private class RemoveSnapshotDeletionsPendingExecutor implements ClusterStateTaskExecutor { + + @Override + public ClusterState execute(ClusterState currentState, List> taskContexts) + throws Exception { + ClusterState state = currentState; + for (final var taskContext : taskContexts) { + final var task = taskContext.getTask(); + try { + SnapshotDeletionsPending currentPendings = state.custom(SnapshotDeletionsPending.TYPE, SnapshotDeletionsPending.EMPTY); + SnapshotDeletionsPending updatedPendings = currentPendings.withRemovedSnapshots(task.pendingDeletionsToRemove); + if (currentPendings != updatedPendings) { + state = ClusterState.builder(state).putCustom(SnapshotDeletionsPending.TYPE, updatedPendings).build(); + } + taskContext.success(task.newPublicationListener()); + } catch (Exception e) { + taskContext.onFailure(e); + } + } + return state; + } + } + + private class RemoveSnapshotDeletionsPendingTask 
implements ClusterStateTaskListener { + + private final List pendingDeletionsToRemove; + private final AtomicBoolean removed; + + private RemoveSnapshotDeletionsPendingTask(List pendingDeletionsToRemove) { + this.pendingDeletionsToRemove = pendingDeletionsToRemove; + this.removed = new AtomicBoolean(); + } + + private void removeAllPendingDeletions() { + if (removed.compareAndSet(false, true)) { + pendingDeletionsToRemove.forEach(triggered::remove); + } + } + + @Override + public void onFailure(Exception e) { + removeAllPendingDeletions(); + } + + ActionListener newPublicationListener() { + return new ActionListener<>() { + @Override + public void onResponse(ClusterState clusterState) { + removeAllPendingDeletions(); + } + + @Override + public void onFailure(Exception e) { + removeAllPendingDeletions(); + } + }; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotUtils.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotUtils.java index ab5f1f4ea9f26..8e6141770e997 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotUtils.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotUtils.java @@ -8,6 +8,11 @@ package org.elasticsearch.snapshots; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsPending; +import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.index.IndexNotFoundException; @@ -16,6 +21,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; /** * Snapshot utilities @@ -108,4 +115,54 @@ public static List filterIndices(List 
availableIndices, String[] return List.copyOf(result); } + static Set cloneSources(final ClusterState state) { + return state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY) + .asStream() + .filter(SnapshotsInProgress.Entry::isClone) + .map(SnapshotsInProgress.Entry::source) + .collect(Collectors.toUnmodifiableSet()); + } + + static Set restoreSources(final ClusterState state) { + return StreamSupport.stream(state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).spliterator(), false) + .map(restore -> restore.snapshot().getSnapshotId()) + .collect(Collectors.toUnmodifiableSet()); + } + + static Set deletionsSources(final ClusterState state) { + return state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY) + .getEntries() + .stream() + .flatMap(deletion -> deletion.getSnapshots().stream()) + .collect(Collectors.toUnmodifiableSet()); + } + + static void ensureSnapshotNotDeletedOrPendingDeletion( + final ClusterState currentState, + final String repositoryName, + final SnapshotId snapshotId, + final String reason + ) { + final SnapshotDeletionsPending pendingDeletions = currentState.custom(SnapshotDeletionsPending.TYPE); + if (pendingDeletions != null && pendingDeletions.contains(snapshotId)) { + throw new ConcurrentSnapshotExecutionException( + repositoryName, + snapshotId.getName(), + "cannot " + reason + " a snapshot already marked as deleted [" + repositoryName + ":" + snapshotId + "]" + ); + } + final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null + && deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(snapshotId))) { + throw new ConcurrentSnapshotExecutionException( + repositoryName, + snapshotId.getName(), + "cannot " + + reason + + " a snapshot while a snapshot deletion is in-progress [" + + deletionsInProgress.getEntries().get(0) + + "]" + ); + } + } } diff --git 
a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 6111a41f8ea98..f9bc5894e5969 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.RepositoryCleanupInProgress; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsPending; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; @@ -47,6 +48,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -66,6 +68,7 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -114,6 +117,8 @@ import static java.util.Collections.unmodifiableList; import static org.elasticsearch.cluster.SnapshotsInProgress.completed; +import static org.elasticsearch.snapshots.SnapshotUtils.cloneSources; +import static org.elasticsearch.snapshots.SnapshotUtils.ensureSnapshotNotDeletedOrPendingDeletion; /** * Service responsible for creating snapshots. 
This service runs all the steps executed on the master node during snapshot creation and @@ -173,6 +178,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final SystemIndices systemIndices; + private final SnapshotDeletionsPendingService snapshotPendingDeletions; + /** * Setting that specifies the maximum number of allowed concurrent snapshot create and delete operations in the * cluster state. The number of concurrent operations in a cluster state is defined as the sum of @@ -219,6 +226,7 @@ public SnapshotsService( .addSettingsUpdateConsumer(MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, i -> maxConcurrentOperations = i); } this.systemIndices = systemIndices; + this.snapshotPendingDeletions = new SnapshotDeletionsPendingService(this, clusterService, threadPool, settings); } /** @@ -484,13 +492,7 @@ public ClusterState execute(ClusterState currentState) { SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY ); - if (deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(sourceSnapshotId))) { - throw new ConcurrentSnapshotExecutionException( - repositoryName, - sourceSnapshotId.getName(), - "cannot clone from snapshot that is being deleted" - ); - } + ensureSnapshotNotDeletedOrPendingDeletion(currentState, repositoryName, sourceSnapshotId, "clone"); ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress); final List indicesForSnapshot = new ArrayList<>(); for (IndexId indexId : repositoryData.getIndices().values()) { @@ -680,7 +682,12 @@ public ClusterState execute(ClusterState currentState) { // shard snapshot state was based on all previous existing operations in progress // TODO: If we could eventually drop the snapshot clone init phase we don't need this any longer updatedEntries.add(updatedEntry); - return updateWithSnapshots(currentState, snapshotsInProgress.withUpdatedEntriesForRepo(repoName, updatedEntries), null); + return 
updateWithSnapshots( + currentState, + snapshotsInProgress.withUpdatedEntriesForRepo(repoName, updatedEntries), + null, + null + ); } return currentState; } @@ -986,6 +993,7 @@ public void applyClusterState(ClusterChangedEvent event) { } } } + snapshotPendingDeletions.processPendingDeletions(event.state(), event.previousState()); } catch (Exception e) { assert false : new AssertionError(e); logger.warn("Failed to update snapshot state ", e); @@ -1272,6 +1280,48 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) }, newExecutor()); } + // only used in tests + public boolean isSnapshotPendingDeletionTriggered(SnapshotId snapshotId) { + return snapshotPendingDeletions.isTriggered(snapshotId); + } + + // only used in tests + public boolean isSnapshotPendingDeletionConflicting(SnapshotId snapshotId) { + return snapshotPendingDeletions.getConflict(snapshotId) != null; + } + + public static Optional findRepositoryForPendingDeletion( + final RepositoriesMetadata repositories, + final String repositoryName, + final String repositoryUuid + ) { + if (repositories != null) { + if (RepositoryData.MISSING_UUID.equals(repositoryUuid) == false) { + // the snapshot waiting to be deleted references a repository with a known uuid, + // let's try to find this repository among the existing ones first + return repositories.repositories() + .stream() + .filter(repo -> Objects.equals(repo.uuid(), repositoryUuid)) + .findFirst() + .or( + // there is no existing repository matching the uuid, + // let's try to find the repository by name among the existing ones that have no uuid + () -> repositories.repositories() + .stream() + .filter(repo -> Objects.equals(repo.uuid(), RepositoryData.MISSING_UUID)) + .filter(repo -> Objects.equals(repo.name(), repositoryName)) + .findFirst() + ); + } else { + // the snapshot waiting to be deleted does not reference a repository with a known uuid, + // let's try to find the repository by name among the existing ones, in the
hope that + // the snapshot will be found there. + return repositories.repositories().stream().filter(repo -> Objects.equals(repo.name(), repositoryName)).findFirst(); + } + } + return Optional.empty(); + } + private static ImmutableOpenMap processWaitingShardsAndRemovedNodes( SnapshotsInProgress.Entry entry, RoutingTable routingTable, @@ -1976,7 +2026,8 @@ public ClusterState execute(ClusterState currentState) { updatedState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY), Collections.singletonList(snapshot.getSnapshotId()), snapshot.getRepository() - ) + ), + null ); } @@ -2048,24 +2099,68 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) { } /** - * Deletes snapshots from the repository. In-progress snapshots matched by the delete will be aborted before deleting them. + * Deletes snapshots from the repository. In-progress snapshots matched by the delete will be aborted before deleting them. Snapshots + * to delete are identified by their names. * * @param request delete snapshot request * @param listener listener */ - public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionListener listener) { - final String repositoryName = request.repository(); - final String[] snapshotNames = request.snapshots(); + public void deleteSnapshotsByName(final DeleteSnapshotRequest request, final ActionListener listener) { + deleteSnapshots(request.repository(), request.snapshots(), null, request.masterNodeTimeout(), SnapshotId::getName, listener); + } + + /** + * Deletes snapshots from the repository. In-progress snapshots matched by the delete will be aborted before deleting them. Snapshots + * to delete are identified by their UUIDs. 
+ * + * @param repositoryName the name of the repository that contains the snapshots to delete + * @param snapshotUuids the uuids of the snapshots to delete + * @param listener listener + */ + void deleteSnapshotsByUuid(final String repositoryName, final String[] snapshotUuids, final ActionListener listener) { + deleteSnapshots(repositoryName, null, snapshotUuids, null, SnapshotId::getUUID, listener); + } + + /** + * Deletes snapshots from the repository. In-progress snapshots matched by the delete will be aborted before deleting them. + * Snapshots to delete are identified by converting their {@link SnapshotId} to a {@link String} using the mapping function + * {@code mapping}; the resulting string is then compared to the snapshots names/uuids/patterns to match against. + * + * @param repositoryName the name of the repository that contains the snapshots to delete + * @param snapshotNames the names of the snapshots to delete + * @param snapshotUuids the uuids of the snapshots to delete + * @param masterNodeTimeout the timeout to use for the cluster state update task, or null if no time out is needed + * @param mapping the mapping function used to match the {@link SnapshotId} against the given snapshotNamesOrUuids + * @param listener listener + */ + private void deleteSnapshots( + final String repositoryName, + @Nullable final String[] snapshotNames, + @Nullable final String[] snapshotUuids, + @Nullable final TimeValue masterNodeTimeout, + final Function mapping, + final ActionListener listener + ) { + assert snapshotNames == null || snapshotNames.length > 0; + assert snapshotUuids == null || snapshotUuids.length > 0; + assert snapshotNames != null ^ snapshotUuids != null + : "either snapshots names or snapshots uuids must be not null, " + + "but got " + + Arrays.toString(snapshotNames) + + " and " + + Arrays.toString(snapshotUuids); + + final String[] snapshotsToDelete = snapshotNames != null ? 
snapshotNames : snapshotUuids; logger.info( () -> new ParameterizedMessage( "deleting snapshots [{}] from repository [{}]", - Strings.arrayToCommaDelimitedString(snapshotNames), + Strings.arrayToCommaDelimitedString(snapshotsToDelete), repositoryName ) ); final Repository repository = repositoriesService.repository(repositoryName); - repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(request.masterNodeTimeout()) { + repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(masterNodeTimeout) { private SnapshotDeletionsInProgress.Entry newDelete = null; @@ -2088,7 +2183,7 @@ public ClusterState execute(ClusterState currentState) { final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repositoryName)) { final SnapshotId snapshotId = entry.snapshot().getSnapshotId(); - if (Regex.simpleMatch(snapshotNames, snapshotId.getName())) { + if (Regex.simpleMatch(snapshotsToDelete, mapping.apply(snapshotId))) { snapshotIds.add(snapshotId); } } @@ -2096,8 +2191,8 @@ public ClusterState execute(ClusterState currentState) { // find snapshots to delete in repository data final Map snapshotsIdsInRepository = repositoryData.getSnapshotIds() .stream() - .collect(Collectors.toMap(SnapshotId::getName, Function.identity())); - for (String snapshotOrPattern : snapshotNames) { + .collect(Collectors.toMap(mapping, Function.identity())); + for (String snapshotOrPattern : snapshotsToDelete) { if (Regex.isSimpleMatchPattern(snapshotOrPattern)) { for (Map.Entry entry : snapshotsIdsInRepository.entrySet()) { if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) { @@ -2107,7 +2202,7 @@ public ClusterState execute(ClusterState currentState) { } else { final SnapshotId foundId = snapshotsIdsInRepository.get(snapshotOrPattern); if (foundId == null) { - if (snapshotIds.stream().noneMatch(snapshotId -> 
snapshotId.getName().equals(snapshotOrPattern))) { + if (snapshotIds.stream().map(mapping).noneMatch(snapshot -> snapshot.equals(snapshotOrPattern))) { throw new SnapshotMissingException(repositoryName, snapshotOrPattern); } } else { @@ -2120,10 +2215,7 @@ public ClusterState execute(ClusterState currentState) { return currentState; } - final Set activeCloneSources = snapshotsInProgress.asStream() - .filter(SnapshotsInProgress.Entry::isClone) - .map(SnapshotsInProgress.Entry::source) - .collect(Collectors.toSet()); + final Set activeCloneSources = cloneSources(currentState); for (SnapshotId snapshotId : snapshotIds) { if (activeCloneSources.contains(snapshotId)) { throw new ConcurrentSnapshotExecutionException( @@ -2140,11 +2232,6 @@ public ClusterState execute(ClusterState currentState) { "delete snapshot" ); - final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom( - SnapshotDeletionsInProgress.TYPE, - SnapshotDeletionsInProgress.EMPTY - ); - final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY); // don't allow snapshot deletions while a restore is taking place, // otherwise we could end up deleting a snapshot that is being restored @@ -2186,9 +2273,14 @@ public ClusterState execute(ClusterState currentState) { ); if (snapshotIdsRequiringCleanup.isEmpty()) { // We only saw snapshots that could be removed from the cluster state right away, no need to update the deletions - return updateWithSnapshots(currentState, updatedSnapshots, null); + return updateWithSnapshots(currentState, updatedSnapshots, null, null); } + // add the snapshot deletion to the cluster state + final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom( + SnapshotDeletionsInProgress.TYPE, + SnapshotDeletionsInProgress.EMPTY + ); final SnapshotDeletionsInProgress.Entry replacedEntry = deletionsInProgress.getEntries() .stream() .filter(entry -> entry.repository().equals(repositoryName)) @@ -2226,7 
+2318,8 @@ public ClusterState execute(ClusterState currentState) { currentState, updatedSnapshots, (replacedEntry == null ? deletionsInProgress : deletionsInProgress.withRemovedEntry(replacedEntry.uuid())) - .withAddedEntry(newDelete) + .withAddedEntry(newDelete), + null ); } @@ -2461,6 +2554,12 @@ protected SnapshotDeletionsInProgress filterDeletions(SnapshotDeletionsInProgres return updatedDeletions == null ? deletions : updatedDeletions; } + @Nullable + @Override + protected SnapshotDeletionsPending filterPendingDeletions(@Nullable SnapshotDeletionsPending pendingDeletions) { + return pendingDeletions != null ? pendingDeletions.withRemovedSnapshots(deleteEntry.getSnapshots()) : null; + } + @Override protected void handleListeners(List> deleteListeners) { assert repositoryData.getSnapshotIds().stream().noneMatch(deleteEntry.getSnapshots()::contains) @@ -2546,8 +2645,9 @@ public ClusterState execute(ClusterState currentState) { return currentState; } final SnapshotDeletionsInProgress newDeletions = filterDeletions(updatedDeletions); + SnapshotDeletionsPending newPendingDeletions = filterPendingDeletions(currentState.custom(SnapshotDeletionsPending.TYPE)); final Tuple> res = readyDeletions( - updateWithSnapshots(currentState, updatedSnapshotsInProgress(currentState, newDeletions), newDeletions) + updateWithSnapshots(currentState, updatedSnapshotsInProgress(currentState, newDeletions), newDeletions, newPendingDeletions) ); readyDeletions = res.v2(); return res.v1(); @@ -2564,6 +2664,10 @@ protected SnapshotDeletionsInProgress filterDeletions(SnapshotDeletionsInProgres return deletions; } + protected SnapshotDeletionsPending filterPendingDeletions(SnapshotDeletionsPending pendingDeletions) { + return pendingDeletions; + } + @Override public final void clusterStateProcessed(ClusterState oldState, ClusterState newState) { final List> deleteListeners; @@ -2754,19 +2858,21 @@ private void markShardReassigned(RepositoryShardId shardId, Set 
mockInvocation.getArguments()[0] ); service = new MetadataDeleteIndexService(Settings.EMPTY, null, allocationService); @@ -107,7 +119,7 @@ public void testDeleteUnassigned() { ClusterState before = clusterState(index); // Mock the built reroute - when(allocationService.reroute(any(ClusterState.class), any(String.class))).then(i -> i.getArguments()[0]); + when(allocationService.reroute(any(ClusterState.class), anyString())).then(i -> i.getArguments()[0]); // Remove it ClusterState after = service.deleteIndices(before, Set.of(before.metadata().getIndices().get(index).getIndex())); @@ -118,7 +130,7 @@ public void testDeleteUnassigned() { assertNull(after.blocks().indices().get(index)); // Make sure we actually attempted to reroute - verify(allocationService).reroute(any(ClusterState.class), any(String.class)); + verify(allocationService).reroute(any(ClusterState.class), anyString()); } public void testDeleteIndexWithAnAlias() { @@ -213,6 +225,121 @@ public void testDeleteCurrentWriteIndexForDataStream() { ); } + public void testDeleteIndexWithSnapshotDeletion() { + final boolean deleteSnapshot = randomBoolean(); + final boolean knownRepositoryUuid = randomBoolean(); + final IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put("index.version.created", VersionUtils.randomVersion(random())) + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE) + .put(SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY, "repo_name") + .put(SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY, knownRepositoryUuid ? 
"repo_uuid" : null) + .put(SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY, "snap_name") + .put(SEARCHABLE_SNAPSHOTS_SNAPSHOT_UUID_SETTING_KEY, "snap_uuid") + .put(SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, deleteSnapshot) + .build() + ) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + final ClusterState initialState = ClusterState.builder(ClusterName.DEFAULT) + .metadata( + Metadata.builder() + .put(indexMetadata, false) + .putCustom( + RepositoriesMetadata.TYPE, + new RepositoriesMetadata(List.of(new RepositoryMetadata("repo_name", "fs", Settings.EMPTY).withUuid("repo_uuid"))) + ) + ) + .routingTable(RoutingTable.builder().addAsNew(indexMetadata).build()) + .blocks(ClusterBlocks.builder().addBlocks(indexMetadata)) + .build(); + + final ClusterState updatedState = service.deleteIndices(initialState, Set.of(indexMetadata.getIndex())); + assertThat(updatedState.metadata().getIndices().get("test"), nullValue()); + assertThat(updatedState.blocks().indices().get("test"), nullValue()); + assertThat(updatedState.routingTable().index("test"), nullValue()); + + final SnapshotDeletionsPending updatedPendingDeletions = updatedState.custom(SnapshotDeletionsPending.TYPE); + if (deleteSnapshot) { + assertThat(updatedPendingDeletions, notNullValue()); + assertThat(updatedPendingDeletions.isEmpty(), equalTo(false)); + assertThat(updatedPendingDeletions.entries(), hasSize(1)); + SnapshotDeletionsPending.Entry entry = updatedPendingDeletions.entries().get(0); + assertThat(entry.getRepositoryName(), equalTo("repo_name")); + assertThat(entry.getRepositoryUuid(), knownRepositoryUuid ? 
equalTo("repo_uuid") : equalTo(RepositoryData.MISSING_UUID)); + assertThat(updatedPendingDeletions.contains(new SnapshotId("snap_name", "snap_uuid")), equalTo(true)); + } else { + assertThat(updatedPendingDeletions, nullValue()); + } + } + + public void testDeleteMultipleIndicesWithSnapshotDeletion() { + RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), "fs", Settings.EMPTY); + if (randomBoolean()) { + repositoryMetadata = repositoryMetadata.withUuid(UUIDs.randomBase64UUID()); + } + + final Metadata.Builder metadataBuilder = Metadata.builder(); + metadataBuilder.putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(List.of(repositoryMetadata))); + final RoutingTable.Builder routingBuilder = RoutingTable.builder(); + + final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(10), UUIDs.randomBase64UUID()); + final Set indices = new HashSet<>(); + + final int nbIndices = randomIntBetween(2, 10); + for (int i = 0; i < nbIndices; i++) { + Settings.Builder indexSettingsBuilder = Settings.builder() + .put("index.version.created", VersionUtils.randomVersion(random())) + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE) + .put(SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, true) + .put(SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY, repositoryMetadata.name()) + .put(SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY, snapshotId.getName()) + .put(SEARCHABLE_SNAPSHOTS_SNAPSHOT_UUID_SETTING_KEY, snapshotId.getUUID()); + if (randomBoolean()) { + indexSettingsBuilder.put(SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY, repositoryMetadata.uuid()); + } + IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLength(10) + i) + .settings(indexSettingsBuilder.build()) + .numberOfShards(randomIntBetween(1, 3)) + .numberOfReplicas(randomInt(1)) + .build(); + metadataBuilder.put(indexMetadata, false); + 
routingBuilder.addAsNew(indexMetadata); + indices.add(indexMetadata.getIndex()); + } + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .routingTable(routingBuilder.build()) + .metadata(metadataBuilder) + .build(); + + SnapshotDeletionsPending pendingDeletions = clusterState.custom(SnapshotDeletionsPending.TYPE, SnapshotDeletionsPending.EMPTY); + while (indices.size() > 0) { + assertThat(pendingDeletions.isEmpty(), equalTo(true)); + + List indicesToDelete = randomSubsetOf(randomIntBetween(1, Math.max(1, indices.size() - 1)), indices); + clusterState = service.deleteIndices(clusterState, Set.copyOf(indicesToDelete)); + indicesToDelete.forEach(indices::remove); + + for (Index deletedIndex : indicesToDelete) { + assertThat(clusterState.metadata().index(deletedIndex), nullValue()); + assertThat(clusterState.routingTable().index(deletedIndex), nullValue()); + } + + pendingDeletions = clusterState.custom(SnapshotDeletionsPending.TYPE, SnapshotDeletionsPending.EMPTY); + } + + assertThat(pendingDeletions.isEmpty(), equalTo(false)); + assertThat(pendingDeletions.entries(), hasSize(1)); + assertThat(pendingDeletions.contains(snapshotId), equalTo(true)); + SnapshotDeletionsPending.Entry entry = pendingDeletions.entries().get(0); + assertThat(entry.getRepositoryName(), equalTo(repositoryMetadata.name())); + assertThat(entry.getRepositoryUuid(), anyOf(equalTo(repositoryMetadata.uuid()), equalTo(RepositoryData.MISSING_UUID))); + assertThat(entry.getSnapshotId(), equalTo(snapshotId)); + } + private ClusterState clusterState(String index) { IndexMetadata indexMetadata = IndexMetadata.builder(index) .settings(Settings.builder().put("index.version.created", VersionUtils.randomVersion(random()))) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotDeletionsPendingServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotDeletionsPendingServiceTests.java new file mode 100644 index 0000000000000..c502fb6c7459f --- /dev/null 
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotDeletionsPendingServiceTests.java @@ -0,0 +1,423 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.RepositoryCleanupInProgress; +import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsPending; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.snapshots.SnapshotDeletionsPendingService.ConflictType; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import 
static java.util.Collections.emptyMap; +import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class SnapshotDeletionsPendingServiceTests extends ESTestCase { + + private SnapshotsService snapshotsService; + private DeterministicTaskQueue taskQueue; + private ClusterService clusterService; + private TestThreadPool threadPool; + private SnapshotDeletionsPendingService executor; + + @Before + @Override + @SuppressWarnings("unchecked") + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool(getTestName()); + clusterService = createClusterService(threadPool); + snapshotsService = mock(SnapshotsService.class); + doAnswer(invocation -> { + ((ActionListener) invocation.getArgument(2)).onResponse(null); + return null; + }).when(snapshotsService).deleteSnapshotsByUuid(any(), any(), any()); + taskQueue = new DeterministicTaskQueue(); + executor = new SnapshotDeletionsPendingService(snapshotsService, clusterService, taskQueue.getThreadPool(), Settings.EMPTY); + clusterService.addStateApplier(event -> executor.processPendingDeletions(event.state(), event.previousState())); + } + + @Override + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + snapshotsService = null; + terminate(threadPool); + } + + public void 
testSnapshotDeletionsPendingsAreTriggered() throws Exception { + final RepositoryMetadata repository1 = randomRepository(); + setState(clusterService, stateWithRepository(emptyState(), repository1)); + + final SnapshotId snapshot1 = randomSnapshotId(); + ClusterState state = stateWithNewPending(clusterService.state(), repository1, snapshot1, 1); + assertThat(executor.pendingDeletionsChanged(state, clusterService.state()), is(true)); + + setState(clusterService, state); + + assertThat(executor.isTriggered(snapshot1), is(true)); + assertThat(taskQueue.hasRunnableTasks(), is(true)); + + runAllTasks(); + + assertBusy(() -> assertThat(executor.isTriggered(snapshot1), is(false))); + verify(snapshotsService, times(1)).deleteSnapshotsByUuid(eq(repository1.name()), any(), any()); + assertNoSnapshotDeletionsPendingsInClusterState(); + + final RepositoryMetadata repository2 = randomRepository(); + state = stateWithRepository(clusterService.state(), repository2); + assertThat(executor.pendingDeletionsChanged(state, clusterService.state()), is(false)); + + setState(clusterService, state); + + final SnapshotId snapshot2 = randomSnapshotId(); + state = stateWithNewPending(clusterService.state(), repository2, snapshot2, 2); + final SnapshotId snapshot3 = randomSnapshotId(); + state = stateWithNewPending(state, repository2, snapshot3, 3); + final SnapshotId snapshot4 = randomSnapshotId(); + state = stateWithNewPending(state, repository2, snapshot4, 4); + assertThat(executor.pendingDeletionsChanged(state, clusterService.state()), is(true)); + + setState(clusterService, state); + + assertThat(executor.isTriggered(snapshot2), is(true)); + assertThat(executor.isTriggered(snapshot3), is(true)); + assertThat(executor.isTriggered(snapshot4), is(true)); + assertThat(taskQueue.hasRunnableTasks(), is(true)); + + runAllTasks(); + + assertBusy(() -> assertThat(executor.isTriggered(snapshot2), is(false))); + assertThat(executor.isTriggered(snapshot3), is(false)); + 
assertThat(executor.isTriggered(snapshot4), is(false)); + verify(snapshotsService, times(3)).deleteSnapshotsByUuid(eq(repository2.name()), any(), any()); + assertNoSnapshotDeletionsPendingsInClusterState(); + } + + public void testSnapshotDeletionsPendingsWithRandomConflict() throws Exception { + final RepositoryMetadata repository = randomRepository(); + ClusterState state = stateWithRepository(emptyState(), repository); + + setState(clusterService, state); + + final SnapshotId snapshot = randomSnapshotId(); + state = stateWithNewPending(state, repository, snapshot, 1L); + assertThat(executor.pendingDeletionsChanged(state, clusterService.state()), is(true)); + assertThat(executor.pendingDeletionsWithConflictsChanged(state, clusterService.state()), is(false)); + + final ConflictType conflict = randomFrom(ConflictType.values()); + state = addConflict(conflict, state, repository, snapshot); + + assertThat(executor.pendingDeletionsChanged(state, clusterService.state()), is(true)); + assertThat(executor.pendingDeletionsWithConflictsChanged(state, clusterService.state()), is(false)); + + setState(clusterService, state); + + assertThat(executor.getConflict(snapshot), equalTo(conflict)); + assertThat(executor.isTriggered(snapshot), is(false)); + assertThat(taskQueue.hasRunnableTasks(), is(false)); + + state = resolveConflict(conflict, state, repository, snapshot); + assertThat(executor.pendingDeletionsChanged(state, clusterService.state()), is(false)); + assertThat(executor.pendingDeletionsWithConflictsChanged(state, clusterService.state()), is(true)); + + setState(clusterService, state); + + assertThat(executor.isTriggered(snapshot), is(true)); + assertThat(taskQueue.hasRunnableTasks(), is(true)); + + runAllTasks(); + + assertBusy(() -> assertThat(executor.isTriggered(snapshot), is(false))); + verify(snapshotsService, times(1)).deleteSnapshotsByUuid(any(), any(), any()); + assertNoSnapshotDeletionsPendingsInClusterState(); + } + + private void runAllTasks() { + 
taskQueue.runAllTasks(); + assertThat(taskQueue.hasRunnableTasks(), is(false)); + } + + private void assertNoSnapshotDeletionsPendingsInClusterState() { + SnapshotDeletionsPending current = clusterService.state().custom(SnapshotDeletionsPending.TYPE, SnapshotDeletionsPending.EMPTY); + assertThat(Strings.toString(current), current.isEmpty(), is(true)); + } + + private static ClusterState emptyState() { + return ClusterState.builder(ClusterState.EMPTY_STATE) + .nodes( + DiscoveryNodes.builder() + .add(new DiscoveryNode("_node", buildNewFakeTransportAddress(), emptyMap(), DiscoveryNodeRole.roles(), Version.CURRENT)) + .localNodeId("_node") + .masterNodeId("_node") + ) + .metadata(Metadata.builder().generateClusterUuidIfNeeded()) + .build(); + } + + private static RepositoryMetadata randomRepository() { + return new RepositoryMetadata( + randomAlphaOfLength(10).toLowerCase(Locale.ROOT), + UUIDs.randomBase64UUID(random()), + "fs", + Settings.EMPTY, + RepositoryData.UNKNOWN_REPO_GEN, + RepositoryData.EMPTY_REPO_GEN + ); + } + + private static SnapshotId randomSnapshotId() { + return new SnapshotId(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); + } + + private static ClusterState stateWithRepository(ClusterState previousState, RepositoryMetadata repositoryMetadata) { + final RepositoriesMetadata previousRepos = previousState.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + assertThat(previousRepos.repository(repositoryMetadata.name()), nullValue()); + final List newRepos = new ArrayList<>(previousRepos.repositories()); + newRepos.add(repositoryMetadata); + return ClusterState.builder(previousState) + .version(previousState.version() + 1L) + .metadata(Metadata.builder(previousState.metadata()).putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(newRepos))) + .build(); + } + + private static ClusterState stateWithNewPending( + ClusterState previousState, + RepositoryMetadata repository, + SnapshotId 
snapshot, + long creationTime + ) { + SnapshotDeletionsPending previousPendings = previousState.custom(SnapshotDeletionsPending.TYPE, SnapshotDeletionsPending.EMPTY); + SnapshotDeletionsPending.Builder pendings = new SnapshotDeletionsPending.Builder(previousPendings, e -> {}); + pendings.add(repository.name(), repository.uuid(), snapshot, creationTime); + + ImmutableOpenMap.Builder customs = ImmutableOpenMap.builder(previousState.getCustoms()); + customs.put(SnapshotDeletionsPending.TYPE, pendings.build(Settings.EMPTY)); + return ClusterState.builder(previousState).version(previousState.version() + 1L).customs(customs.build()).build(); + } + + private static ClusterState stateWithRestore(ClusterState previousState, RepositoryMetadata repository, SnapshotId snapshot) { + return ClusterState.builder(previousState) + .version(previousState.version() + 1L) + .putCustom( + RestoreInProgress.TYPE, + new RestoreInProgress.Builder(previousState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)).add( + new RestoreInProgress.Entry( + UUIDs.randomBase64UUID(random()), + new Snapshot(repository.name(), snapshot), + RestoreInProgress.State.INIT, + List.of(randomAlphaOfLength(10).toLowerCase(Locale.ROOT)), + null + ) + ).build() + ) + .build(); + } + + private static ClusterState stateWithoutRestore(ClusterState previousState, RepositoryMetadata repository, SnapshotId snapshot) { + final RestoreInProgress.Builder builder = new RestoreInProgress.Builder(); + boolean found = false; + for (RestoreInProgress.Entry entry : previousState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) { + final Snapshot restored = entry.snapshot(); + if (restored.getRepository().equals(repository.name()) && snapshot.equals(restored.getSnapshotId())) { + found = true; + } else { + builder.add(entry); + } + } + assertThat("Restore not found: " + snapshot, found, is(true)); + return ClusterState.builder(previousState) + .version(previousState.version() + 1L) + 
.putCustom(RestoreInProgress.TYPE, builder.build()) + .build(); + } + + private static ClusterState stateWithClone(ClusterState previousState, RepositoryMetadata repository, SnapshotId snapshot) { + return ClusterState.builder(previousState) + .version(previousState.version() + 1L) + .putCustom( + SnapshotsInProgress.TYPE, + previousState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY) + .withAddedEntry( + SnapshotsInProgress.startClone( + new Snapshot(repository.name(), randomSnapshotId()), + snapshot, + Map.of( + randomAlphaOfLength(10).toLowerCase(Locale.ROOT), + new IndexId(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())) + ), + 0, + 1, + Version.CURRENT + ) + ) + ) + .build(); + } + + private static ClusterState stateWithoutClone(ClusterState previousState, RepositoryMetadata repository, SnapshotId snapshot) { + final SnapshotsInProgress snapshots = previousState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); + List entries = new ArrayList<>(); + boolean found = false; + for (SnapshotsInProgress.Entry entry : snapshots.forRepo(repository.name())) { + if (entry.isClone() && snapshot.equals(entry.source())) { + found = true; + } else { + entries.add(entry); + } + } + assertThat("Clone not found: " + snapshot, found, is(true)); + return ClusterState.builder(previousState) + .version(previousState.version() + 1L) + .putCustom(SnapshotsInProgress.TYPE, snapshots.withUpdatedEntriesForRepo(repository.name(), entries)) + .build(); + } + + private static ClusterState stateWithRepositoryCleanUp(ClusterState previousState, RepositoryMetadata repository) { + RepositoriesMetadata repositories = previousState.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + assertThat(repositories.repository(repository.name()), notNullValue()); + RepositoryCleanupInProgress cleanUps = previousState.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY); + 
assertThat(cleanUps.hasCleanupInProgress(), is(false)); + return ClusterState.builder(previousState) + .version(previousState.version() + 1L) + .putCustom( + RepositoryCleanupInProgress.TYPE, + new RepositoryCleanupInProgress(List.of(RepositoryCleanupInProgress.startedEntry(repository.name(), 0L))) + ) + .build(); + } + + private static ClusterState stateWithoutRepositoryCleanUp(ClusterState previousState, RepositoryMetadata repository) { + RepositoriesMetadata repositories = previousState.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + assertThat(repositories.repository(repository.name()), notNullValue()); + RepositoryCleanupInProgress cleanUps = previousState.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY); + assertThat(cleanUps.hasCleanupInProgress(), is(true)); + return ClusterState.builder(previousState) + .version(previousState.version() + 1L) + .putCustom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY) + .build(); + } + + private static ClusterState updateRepository(ClusterState previousState, String repositoryName, RepositoryMetadata repositoryMetadata) { + final RepositoriesMetadata previousRepos = previousState.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + assertThat(previousRepos.repository(repositoryName), notNullValue()); + final List newRepos = new ArrayList<>(); + for (RepositoryMetadata r : previousRepos.repositories()) { + if (r.name().equals(repositoryName)) { + newRepos.add(repositoryMetadata); + } else { + newRepos.add(r); + } + } + return ClusterState.builder(previousState) + .version(previousState.version() + 1L) + .metadata(Metadata.builder(previousState.metadata()).putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(newRepos))) + .build(); + } + + private static ClusterState addConflict( + ConflictType conflict, + ClusterState previousState, + RepositoryMetadata repository, + SnapshotId snapshot + ) { + return switch 
(conflict) { + case RESTORING -> stateWithRestore(previousState, repository, snapshot); + case CLONING -> stateWithClone(previousState, repository, snapshot); + case REPO_CLEANUP -> stateWithRepositoryCleanUp(previousState, repository); + case REPO_READONLY -> updateRepository( + previousState, + repository.name(), + repository.withSettings(Settings.builder().put(repository.settings()).put(READONLY_SETTING_KEY, true).build()) + ); + case REPO_MISSING -> updateRepository( + previousState, + repository.name(), + new RepositoryMetadata( + "missing", + RepositoryData.MISSING_UUID, + "fs", + repository.settings(), + repository.generation(), + repository.pendingGeneration() + ) + ); + }; + } + + private ClusterState resolveConflict( + ConflictType conflict, + ClusterState previousState, + RepositoryMetadata repository, + SnapshotId snapshot + ) { + return switch (conflict) { + case RESTORING -> stateWithoutRestore(previousState, repository, snapshot); + case CLONING -> stateWithoutClone(previousState, repository, snapshot); + case REPO_CLEANUP -> stateWithoutRepositoryCleanUp(previousState, repository); + case REPO_READONLY -> updateRepository( + previousState, + repository.name(), + repository.withSettings(Settings.builder().put(repository.settings()).put(READONLY_SETTING_KEY, false).build()) + ); + case REPO_MISSING -> updateRepository( + previousState, + "missing", + randomBoolean() + ? 
repository + : new RepositoryMetadata( + randomAlphaOfLength(10).toLowerCase(Locale.ROOT), + repository.uuid(), + "fs", + repository.settings(), + repository.generation(), + repository.pendingGeneration() + ) + ); + }; + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java index 1e1ff4d62ebfd..286371ccd4a80 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java @@ -11,6 +11,8 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsPending; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -336,4 +338,19 @@ protected void assertExecutorIsIdle(String executorName) throws Exception { } }); } + + protected void awaitNoMoreSnapshotsDeletions() throws Exception { + final String master = internalCluster().getMasterName(); + awaitClusterState(logger, master, state -> { + final SnapshotDeletionsInProgress deletions = state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY); + if (deletions.hasDeletionsInProgress()) { + return false; + } + final SnapshotDeletionsPending pendingDeletions = state.custom(SnapshotDeletionsPending.TYPE, 
SnapshotDeletionsPending.EMPTY); + if (pendingDeletions.isEmpty() == false) { + return false; + } + return true; + }); + } } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsPendingDeletionsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsPendingDeletionsIntegTests.java new file mode 100644 index 0000000000000..6443ce557aae5 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsPendingDeletionsIntegTests.java @@ -0,0 +1,617 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.searchablesnapshots; + +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.RepositoryCleanupInProgress; +import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsPending; +import 
org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.TriConsumer; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException; +import org.elasticsearch.snapshots.RestoreInfo; +import org.elasticsearch.snapshots.SnapshotDeletionsPendingService; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotMissingException; +import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; +import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; + +import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; +import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION; +import static org.elasticsearch.snapshots.SnapshotDeletionsPendingService.PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static 
org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; + +public class SearchableSnapshotsPendingDeletionsIntegTests extends BaseFrozenSearchableSnapshotsIntegTestCase { + + public void testSnapshotPendingDeletionCannotBeMounted() throws Exception { + blockSnapshotDeletionThenExecute((repository, snapshot, index) -> { + ConcurrentSnapshotExecutionException exception = expectThrows( + ConcurrentSnapshotExecutionException.class, + () -> mountSnapshot(repository, snapshot.getName(), index, Settings.EMPTY) + ); + assertThat(exception.getMessage(), containsString("cannot restore a snapshot already marked as deleted")); + }); + } + + public void testSnapshotPendingDeletionCannotBeRestored() throws Exception { + blockSnapshotDeletionThenExecute((repository, snapshot, index) -> { + ConcurrentSnapshotExecutionException exception = expectThrows( + ConcurrentSnapshotExecutionException.class, + () -> client().admin().cluster().prepareRestoreSnapshot(repository, snapshot.getName()).setWaitForCompletion(true).get() + ); + assertThat(exception.getMessage(), containsString("cannot restore a snapshot already marked as deleted")); + }); + } + + public void testSnapshotPendingDeletionCannotBeCloned() throws Exception { + blockSnapshotDeletionThenExecute((repository, snapshot, index) -> { + ConcurrentSnapshotExecutionException exception = expectThrows( + ConcurrentSnapshotExecutionException.class, + () -> client().admin().cluster().prepareCloneSnapshot(repository, snapshot.getName(), "target").setIndices("*").get() + ); + assertThat(exception.getMessage(), containsString("cannot clone a snapshot already marked as deleted")); + }); + } + + public void testSearchableSnapshotIsDeletedWhenRepoIsRecreated() throws Exception { + mountIndexThenExecute((repository, snapshot, index) -> 
{ + try { + final Settings.Builder repositorySettings = getRepositorySettings(repository); + updateRepositoryReadOnly(repository, true); + + assertAcked(client().admin().indices().prepareDelete(mountedIndex(index))); + awaitSnapshotPendingDeletion(snapshot); + assertConflict(snapshot); + + assertAcked(client().admin().cluster().prepareDeleteRepository(repository)); + awaitSnapshotPendingDeletion(snapshot); + assertConflict(snapshot); + + final String repoName; + if (randomBoolean()) { + // re register the repository without verification: the snapshot + // pending deletion logic should try to delete the snapshot based + // on the repository name + repoName = repository; + createRepository(repoName, "mock", repositorySettings, false); + } else { + // re register the repository under a different name: the snapshot + // pending deletion logic should try to delete the snapshot based + // on the repository uuid, that is why we force a verification here + repoName = "new_" + repository; + createRepository(repoName, "mock", repositorySettings, true); + } + awaitNoMoreSnapshotsDeletions(); + assertNoConflict(snapshot); + + expectThrows( + SnapshotMissingException.class, + () -> client().admin().cluster().prepareGetSnapshots(repoName).setSnapshots(snapshot.getName()).get() + ); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + } + + public void testSearchableSnapshotIsDeletedWithOnGoingRestore() throws Exception { + mountIndexThenExecute((repository, snapshot, index) -> { + try { + final String masterNode = internalCluster().getMasterName(); + blockMasterOnShardLevelSnapshotFile(repository, getRepositoryData(repository).resolveIndexId(index).getId()); + + final ActionFuture restoreFuture = client().admin() + .cluster() + .prepareRestoreSnapshot(repository, snapshot.getName()) + .setIndices(index) + .setRenamePattern("(.+)") + .setRenameReplacement("old_$1") + .setWaitForCompletion(true) + .execute(); + awaitClusterState(state -> 
state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).isEmpty() == false); + waitForBlock(masterNode, repository); + + assertAcked(client().admin().indices().prepareDelete(mountedIndex(index))); + awaitSnapshotPendingDeletion(snapshot); + assertFalse(restoreFuture.isDone()); + assertConflict(snapshot); + + unblockNode(repository, masterNode); + awaitNoMoreSnapshotsDeletions(); + + final RestoreInfo restoreInfoResponse = restoreFuture.actionGet().getRestoreInfo(); + assertThat(restoreInfoResponse.successfulShards(), greaterThan(0)); + assertThat(restoreInfoResponse.failedShards(), equalTo(0)); + assertNoConflict(snapshot); + + expectThrows( + SnapshotMissingException.class, + () -> client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(snapshot.getName()).get() + ); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + } + + public void testSearchableSnapshotIsDeletedWithOnGoingClone() throws Exception { + mountIndexThenExecute((repository, snapshot, index) -> { + try { + final String masterNode = internalCluster().getMasterName(); + blockMasterOnShardLevelSnapshotFile(repository, getRepositoryData(repository).resolveIndexId(index).getId()); + + final String cloneTarget = "target-snapshot"; + final ActionFuture cloneFuture = clusterAdmin().prepareCloneSnapshot( + repository, + snapshot.getName(), + cloneTarget + ).setIndices(index).execute(); + awaitNumberOfSnapshotsInProgress(1); + waitForBlock(masterNode, repository); + + assertAcked(client().admin().indices().prepareDelete(mountedIndex(index))); + awaitSnapshotPendingDeletion(snapshot); + assertFalse(cloneFuture.isDone()); + assertConflict(snapshot); + + unblockNode(repository, masterNode); + awaitNoMoreSnapshotsDeletions(); + assertAcked(cloneFuture.get()); + assertNoConflict(snapshot); + + expectThrows( + SnapshotMissingException.class, + () -> client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(snapshot.getName()).get() + ); + } catch (Exception e) { 
+ throw new AssertionError(e); + } + }); + } + + public void testSearchableSnapshotIsDeletedWithOnGoingDeletion() throws Exception { + mountIndexThenExecute((repository, snapshot, index) -> { + try { + blockMasterOnWriteIndexFile(repository); + final String masterNode = internalCluster().getMasterName(); + + final ActionFuture deleteFuture = startDeleteSnapshot(repository, snapshot.getName()); + waitForBlock(masterNode, repository); + + assertAcked(client().admin().indices().prepareDelete(mountedIndex(index))); + awaitSnapshotPendingDeletion(snapshot); + assertFalse(deleteFuture.isDone()); + + unblockNode(repository, masterNode); + awaitNoMoreSnapshotsDeletions(); + assertAcked(deleteFuture.get()); + + expectThrows( + SnapshotMissingException.class, + () -> client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(snapshot.getName()).get() + ); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + } + + public void testSearchableSnapshotsDeletionsWithConcurrentDeletes() throws Exception { + final String repository = "repository"; + final Settings.Builder repositorySettings = randomRepositorySettings(); + createRepository(repository, FsRepository.TYPE, repositorySettings); + + final String[] indices = new String[randomIntBetween(1, 10)]; + for (int i = 0; i < indices.length; i++) { + final String index = "index-" + i; + assertAcked( + prepareCreate( + index, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_SOFT_DELETES_SETTING.getKey(), true) + ) + ); + indices[i] = index; + } + + final int nbSnapshots = randomIntBetween(1, 10); + final CountDownLatch snapshotLatch = new CountDownLatch(nbSnapshots); + final List snapshots = new CopyOnWriteArrayList<>(); + for (int i = 0; i < nbSnapshots; i++) { + final String snapshot = "snapshot-" + i; + client().admin() + .cluster() + .prepareCreateSnapshot(repository, snapshot) + // must contain 1 index to be used 
with SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION + .setIndices(randomFrom(indices)) + .setWaitForCompletion(true) + .execute(new ActionListener<>() { + @Override + public void onResponse(CreateSnapshotResponse response) { + assertThat(response.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + snapshots.add(response.getSnapshotInfo()); + snapshotLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + final AssertionError error = new AssertionError("error during snapshot", e); + logger.error("test failed", error); + snapshotLatch.countDown(); + throw error; + } + }); + } + snapshotLatch.await(); + + final int nbMounts = randomIntBetween(1, 10); + final CountDownLatch mountLatch = new CountDownLatch(nbMounts); + final Map mounts = ConcurrentCollections.newConcurrentMap(); + + for (int i = 0; i < nbMounts; i++) { + final String mount = "mount-" + i; + + final SnapshotInfo snapshotInfo = randomFrom(snapshots); + client().execute( + MountSearchableSnapshotAction.INSTANCE, + new MountSearchableSnapshotRequest( + mount, + repository, + snapshotInfo.snapshot().getSnapshotId().getName(), + randomFrom(snapshotInfo.indices()), + Settings.builder().put(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, true).build(), + Strings.EMPTY_ARRAY, + true, + randomFrom(MountSearchableSnapshotRequest.Storage.values()) + ), + new ActionListener<>() { + @Override + public void onResponse(RestoreSnapshotResponse response) { + assertThat(response.getRestoreInfo().successfulShards(), greaterThan(0)); + assertThat(response.getRestoreInfo().failedShards(), equalTo(0)); + mounts.put(mount, snapshotInfo.snapshotId()); + mountLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + final AssertionError error = new AssertionError("error during mount", e); + logger.error("test failed", error); + mountLatch.countDown(); + throw error; + } + } + ); + } + mountLatch.await(); + + if (randomBoolean()) { + // Force the re-registration 
of the repository with randomized value for the "verify" flag; + // it helps to test the case where the repository UUID is unknown at the time the snapshot + // is marked to be deleted. + assertAcked( + clusterAdmin().preparePutRepository(repository) + .setType(FsRepository.TYPE) + .setVerify(randomBoolean()) + .setSettings(randomBoolean() ? repositorySettings : repositorySettings.put("dummy", randomInt())) + ); + } + + final CyclicBarrier startThreads = new CyclicBarrier(2); + final Thread deleteAllSnapshotsThread = new Thread(() -> { + try { + startThreads.await(); + final CountDownLatch latch = new CountDownLatch(snapshots.size()); + for (SnapshotInfo snapshot : snapshots) { + client().admin() + .cluster() + .prepareDeleteSnapshot(repository, snapshot.snapshotId().getName()) + .execute(ActionListener.wrap(latch::countDown)); + + } + latch.await(); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + deleteAllSnapshotsThread.start(); + + final Thread deleteAllMountedIndicesThread = new Thread(() -> { + try { + ensureGreen("mount-*"); + startThreads.await(); + final Set mountedIndices = new HashSet<>(mounts.keySet()); + do { + List deletions = randomSubsetOf(randomIntBetween(1, mountedIndices.size()), mountedIndices); + assertAcked(client().admin().indices().prepareDelete(deletions.toArray(String[]::new))); + deletions.forEach(mountedIndices::remove); + } while (mountedIndices.isEmpty() == false); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + deleteAllMountedIndicesThread.start(); + + deleteAllMountedIndicesThread.join(); + deleteAllSnapshotsThread.join(); + awaitNoMoreSnapshotsDeletions(); + + final GetSnapshotsResponse getSnapshotsResponse = client().admin().cluster().prepareGetSnapshots(repository).get(); + assertTrue(getSnapshotsResponse.getSnapshots().stream().noneMatch(snapshotInfo -> mounts.containsValue(snapshotInfo.snapshotId()))); + } + + public void testSearchableSnapshotIsDeletedWithOnRepoCleanUp() throws 
Exception { + mountIndexThenExecute((repository, snapshot, index) -> { + try { + final int garbageFiles = between(1, 10); + final int garbageSize = between(1, 50); + final PlainActionFuture> garbageFuture = PlainActionFuture.newFuture(); + final GroupedActionListener garbageGroupedListener = new GroupedActionListener<>(garbageFuture, garbageFiles); + + final BlobStoreRepository blobRepository = getRepositoryOnMaster(repository); + for (int i = 0; i < garbageFiles; i++) { + int garbageId = i; + blobRepository.threadPool() + .generic() + .execute( + ActionRunnable.run( + garbageGroupedListener, + () -> blobRepository.blobStore() + .blobContainer(blobRepository.basePath()) + .writeBlob("snap-" + garbageId + ".dat", new BytesArray(randomByteArrayOfLength(garbageSize)), true) + ) + ); + } + garbageFuture.get(); + + // repository clean up writes a new index-N blob to ensure concurrent operations will fail so we can block on this + blockMasterOnWriteIndexFile(repository); + + final ActionFuture cleanUpFuture = client().admin() + .cluster() + .prepareCleanupRepository(repository) + .execute(); + + final String masterNode = internalCluster().getMasterName(); + awaitClusterState( + state -> state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress() + ); + waitForBlock(masterNode, repository); + + assertAcked(client().admin().indices().prepareDelete(mountedIndex(index))); + awaitSnapshotPendingDeletion(snapshot); + assertFalse(cleanUpFuture.isDone()); + assertConflict(snapshot); + + unblockNode(repository, masterNode); + awaitNoMoreSnapshotsDeletions(); + assertNoConflict(snapshot); + + final CleanupRepositoryResponse cleanUpResponse = cleanUpFuture.get(); + assertThat(cleanUpResponse.result().blobs(), equalTo((long) garbageFiles)); + assertThat(cleanUpResponse.result().bytes(), equalTo((long) garbageSize * garbageFiles)); + + expectThrows( + SnapshotMissingException.class, + () -> 
client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(snapshot.getName()).get() + ); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + } + + public void testSnapshotDeletionsPendingIsRemovedFromClusterStateAfterExpiration() throws Exception { + mountIndexThenExecute((repository, snapshot, index) -> { + try { + assertAcked( + clusterAdmin().prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .put( + SnapshotDeletionsPendingService.PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(randomLongBetween(100L, 1000L)) + ) + .build() + ) + ); + + assertAcked( + clusterAdmin().preparePutRepository(repository) + .setVerify(false) + .setType("mock") + .setSettings( + Settings.builder() + .put(getRepositorySettings(repository).build()) + .put("random_control_io_exception_rate", 1.0) + .build() + ) + ); + + assertAcked(client().admin().indices().prepareDelete(mountedIndex(index))); + awaitSnapshotPendingDeletion(snapshot); + + assertBusy(() -> { + final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class); + assertTrue(snapshotsService.isSnapshotPendingDeletionTriggered(snapshot)); + }); + + assertAcked( + clusterAdmin().prepareUpdateSettings() + .setTransientSettings( + Settings.builder().put(PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING.getKey(), TimeValue.ZERO).build() + ) + ); + + assertBusy(() -> { + final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class); + assertFalse(snapshotsService.isSnapshotPendingDeletionTriggered(snapshot)); + }); + + awaitNoMoreSnapshotsDeletions(); + + } catch (Exception e) { + throw new AssertionError(e); + } finally { + assertAcked( + clusterAdmin().prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .putNull(PENDING_SNAPSHOT_DELETIONS_EXPIRATION_INTERVAL_SETTING.getKey()) + 
.putNull(SnapshotDeletionsPendingService.PENDING_SNAPSHOT_DELETIONS_RETRY_INTERVAL_SETTING.getKey()) + .build() + ) + ); + assertAcked( + clusterAdmin().preparePutRepository(repository) + .setVerify(false) + .setType("mock") + .setSettings( + Settings.builder() + .put(getRepositorySettings(repository).build()) + .put("random_control_io_exception_rate", 0.0) + .build() + ) + ); + } + + assertThat( + client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(snapshot.getName()).get().getSnapshots(), + hasSize(1) + ); + }); + } + + private void mountIndexThenExecute(final TriConsumer test) throws Exception { + final String suffix = randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + final String repository = "repository-" + suffix; + final Settings.Builder repositorySettings = randomRepositorySettings(); + createRepository(repository, "mock", repositorySettings); + + final String index = "index-" + suffix; + assertAcked(prepareCreate(index, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true))); + ensureGreen(index); + populateIndex(index, scaledRandomIntBetween(10, 5_000)); + refresh(index); + + final TotalHits totalHits = internalCluster().client().prepareSearch(index).setTrackTotalHits(true).get().getHits().getTotalHits(); + final SnapshotId snapshotId = createSnapshot(repository, "snapshot-" + suffix, List.of(index)).snapshotId(); + + final String restored = mountedIndex(index); + mountSnapshot( + repository, + snapshotId.getName(), + index, + restored, + Settings.builder().put(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, true).build() + ); + assertHitCount(client().prepareSearch(restored).setTrackTotalHits(true).get(), totalHits.value); + test.apply(repository, snapshotId, index); + } + + private void blockSnapshotDeletionThenExecute(final TriConsumer test) throws Exception { + mountIndexThenExecute((repository, snapshot, index) -> { + try { + blockMasterOnWriteIndexFile(repository); + + 
assertAcked(client().admin().indices().prepareDelete(mountedIndex(index))); + awaitSnapshotPendingDeletion(snapshot); + test.apply(repository, snapshot, index); + + unblockNode(repository, internalCluster().getMasterName()); + awaitNoMoreSnapshotsDeletions(); + + expectThrows( + SnapshotMissingException.class, + () -> client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(snapshot.getName()).get() + ); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + } + + protected void awaitSnapshotPendingDeletion(final SnapshotId snapshotId) throws Exception { + logger.info("--> wait for snapshot [{}] to be show up as pending deletion in the cluster state", snapshotId); + awaitClusterState(state -> state.custom(SnapshotDeletionsPending.TYPE, SnapshotDeletionsPending.EMPTY).contains(snapshotId)); + } + + private void updateRepositoryReadOnly(String repository, boolean readOnly) { + logger.info("--> updating repository [{}] with read-only [{}]", repository, readOnly); + final Settings.Builder repositorySettings = getRepositorySettings(repository); + repositorySettings.put(READONLY_SETTING_KEY, readOnly); + assertAcked( + clusterAdmin().preparePutRepository(repository) + // randomization here is important to have registered repository with _na_ uuid + .setVerify(randomBoolean()) + .setType("mock") + .setSettings(repositorySettings) + ); + } + + private Settings.Builder getRepositorySettings(String repository) { + return Settings.builder().put(client().admin().cluster().prepareGetRepositories(repository).get().repositories().get(0).settings()); + } + + private String mountedIndex(String index) { + return "restored-" + index; + } + + private static void assertConflict(final SnapshotId snapshotId) throws Exception { + assertSnapshotPendingDeletionConflict(snapshotId, true); + } + + private static void assertNoConflict(final SnapshotId snapshotId) throws Exception { + assertSnapshotPendingDeletionConflict(snapshotId, false); + } + + private static 
void assertSnapshotPendingDeletionConflict(final SnapshotId snapshotId, final boolean expected) throws Exception { + assertBusy(() -> { + final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class); + assertThat(snapshotsService.isSnapshotPendingDeletionConflicting(snapshotId), equalTo(expected)); + }); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java index 6dfb07e2db852..6d90363a87dc7 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java @@ -155,6 +155,7 @@ public void testMountIndexWithDeletionOfSnapshotFailsIfNotSingleIndexSnapshot() containsString("snapshot contains [" + nbIndices + "] indices instead of 1.") ) ); + awaitNoMoreSnapshotsDeletions(); } public void testMountIndexWithDifferentDeletionOfSnapshot() throws Exception { @@ -211,6 +212,7 @@ public void testMountIndexWithDifferentDeletionOfSnapshot() throws Exception { assertAcked(client().admin().indices().prepareDelete(mountedAgain)); assertAcked(client().admin().indices().prepareDelete(mounted)); + awaitNoMoreSnapshotsDeletions(); } public void testDeletionOfSnapshotSettingCannotBeUpdated() throws Exception { @@ -257,6 +259,7 @@ public void testDeletionOfSnapshotSettingCannotBeUpdated() throws Exception { ); assertAcked(client().admin().indices().prepareDelete(mounted)); + awaitNoMoreSnapshotsDeletions(); } public void testRestoreSearchableSnapshotIndexConflicts() throws Exception { @@ -271,15 +274,12 @@ public 
void testRestoreSearchableSnapshotIndexConflicts() throws Exception { assertAcked(client().admin().indices().prepareDelete(indexName)); final String mountedIndex = "mounted-index"; - final boolean deleteSnapshot = randomBoolean(); - final Settings indexSettings = deleteSnapshotIndexSettingsOrNull(deleteSnapshot); + final Settings indexSettings = deleteSnapshotIndexSettingsOrNull(false); logger.info("--> mounting snapshot of index [{}] as [{}] with index settings [{}]", indexName, mountedIndex, indexSettings); mountSnapshot(repository, snapshotOfIndex, indexName, mountedIndex, indexSettings, randomFrom(Storage.values())); assertThat( getDeleteSnapshotIndexSetting(mountedIndex), - indexSettings.hasValue(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION) - ? equalTo(Boolean.toString(deleteSnapshot)) - : nullValue() + indexSettings.hasValue(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION) ? equalTo("false") : nullValue() ); final String snapshotOfMountedIndex = "snapshot-of-mounted-index"; @@ -287,16 +287,10 @@ public void testRestoreSearchableSnapshotIndexConflicts() throws Exception { assertAcked(client().admin().indices().prepareDelete(mountedIndex)); final String mountedIndexAgain = "mounted-index-again"; - final boolean deleteSnapshotAgain = deleteSnapshot == false; - final Settings indexSettingsAgain = deleteSnapshotIndexSettings(deleteSnapshotAgain); + final Settings indexSettingsAgain = deleteSnapshotIndexSettings(true); logger.info("--> mounting snapshot of index [{}] again as [{}] with index settings [{}]", indexName, mountedIndex, indexSettings); mountSnapshot(repository, snapshotOfIndex, indexName, mountedIndexAgain, indexSettingsAgain, randomFrom(Storage.values())); - assertThat( - getDeleteSnapshotIndexSetting(mountedIndexAgain), - indexSettingsAgain.hasValue(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION) - ? 
equalTo(Boolean.toString(deleteSnapshotAgain)) - : nullValue() - ); + assertThat(getDeleteSnapshotIndexSetting(mountedIndexAgain), equalTo("true")); logger.info("--> restoring snapshot of searchable snapshot index [{}] should be conflicting", mountedIndex); final SnapshotRestoreException exception = expectThrows( @@ -313,12 +307,13 @@ public void testRestoreSearchableSnapshotIndexConflicts() throws Exception { allOf( containsString("cannot mount snapshot [" + repository + '/'), containsString(':' + snapshotOfMountedIndex + "] as index [" + mountedIndex + "] with "), - containsString("[index.store.snapshot.delete_searchable_snapshot: " + deleteSnapshot + "]; another "), + containsString("[index.store.snapshot.delete_searchable_snapshot: false]; another "), containsString("index [" + mountedIndexAgain + '/'), - containsString("is mounted with [index.store.snapshot.delete_searchable_snapshot: " + deleteSnapshotAgain + "].") + containsString("is mounted with [index.store.snapshot.delete_searchable_snapshot: true].") ) ); assertAcked(client().admin().indices().prepareDelete("mounted-*")); + awaitNoMoreSnapshotsDeletions(); } public void testRestoreSearchableSnapshotIndexWithDifferentSettingsConflicts() throws Exception { @@ -400,6 +395,7 @@ public void testRestoreSearchableSnapshotIndexWithDifferentSettingsConflicts() t assertAcked(client().admin().indices().prepareDelete("mounted-*")); assertAcked(client().admin().indices().prepareDelete("restored-with-same-setting-*")); + awaitNoMoreSnapshotsDeletions(); } private static Settings deleteSnapshotIndexSettings(boolean value) { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 28cef6972562a..898853a4b2eb0 100644 --- 
a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -15,6 +15,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.SnapshotDeletionsPending; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; @@ -311,7 +312,8 @@ public List> getSettings() { BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING, BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING, BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING, - BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD + BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD, + SnapshotDeletionsPending.MAX_PENDING_DELETIONS_SETTING ); }