diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java index 0134b798c72fd..1865bd58c7f4a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java @@ -19,13 +19,20 @@ package org.elasticsearch.cluster; +import com.carrotsearch.hppc.ObjectContainer; +import com.carrotsearch.hppc.cursors.ObjectCursor; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState.Custom; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.Snapshot; import java.io.IOException; @@ -48,7 +55,7 @@ public SnapshotDeletionsInProgress() { this(Collections.emptyList()); } - private SnapshotDeletionsInProgress(List entries) { + public SnapshotDeletionsInProgress(List entries) { this.entries = Collections.unmodifiableList(entries); } @@ -99,6 +106,16 @@ public boolean hasDeletionsInProgress() { return entries.isEmpty() == false; } + public Entry getEntry(final Snapshot snapshot) { + for (Entry entry : entries) { + final Snapshot curr = entry.getSnapshot(); + if (curr.equals(snapshot)) { + return entry; + } + } + return null; + } + @Override public String getWriteableName() { return TYPE; @@ -146,6 +163,32 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("snapshot", entry.snapshot.getSnapshotId().getName()); builder.humanReadableField("start_time_millis", "start_time", new TimeValue(entry.startTime)); builder.field("repository_state_id", entry.repositoryStateId); + builder.field("version", entry.version); + builder.startArray("indices_to_cleanup"); + { + for (IndexId index : entry.indicesToCleanup) { + index.toXContent(builder, params); + } + } + builder.endArray(); + builder.startArray("shards"); + { + for (ObjectObjectCursor, ShardSnapshotDeletionStatus> shardEntry : entry.shards) { + ShardId shardId = shardEntry.key.v1(); + String snapshotIndexId = shardEntry.key.v2(); + ShardSnapshotDeletionStatus status = shardEntry.value; + builder.startObject(); + { + builder.field("index", shardId.getIndex()); + builder.field("shard", shardId.getId()); + builder.field("snapshot_index_id", snapshotIndexId); + builder.field("state", status.state()); + builder.field("node", status.nodeId()); + } + builder.endObject(); + } + } + builder.endArray(); } builder.endObject(); } @@ -172,17 +215,57 @@ public static final class Entry implements Writeable { private final Snapshot snapshot; private final long startTime; private final long repositoryStateId; + private final int version; + private final List indicesToCleanup; + private final ImmutableOpenMap, ShardSnapshotDeletionStatus> shards; public Entry(Snapshot snapshot, long startTime, long repositoryStateId) { + this(snapshot, startTime, repositoryStateId, 0, null, null); + } + + public Entry(final Snapshot snapshot, + final long startTime, + final long repositoryStateId, + final 
int version, + final List indicesToCleanup, + final ImmutableOpenMap, ShardSnapshotDeletionStatus> shards) { this.snapshot = snapshot; this.startTime = startTime; this.repositoryStateId = repositoryStateId; + this.version = version; + this.indicesToCleanup = indicesToCleanup == null ? Collections.emptyList() : indicesToCleanup; + this.shards = shards == null ? ImmutableOpenMap.of() : shards; } public Entry(StreamInput in) throws IOException { this.snapshot = new Snapshot(in); this.startTime = in.readVLong(); this.repositoryStateId = in.readLong(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.version = in.readInt(); + int indices = in.readVInt(); + List indexBuilder = new ArrayList<>(); + for (int i = 0; i < indices; i++) { + indexBuilder.add(new IndexId(in.readString(), in.readString())); + } + this.indicesToCleanup = indexBuilder; + ImmutableOpenMap.Builder, ShardSnapshotDeletionStatus> builder = ImmutableOpenMap.builder(); + int shards = in.readVInt(); + for (int i = 0; i < shards; i++) { + ShardId shardId = ShardId.readShardId(in); + String indexId = in.readString(); + builder.put(new Tuple<>(shardId, indexId), new ShardSnapshotDeletionStatus(in)); + } + this.shards = builder.build(); + } else { + this.version = 0; + this.indicesToCleanup = Collections.emptyList(); + this.shards = ImmutableOpenMap.of(); + } + } + + public Entry(Entry entry, ImmutableOpenMap, ShardSnapshotDeletionStatus> shards) { + this(entry.snapshot, entry.startTime, entry.repositoryStateId, entry.version, entry.indicesToCleanup, shards); } /** @@ -199,6 +282,10 @@ public long getStartTime() { return startTime; } + public int getVersion() { + return version; + } + /** * The repository state id at the time the snapshot deletion began. */ @@ -206,6 +293,14 @@ public long getRepositoryStateId() { return repositoryStateId; } + public List getIndicesToCleanup() { + return indicesToCleanup; + } + + public ImmutableOpenMap, ShardSnapshotDeletionStatus> getShards() { + return shards; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -217,12 +312,15 @@ public boolean equals(Object o) { Entry that = (Entry) o; return snapshot.equals(that.snapshot) && startTime == that.startTime - && repositoryStateId == that.repositoryStateId; + && repositoryStateId == that.repositoryStateId + && version == that.version + && indicesToCleanup.equals(that.indicesToCleanup) + && shards.equals(that.shards); } @Override public int hashCode() { - return Objects.hash(snapshot, startTime, repositoryStateId); + return Objects.hash(snapshot, startTime, repositoryStateId, version, indicesToCleanup, shards); } @Override @@ -230,6 +328,150 @@ public void writeTo(StreamOutput out) throws IOException { snapshot.writeTo(out); out.writeVLong(startTime); out.writeLong(repositoryStateId); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeInt(version); + out.writeVInt(indicesToCleanup.size()); + for (IndexId index : indicesToCleanup) { + index.writeTo(out); + } + out.writeVInt(shards.size()); + for (ObjectObjectCursor, ShardSnapshotDeletionStatus> shardEntry : shards) { + shardEntry.key.v1().writeTo(out); + out.writeString(shardEntry.key.v2()); + shardEntry.value.writeTo(out); + } + } } } + + /** + * Checks if all shard deletions in the list have completed + * + * @param shards list of shard deletion statuses + * @return true if all shard deletions have completed (either successfully or failed), false otherwise + */ + public static boolean completed(ObjectContainer shards) { + for (ObjectCursor status : shards) { + 
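+ // A single shard still in INIT or STARTED keeps the whole deletion entry in progress; SUCCESS and FAILED are terminal.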
if (status.value.state().completed() == false) { + return false; + } + } + return true; + } + + public static class ShardSnapshotDeletionStatus { + private final State state; + private final String nodeId; + private final String reason; + + public ShardSnapshotDeletionStatus(String nodeId, State state) { + this(nodeId, state, null); + } + + public ShardSnapshotDeletionStatus(String nodeId, State state, String reason) { + this.nodeId = nodeId; + this.state = state; + this.reason = reason; + // If the state is failed we have to have a reason for this failure + assert state.failed() == false || reason != null; + } + + public ShardSnapshotDeletionStatus(StreamInput in) throws IOException { + nodeId = in.readOptionalString(); + state = State.fromValue(in.readByte()); + reason = in.readOptionalString(); + } + + public State state() { + return state; + } + + public String nodeId() { + return nodeId; + } + + public String reason() { + return reason; + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(nodeId); + out.writeByte(state.value); + out.writeOptionalString(reason); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ShardSnapshotDeletionStatus status = (ShardSnapshotDeletionStatus) o; + + if (nodeId != null ? !nodeId.equals(status.nodeId) : status.nodeId != null) return false; + if (reason != null ? !reason.equals(status.reason) : status.reason != null) return false; + if (state != status.state) return false; + + return true; + } + + @Override + public int hashCode() { + int result = state != null ? state.hashCode() : 0; + result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0); + result = 31 * result + (reason != null ? reason.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "ShardSnapshotDeletionStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + "]"; + } + } + + public enum State { + INIT((byte) 0, false, false), + STARTED((byte) 1, false, false), + SUCCESS((byte) 2, true, false), + FAILED((byte) 3, true, true); + + private byte value; + + private boolean completed; + + private boolean failed; + + State(byte value, boolean completed, boolean failed) { + this.value = value; + this.completed = completed; + this.failed = failed; + } + + public byte value() { + return value; + } + + public boolean completed() { + return completed; + } + + public boolean failed() { + return failed; + } + + public static State fromValue(byte value) { + switch (value) { + case 0: + return INIT; + case 1: + return STARTED; + case 2: + return SUCCESS; + case 3: + return FAILED; + default: + throw new IllegalArgumentException("No snapshot deletion state for value [" + value + "]"); + } + } + } + } diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotDeletionStatus.java b/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotDeletionStatus.java new file mode 100644 index 0000000000000..b9efd6b16afc3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotDeletionStatus.java @@ -0,0 +1,158 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.snapshots; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Represent shard snapshot deletion status + */ +public class IndexShardSnapshotDeletionStatus { + + /** + * Snapshot Deletion Stage + */ + public enum Stage { + /** + * Shard snapshot deletion hasn't started yet + */ + INIT, + /** + * Shard snapshot deletion in progress + */ + STARTED, + /** + * Shard snapshot deletion completed successfully + */ + DONE, + /** + * Shard snapshot deletion failed + */ + FAILURE + } + + private final AtomicReference stage; + private final int version; + private long startTime; + private long totalTime; + private String failure; + + private IndexShardSnapshotDeletionStatus(final Stage stage, final int version, final long startTime, + final long totalTime, final String failure) { + this.stage = new AtomicReference<>(Objects.requireNonNull(stage)); + this.version = version; + this.startTime = startTime; + this.totalTime = totalTime; + this.failure = failure; + } + + public synchronized void moveToStarted(final long startTime) { + if (stage.compareAndSet(Stage.INIT, Stage.STARTED)) { + this.startTime = startTime; + } else { + throw new IllegalStateException("Unable to move the shard snapshot deletion status to [STARTED]: " + + "expecting [INIT] but got [" + stage.get() + "]"); + } + } + + public synchronized void moveToDone(final long endTime) { + if (stage.compareAndSet(Stage.STARTED, Stage.DONE)) { + this.totalTime = Math.max(0L, endTime - startTime); + } else { + throw new IllegalStateException("Unable to move the shard snapshot deletion status to [DONE]: " + + "expecting [STARTED] but got [" + stage.get() + "]"); + } + } + + public synchronized void moveToFailed(final long endTime, final String failure) { + if (stage.getAndSet(Stage.FAILURE) != Stage.FAILURE) { + this.totalTime = Math.max(0L, endTime - startTime); + this.failure = failure; + } + } + + public boolean isFailed() { + return stage.get() == Stage.FAILURE; + } + + public int getVersion() { + return version; + } + + public synchronized IndexShardSnapshotDeletionStatus.Copy asCopy() { + return new IndexShardSnapshotDeletionStatus.Copy(stage.get(), version, startTime, totalTime, failure); + } + + public static IndexShardSnapshotDeletionStatus newInitializing(final int version) { + return new IndexShardSnapshotDeletionStatus(Stage.INIT, version, 0L, 0L, null); + } + + public static class Copy { + + private final Stage stage; + private final int version; + private final long startTime; + private final long totalTime; + private final String failure; + + public Copy(final Stage stage, final int version, final long startTime, final long totalTime, + final String failure) { + this.stage = stage; + this.version = version; + this.startTime = startTime; + this.totalTime = totalTime; + this.failure = failure; + } + + public Stage getStage() { + return stage; + } + + public int 
getVersion() { + return version; + } + + public long getStartTime() { + return startTime; + } + + public long getTotalTime() { + return totalTime; + } + + public String getFailure() { + return failure; + } + + @Override + public String toString() { + return "index shard snapshot deletion status (" + + "stage=" + stage + + ", version=" + version + + ", startTime=" + startTime + + ", totalTime=" + totalTime + + ", failure='" + failure + '\'' + + ')'; + } + + } + +} diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 4e8e9b6c7f569..1cef4cd25bc99 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.index.shard.IndexShard; @@ -37,6 +38,7 @@ import java.io.IOException; import java.util.List; +import java.util.Set; public class FilterRepository implements Repository { @@ -135,6 +137,42 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve return in.getShardSnapshotStatus(snapshotId, version, indexId, shardId); } + @Override + public SnapshotInfo getSnapshotInfoOrNull(SnapshotId snapshotId) { + return in.getSnapshotInfoOrNull(snapshotId); + } + + @Override + public RepositoryData initiateSnapshotDelete(SnapshotId snapshotId, SnapshotInfo snapshotInfo, long repositoryStateId) { + return in.initiateSnapshotDelete(snapshotId, snapshotInfo, repositoryStateId); + } + + @Override + public List> getShardsToDeleteForSnapshot(SnapshotId snapshotId, SnapshotInfo snapshotInfo, + RepositoryData repositoryData) { + return in.getShardsToDeleteForSnapshot(snapshotId, snapshotInfo, repositoryData); + } + + @Override + public void deleteIndicesMetaData(SnapshotInfo snapshot, Set indexIds) { + in.deleteIndicesMetaData(snapshot, indexIds); + } + + @Override + public void deleteShard(SnapshotId snapshotId, int version, String indexId, ShardId shardId) { + in.deleteShard(snapshotId, version, indexId, shardId); + } + + @Override + public void cleanUpIndices(List indicesToCleanUp) { + in.cleanUpIndices(indicesToCleanUp); + } + + @Override + public boolean isDistributedSnapshotDeletionEnabled() { + return in.isDistributedSnapshotDeletionEnabled(); + } + @Override public Lifecycle.State lifecycleState() { return in.lifecycleState(); diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 1ca6f5e148510..5780ac6d9978a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; @@ -36,6 +37,7 @@ import java.io.IOException; import 
java.util.List; +import java.util.Set; import java.util.function.Function; /** @@ -230,5 +232,80 @@ void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, Inde */ IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId); + /** + * Retrieve snapshot info for the provided snapshot id + * + * @param snapshotId snapshot id + * @return snapshot info, or null if the snapshot file could not be read + */ + default SnapshotInfo getSnapshotInfoOrNull(SnapshotId snapshotId) { + throw new UnsupportedOperationException("Operation not supported for this repository"); + } + + /** + * Initiate a snapshot delete by removing the snapshot's root-level metadata from the repository + * + * @param snapshotId snapshot id + * @param snapshotInfo snapshot info + * @param repositoryStateId repository state id + * @return the updated repository data with the snapshot removed + */ + default RepositoryData initiateSnapshotDelete(SnapshotId snapshotId, SnapshotInfo snapshotInfo, + long repositoryStateId) { + throw new UnsupportedOperationException("Operation not supported for this repository"); + } + + /** + * Retrieve the shards to delete for the provided snapshot id + * + * @param snapshotId snapshot id + * @param snapshotInfo snapshot info + * @param repositoryData repository data + * @return list of the shard ids, each paired with its snapshot index id, that are part of this snapshot + */ + default List<Tuple<ShardId, String>> getShardsToDeleteForSnapshot(SnapshotId snapshotId, SnapshotInfo snapshotInfo, + RepositoryData repositoryData) { + throw new UnsupportedOperationException("Operation not supported for this repository"); + } + + /** + * Delete indices metadata for the given set of index ids + * + * @param snapshot snapshot info + * @param indexIds set of index ids whose index metadata needs to be deleted + */ + default void deleteIndicesMetaData(SnapshotInfo snapshot, Set<String> indexIds) { + throw new UnsupportedOperationException("Operation not supported for this repository"); + } + + /** + * Delete a shard snapshot + * + * @param snapshotId snapshot id + * @param version version of elasticsearch that created this snapshot + * @param indexId the snapshotted index id + * @param shardId shard id + */ + default void deleteShard(SnapshotId snapshotId, int version, String indexId, ShardId shardId) { + throw new UnsupportedOperationException("Operation not supported for this repository"); + } + + /** + * Clean up repository indices that are no longer part of any snapshots. + * + * @param indicesToCleanUp list of indices to be cleaned up + */ + default void cleanUpIndices(List<IndexId> indicesToCleanUp) { + throw new UnsupportedOperationException("Operation not supported for this repository"); + } + + /** + * Indicates whether this repository supports distributed snapshot deletion, in which shard-level deletes are fanned out to the data nodes; repositories returning {@code true} are expected to override the deletion methods above, whose defaults throw. + * + * @return true if distributed snapshot deletion is supported, false otherwise. 
+ */ + default boolean isDistributedSnapshotDeletionEnabled() { + return false; + } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 4fee2fad41600..17e82e313186e 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -450,7 +450,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex); } - deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId); + deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId.getId()); if (indexMetaData != null) { for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { @@ -491,6 +491,106 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { } } + @Override + public SnapshotInfo getSnapshotInfoOrNull(SnapshotId snapshotId) { + SnapshotInfo snapshot = null; + try { + snapshot = getSnapshotInfo(snapshotId); + } catch (SnapshotMissingException ex) { + throw ex; + } catch (IllegalStateException | SnapshotException | ElasticsearchParseException ex) { + logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex); + } + return snapshot; + } + + @Override + public RepositoryData initiateSnapshotDelete(SnapshotId snapshotId, SnapshotInfo snapshotInfo, long repositoryStateId) { + if (isReadOnly()) { + throw new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"); + } + try { + // Delete snapshot from the index file, since it is the maintainer of truth of active snapshots + final RepositoryData updatedRepositoryData = getRepositoryData().removeSnapshot(snapshotId); + writeIndexGen(updatedRepositoryData, repositoryStateId); + + // delete the snapshot file + deleteSnapshotBlobIgnoringErrors(snapshotInfo, snapshotId.getUUID()); + // delete the global metadata file + deleteGlobalMetaDataBlobIgnoringErrors(snapshotInfo, snapshotId.getUUID()); + + return updatedRepositoryData; + } catch (IOException | ResourceNotFoundException ex) { + throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex); + } + } + + @Override + public List> getShardsToDeleteForSnapshot(SnapshotId snapshotId, SnapshotInfo snapshot, + RepositoryData repositoryData) { + List> shards = new ArrayList<>(); + if (snapshot != null) { + final List indices = snapshot.indices(); + for (String index : indices) { + final IndexId indexId = repositoryData.resolveIndexId(index); + + IndexMetaData indexMetaData = null; + try { + logger.info("Getting index metadata {}", indexId.getId()); + indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); + } catch (ElasticsearchParseException | IOException ex) { + logger.warn(() -> + new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, index), ex); + } + if (indexMetaData != null) { + for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { + shards.add(new Tuple<>(new ShardId(indexMetaData.getIndex(), shardId), indexId.getId())); + } + } + } + } + return shards; + } + + @Override + public void deleteIndicesMetaData(SnapshotInfo snapshot, Set indexIds) { + if (isReadOnly()) { + throw new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly 
repository"); + } + if (snapshot != null) { + indexIds.stream().forEach(indexId -> { + logger.info("Deleting index metadata {}", indexId); + deleteIndexMetaDataBlobIgnoringErrors(snapshot, indexId); + }); + } + } + + @Override + public void cleanUpIndices(List indicesToCleanUp) { + final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices")); + for (final IndexId indexId : indicesToCleanUp) { + try { + indicesBlobContainer.deleteBlob(indexId.getId()); + } catch (DirectoryNotEmptyException dnee) { + // if the directory isn't empty for some reason, it will fail to clean up; + // we'll ignore that and accept that cleanup didn't fully succeed. + // since we are using UUIDs for path names, this won't be an issue for + // snapshotting indices of the same name + logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, but " + + "failed to clean up its index folder due to the directory not being empty.", metadata.name(), indexId), dnee); + } catch (IOException ioe) { + // a different IOException occurred while trying to delete - will just log the issue for now + logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, but " + + "failed to clean up its index folder.", metadata.name(), indexId), ioe); + } + } + } + + @Override + public boolean isDistributedSnapshotDeletionEnabled() { + return true; + } + private void deleteSnapshotBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String blobId) { try { snapshotFormat.delete(blobContainer(), blobId); @@ -517,14 +617,14 @@ private void deleteGlobalMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotI } } - private void deleteIndexMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final IndexId indexId) { + private void deleteIndexMetaDataBlobIgnoringErrors(final SnapshotInfo snapshotInfo, final String indexId) { final SnapshotId snapshotId = snapshotInfo.snapshotId(); - BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId())); + BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId)); try { indexMetaDataFormat.delete(indexMetaDataBlobContainer, snapshotId.getUUID()); } catch (IOException ex) { logger.warn(() -> new ParameterizedMessage("[{}] failed to delete metadata for index [{}]", - snapshotId, indexId.getName()), ex); + snapshotId, indexId), ex); } } @@ -916,6 +1016,18 @@ public void verify(String seed, DiscoveryNode localNode) { } } + @Override + public void deleteShard(final SnapshotId snapshotId, + final int version, + final String indexId, + final ShardId shardId) { + if (isReadOnly()) { + throw new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"); + } + Context context = new Context(snapshotId, indexId, shardId, shardId); + context.delete(); + } + /** * Delete shard snapshot * @@ -951,9 +1063,13 @@ private class Context { } Context(SnapshotId snapshotId, IndexId indexId, ShardId shardId, ShardId snapshotShardId) { + this(snapshotId, indexId.getId(), shardId, snapshotShardId); + } + + Context(SnapshotId snapshotId, String indexId, ShardId shardId, ShardId snapshotShardId) { this.snapshotId = snapshotId; this.shardId = shardId; - blobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId()) + blobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId) 
.add(Integer.toString(snapshotShardId.getId()))); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index fbb0a876e8f29..8fbc890438da4 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -37,6 +37,8 @@ import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress.ShardSnapshotDeletionStatus; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.State; @@ -47,6 +49,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; @@ -60,6 +63,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardSnapshotDeletionStatus; import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus.Stage; @@ -94,6 +98,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private static final Logger logger = LogManager.getLogger(SnapshotShardsService.class); private static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status"; + private static final String UPDATE_SHARD_SNAPSHOT_DELETION_STATUS_ACTION_NAME = + "internal:cluster/snapshot/update_shard_snapshot_deletion_status"; private final ClusterService clusterService; @@ -107,13 +113,23 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private final Map> shardSnapshots = new HashMap<>(); + private final Map, IndexShardSnapshotDeletionStatus>> + shardSnapshotDeletions = new HashMap<>(); + // A map of snapshots to the shardIds that we already reported to the master as failed private final TransportRequestDeduplicator remoteFailedRequestDeduplicator = new TransportRequestDeduplicator<>(); + private final TransportRequestDeduplicator + remoteFailedDeletionStatusRequestDeduplicator = new TransportRequestDeduplicator<>(); + private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor(); private final UpdateSnapshotStatusAction updateSnapshotStatusHandler; + private final UpdateShardSnapshotDeletionStatusExecutor deletionStatusExecutor = + new UpdateShardSnapshotDeletionStatusExecutor(); + private final UpdateShardSnapshotDeletionStatusAction updateShardSnapshotDeletionStatusHandler; + @Inject public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, ThreadPool threadPool, TransportService transportService, IndicesService indicesService, @@ -131,12 +147,17 @@ public 
SnapshotShardsService(Settings settings, ClusterService clusterService, S // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver); + this.updateShardSnapshotDeletionStatusHandler = + new UpdateShardSnapshotDeletionStatusAction(transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver); } @Override protected void doStart() { assert this.updateSnapshotStatusHandler != null; assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null; + assert this.updateShardSnapshotDeletionStatusHandler != null; + assert transportService.getRequestHandler(UPDATE_SHARD_SNAPSHOT_DELETION_STATUS_ACTION_NAME) != null; } @Override @@ -160,10 +181,21 @@ public void clusterChanged(ClusterChangedEvent event) { } } + SnapshotDeletionsInProgress previousDeletions = event.previousState().custom(SnapshotDeletionsInProgress.TYPE); + SnapshotDeletionsInProgress currentDeletions = event.state().custom(SnapshotDeletionsInProgress.TYPE); + + if ((previousDeletions == null && currentDeletions != null) + || (previousDeletions != null && previousDeletions.equals(currentDeletions) == false)) { + synchronized (shardSnapshotDeletions) { + processIndexShardSnapshotDeletions(currentDeletions); + } + } + String previousMasterNodeId = event.previousState().nodes().getMasterNodeId(); String currentMasterNodeId = event.state().nodes().getMasterNodeId(); if (currentMasterNodeId != null && currentMasterNodeId.equals(previousMasterNodeId) == false) { syncShardStatsOnNewMaster(event); + syncShardDeletionStatsOnNewMaster(event); } } catch (Exception e) { @@ -202,6 +234,22 @@ public Map currentSnapshotShards(Snapshot sna } } + /** + * Returns status of shards snapshot deletions on this node and belong to the given snapshot + *
+ * This method is executed on the data node + *
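+ * The returned map is a point-in-time copy made under the internal lock (or null if the snapshot is unknown), so callers can iterate it without further synchronization.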
+ * + * @param snapshot snapshot + * @return map of shard id to snapshot deletion status + */ + public Map, IndexShardSnapshotDeletionStatus> currentShardSnapshotDeletions(Snapshot snapshot) { + synchronized (shardSnapshotDeletions) { + final Map, IndexShardSnapshotDeletionStatus> current = shardSnapshotDeletions.get(snapshot); + return current == null ? null : new HashMap<>(current); + } + } + /** * Checks if any new shards should be snapshotted on this node * @@ -419,6 +467,148 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { } } + /** + * Checks if any snapshot shards deletion were performed that the new master doesn't know about + */ + private void syncShardDeletionStatsOnNewMaster(ClusterChangedEvent event) { + SnapshotDeletionsInProgress deletionsInProgress = event.state().custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress == null) { + return; + } + + final String localNodeId = event.state().nodes().getLocalNodeId(); + for (SnapshotDeletionsInProgress.Entry snapshot : deletionsInProgress.getEntries()) { + Map, IndexShardSnapshotDeletionStatus> localShards = + currentShardSnapshotDeletions(snapshot.getSnapshot()); + if (localShards != null) { + ImmutableOpenMap, ShardSnapshotDeletionStatus> masterShards = snapshot.getShards(); + for(Map.Entry, IndexShardSnapshotDeletionStatus> localShard : localShards.entrySet()) { + Tuple shardId = localShard.getKey(); + IndexShardSnapshotDeletionStatus localShardStatus = localShard.getValue(); + ShardSnapshotDeletionStatus masterShard = masterShards.get(shardId); + if (masterShard != null && masterShard.state().completed() == false) { + final IndexShardSnapshotDeletionStatus.Copy indexShardSnapshotDeletionStatus = localShardStatus.asCopy(); + final IndexShardSnapshotDeletionStatus.Stage stage = indexShardSnapshotDeletionStatus.getStage(); + // Master knows about the shard snapshot deletion and thinks it has not completed + if (stage == IndexShardSnapshotDeletionStatus.Stage.DONE) { + // but we think the shard snapshot deletion is done - we need to make new master know that it is done + logger.info("[{}] new master thinks the shard snapshot deletion [{}] is not completed " + + "but the shard snapshot deletion succeeded locally, updating status on the master", + snapshot.getSnapshot(), shardId); + sendShardSnapshotDeletionSuccessUpdate(snapshot.getSnapshot(), shardId, localNodeId); + + } else if (stage == IndexShardSnapshotDeletionStatus.Stage.FAILURE) { + // but we think the shard snapshot deletion is failed - we need to make new master know that it is failed + logger.debug("[{}] new master thinks the shard snapshot deletion [{}] is not completed " + + "but the shard snapshot deletion failed locally, updating status on master", + snapshot.getSnapshot(), shardId); + final String failure = indexShardSnapshotDeletionStatus.getFailure(); + sendShardSnapshotDeletionFailureUpdate(snapshot.getSnapshot(), shardId, localNodeId, failure); + } + } + } + } + } + } + + private void cancelRemovedSnapshotDeletions(SnapshotDeletionsInProgress deletionsInProgress) { + Iterator, IndexShardSnapshotDeletionStatus>>> it + = shardSnapshotDeletions.entrySet().iterator(); + while (it.hasNext()) { + final Map.Entry, IndexShardSnapshotDeletionStatus>> entry = it.next(); + final Snapshot snapshot = entry.getKey(); + if (deletionsInProgress == null || deletionsInProgress.getEntry(snapshot) == null) { + it.remove(); + } + } + } + + private void startNewSnapshotDeletions(SnapshotDeletionsInProgress deletionsInProgress) { + // Now go through 
all snapshot deletions and update existing or create missing + final String localNodeId = clusterService.localNode().getId(); + for (SnapshotDeletionsInProgress.Entry entry : deletionsInProgress.getEntries()) { + Map, IndexShardSnapshotDeletionStatus> startedShardDeletions = new HashMap<>(); + Map, IndexShardSnapshotDeletionStatus> deletions + = shardSnapshotDeletions.getOrDefault(entry.getSnapshot(), emptyMap()); + for (ObjectObjectCursor, ShardSnapshotDeletionStatus> shard : entry.getShards()) { + final ShardSnapshotDeletionStatus deletionStatus = shard.value; + if (localNodeId.equals(shard.value.nodeId()) && deletionStatus.state() == SnapshotDeletionsInProgress.State.INIT + && deletions.containsKey(shard.key) == false) { + // Local node Id is assigned to delete the shard snapshot. + if (startedShardDeletions == null) { + startedShardDeletions = new HashMap<>(); + } + startedShardDeletions.put(shard.key, IndexShardSnapshotDeletionStatus.newInitializing(entry.getVersion())); + } + } + if (startedShardDeletions != null && startedShardDeletions.isEmpty() == false) { + shardSnapshotDeletions.computeIfAbsent(entry.getSnapshot(), s -> new HashMap<>()).putAll(startedShardDeletions); + startNewSnapshotShardDeletions(entry, startedShardDeletions); + } + } + } + + /** + * Checks if any new shard snapshots to be deleted on this node + */ + private void processIndexShardSnapshotDeletions(SnapshotDeletionsInProgress deletionsInProgress) { + cancelRemovedSnapshotDeletions(deletionsInProgress); + if (deletionsInProgress != null) { + startNewSnapshotDeletions(deletionsInProgress); + } + } + + private void startNewSnapshotShardDeletions(SnapshotDeletionsInProgress.Entry snapshotEntry, + Map, IndexShardSnapshotDeletionStatus> snapshotDeletions) { + final String localNodeId = clusterService.localNode().getId(); + final Snapshot snapshot = snapshotEntry.getSnapshot(); + final Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository()); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + for (final Map.Entry, IndexShardSnapshotDeletionStatus> entry : snapshotDeletions.entrySet()) { + final Tuple shardId = entry.getKey(); + final IndexShardSnapshotDeletionStatus deletionStatus = entry.getValue(); + + executor.execute(new AbstractRunnable() { + + final SetOnce failure = new SetOnce<>(); + + @Override + public void doRun() { + deletionStatus.moveToStarted(System.currentTimeMillis()); + repository.deleteShard(snapshot.getSnapshotId(), deletionStatus.getVersion(), shardId.v2(), shardId.v1()); + deletionStatus.moveToDone(System.currentTimeMillis()); + } + + @Override + public void onFailure(Exception e) { + logger.warn(() -> + new ParameterizedMessage("[{}][{}] failed to delete shard snapshot", shardId, snapshot), e); + failure.set(e); + } + + @Override + public void onRejection(Exception e) { + failure.set(e); + } + + @Override + public void onAfter() { + final Exception exception = failure.get(); + if (exception != null) { + final String failure = ExceptionsHelper.detailedMessage(exception); + if (!entry.getValue().isFailed()) { + // The status is not yet moved to failed, move it before notifying the master + entry.getValue().moveToFailed(System.currentTimeMillis(), failure); + } + sendShardSnapshotDeletionFailureUpdate(snapshot, shardId, localNodeId, failure); + } else { + sendShardSnapshotDeletionSuccessUpdate(snapshot, shardId, localNodeId); + } + } + }); + } + } + /** * Internal request that is used to send changes in snapshot status to 
master */ @@ -643,4 +833,226 @@ protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest } } + private void sendShardSnapshotDeletionSuccessUpdate(final Snapshot snapshot, + final Tuple shardId, + final String localNodeId) { + sendShardSnapshotDeletionUpdate(snapshot, shardId, new ShardSnapshotDeletionStatus(localNodeId, + SnapshotDeletionsInProgress.State.SUCCESS)); + } + + private void sendShardSnapshotDeletionFailureUpdate(final Snapshot snapshot, + final Tuple shardId, + final String localNodeId, + final String failure) { + sendShardSnapshotDeletionUpdate(snapshot, shardId, new ShardSnapshotDeletionStatus(localNodeId, + SnapshotDeletionsInProgress.State.FAILED, failure)); + } + + private void sendShardSnapshotDeletionUpdate(final Snapshot snapshot, + final Tuple shardId, + final ShardSnapshotDeletionStatus status) { + remoteFailedDeletionStatusRequestDeduplicator.executeOnce( + new UpdateShardSnapshotDeletionStatusRequest(snapshot, shardId, status), + new ActionListener() { + @Override + public void onResponse(Void aVoid) { + logger.trace("[{}] [{}] updated snapshot deletion state", snapshot, status); + } + + @Override + public void onFailure(Exception e) { + logger.warn( + () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot deletion state", snapshot, status), e); + } + }, + (req, reqListener) -> transportService.sendRequest(transportService.getLocalNode(), + UPDATE_SHARD_SNAPSHOT_DELETION_STATUS_ACTION_NAME, req, + new TransportResponseHandler() { + @Override + public UpdateShardSnapshotDeletionStatusResponse read(StreamInput in) throws IOException { + final UpdateShardSnapshotDeletionStatusResponse response = new UpdateShardSnapshotDeletionStatusResponse(); + response.readFrom(in); + return response; + } + + @Override + public void handleResponse(UpdateShardSnapshotDeletionStatusResponse response) { + reqListener.onResponse(null); + } + + @Override + public void handleException(TransportException exp) { + reqListener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }) + ); + } + + private static class UpdateShardSnapshotDeletionStatusRequest + extends MasterNodeRequest { + + private Snapshot snapshot; + private Tuple shardId; + private ShardSnapshotDeletionStatus status; + + UpdateShardSnapshotDeletionStatusRequest() { + } + + UpdateShardSnapshotDeletionStatusRequest(final Snapshot snapshot, + final Tuple shardId, + final ShardSnapshotDeletionStatus status) { + this.snapshot = snapshot; + this.shardId = shardId; + this.status = status; + // By default, we keep trying to post ShardSnapshot status messages to avoid ShardSnapshot processes getting stuck. 
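+ // (Long.MAX_VALUE nanos effectively disables the master-node timeout, so the update is retried until a master accepts it.)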
+ this.masterNodeTimeout = TimeValue.timeValueNanos(Long.MAX_VALUE); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + snapshot = new Snapshot(in); + shardId = new Tuple<>(ShardId.readShardId(in), in.readString()); + status = new ShardSnapshotDeletionStatus(in); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + snapshot.writeTo(out); + shardId.v1().writeTo(out); + out.writeString(shardId.v2()); + status.writeTo(out); + } + + Snapshot shardSnapshot() { + return snapshot; + } + + Tuple shardId() { + return shardId; + } + + ShardSnapshotDeletionStatus status() { + return status; + } + + @Override + public String toString() { + return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; + } + } + + private static class UpdateShardSnapshotDeletionStatusResponse extends ActionResponse { + } + + private class UpdateShardSnapshotDeletionStatusAction + extends TransportMasterNodeAction { + + UpdateShardSnapshotDeletionStatusAction(final TransportService transportService, + final ClusterService clusterService, + final ThreadPool threadPool, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver) { + super(UPDATE_SHARD_SNAPSHOT_DELETION_STATUS_ACTION_NAME, transportService, clusterService, threadPool, + actionFilters, indexNameExpressionResolver, UpdateShardSnapshotDeletionStatusRequest::new); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected UpdateShardSnapshotDeletionStatusResponse newResponse() { + return new UpdateShardSnapshotDeletionStatusResponse(); + } + + @Override + protected void masterOperation(final UpdateShardSnapshotDeletionStatusRequest request, + final ClusterState state, + final ActionListener listener) + throws Exception { + clusterService.submitStateUpdateTask( + "update ShardSnapshot deletion state", + request, + ClusterStateTaskConfig.build(Priority.NORMAL), + deletionStatusExecutor, + new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new UpdateShardSnapshotDeletionStatusResponse()); + } + }); + } + + @Override + protected ClusterBlockException checkBlock(final UpdateShardSnapshotDeletionStatusRequest request, + final ClusterState state) { + return null; + } + } + + private class UpdateShardSnapshotDeletionStatusExecutor implements + ClusterStateTaskExecutor { + + @Override + public ClusterTasksResult execute( + ClusterState currentState, List tasks) throws Exception { + final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null) { + int changedCount = 0; + final List entries = new ArrayList<>(); + for (SnapshotDeletionsInProgress.Entry entry : deletionsInProgress.getEntries()) { + ImmutableOpenMap.Builder, ShardSnapshotDeletionStatus> shards = + ImmutableOpenMap.builder(); + boolean updated = false; + + for (UpdateShardSnapshotDeletionStatusRequest updateShardSnapshotState : tasks) { + if (entry.getSnapshot().equals(updateShardSnapshotState.shardSnapshot())) { + logger.trace("[{}] Updating shard [{}] with status [{}]", 
updateShardSnapshotState.shardSnapshot(), + updateShardSnapshotState.shardId(), updateShardSnapshotState.status().state()); + if (updated == false) { + shards.putAll(entry.getShards()); + updated = true; + } + if (shards.get(updateShardSnapshotState.shardId()) != updateShardSnapshotState.status()) { + shards.put(updateShardSnapshotState.shardId(), updateShardSnapshotState.status()); + changedCount++; + } + } + } + + if (updated) { + entries.add(new SnapshotDeletionsInProgress.Entry(entry, shards.build())); + } else { + entries.add(entry); + } + } + if (changedCount > 0) { + final SnapshotDeletionsInProgress updatedShardSnapshots = new SnapshotDeletionsInProgress(entries); + return ClusterTasksResult.builder().successes(tasks).build( + ClusterState.builder(currentState) + .putCustom(SnapshotDeletionsInProgress.TYPE, updatedShardSnapshots) + .build()); + } + } + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } + } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index f6ed3eb75d859..3629b817914dc 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -26,6 +26,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -34,6 +35,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress.ShardSnapshotDeletionStatus; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.State; @@ -59,6 +61,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; @@ -72,6 +75,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -123,6 +127,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final Map>> snapshotCompletionListeners = new ConcurrentHashMap<>(); + private final Map>> deleteSnapshotCompletionListeners = new ConcurrentHashMap<>(); + // Set of snapshots that are currently being initialized by this node private final Set initializingSnapshots = Collections.synchronizedSet(new HashSet<>()); @@ -716,8 +722,15 @@ public void applyClusterState(ClusterChangedEvent event) { && (entry.state() == State.INIT || completed(entry.shards().values())) ).forEach(this::endSnapshot); } - if (newMaster) { - finalizeSnapshotDeletionFromPreviousMaster(event); + final SnapshotDeletionsInProgress deletionsInProgress = event.state().custom(SnapshotDeletionsInProgress.TYPE); 
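+ // Master-side bookkeeping for distributed deletes: reassign shard deletions owned by removed nodes,
+ // resume a non-distributed delete after a failover, and prune finished entries (cleaning up their
+ // index folders) once every shard deletion has completed.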
+ if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + if (removedNodesCleanupNeeded(deletionsInProgress, event.nodesDelta().removedNodes())) { + processSnapshotsDeletionsOnRemovedNodes(); + } + if (newMaster) { + finalizeSnapshotDeletionFromPreviousMaster(event); + } + removeFinishedSnapshotDeletionFromClusterState(event); } } } catch (Exception e) { @@ -741,7 +754,28 @@ private void finalizeSnapshotDeletionFromPreviousMaster(ClusterChangedEvent even if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster"; SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0); - deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId()); + if (entry.getShards().isEmpty()) { + // Non distributed delete, re-trigger delete snapshot + deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId()); + } + } + } + + private void removeFinishedSnapshotDeletionFromClusterState(ClusterChangedEvent event) { + if (event.localNodeMaster()) { + SnapshotDeletionsInProgress deletionsInProgress = event.state().custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + for (SnapshotDeletionsInProgress.Entry entry : deletionsInProgress.getEntries()) { + if (entry.getShards().isEmpty() == false && SnapshotDeletionsInProgress.completed(entry.getShards().values())) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + // Clean up the indices now, as the shards are cleaned up + repositoriesService.repository(entry.getSnapshot().getRepository()) + .cleanUpIndices(entry.getIndicesToCleanup()); + removeSnapshotDeletionFromClusterState(entry.getSnapshot(), null); + }); + } + } + } } } @@ -823,6 +857,50 @@ public void onFailure(String source, Exception e) { }); } + private void processSnapshotsDeletionsOnRemovedNodes() { + clusterService.submitStateUpdateTask("update snapshot delete state after node removal", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress == null) { + return currentState; + } + + Set nodes = new HashSet<>(); + currentState.getNodes().getDataNodes().keysIt().forEachRemaining(node -> nodes.add(node)); + String[] nodeArray = nodes.toArray(new String[nodes.size()]); + boolean changed = false; + List entries = new ArrayList<>(); + for (final SnapshotDeletionsInProgress.Entry snapshot : deletionsInProgress.getEntries()) { + int i = 0; + ImmutableOpenMap.Builder, ShardSnapshotDeletionStatus> builder = ImmutableOpenMap.builder(); + for (ObjectObjectCursor, ShardSnapshotDeletionStatus> shard : snapshot.getShards()) { + if (nodes.contains(shard.value.nodeId()) || shard.value.state().completed()) { + builder.put(shard.key, shard.value); + } else { + changed = true; + builder.put(shard.key, new ShardSnapshotDeletionStatus(nodeArray[i], + SnapshotDeletionsInProgress.State.INIT)); + i = (i + 1 >= nodeArray.length) ? 
0 : i + 1; + } + } + entries.add(new SnapshotDeletionsInProgress.Entry(snapshot, builder.build())); + } + if (!changed) { + return currentState; + } + return ClusterState.builder(currentState) + .putCustom(SnapshotDeletionsInProgress.TYPE, new SnapshotDeletionsInProgress(entries)) + .build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn("failed to update snapshot delete state after node removal"); + } + }); + } + private void processStartedShards() { clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() { @Override @@ -931,6 +1009,14 @@ private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsIn .anyMatch(removedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet())::contains); } + private static boolean removedNodesCleanupNeeded(SnapshotDeletionsInProgress deletionsInProgress, List removedNodes) { + // If at least one shard was running on a removed node - we need to fail it + return removedNodes.isEmpty() == false && deletionsInProgress.getEntries().stream().flatMap(snapshot -> + StreamSupport.stream(((Iterable) () -> snapshot.getShards().valuesIt()).spliterator(), false) + .filter(s -> s.state().completed() == false).map(ShardSnapshotDeletionStatus::nodeId)) + .anyMatch(removedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet())::contains); + } + /** * Returns list of indices with missing shards, and list of indices that are closed * @@ -1309,23 +1395,149 @@ public static boolean isRepositoryInUse(ClusterState clusterState, String reposi */ private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener listener, long repositoryStateId) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + if (listener != null) { + addDeleteSnapshotListener(snapshot, listener); + } try { Repository repository = repositoriesService.repository(snapshot.getRepository()); - repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId); - logger.info("snapshot [{}] deleted", snapshot); + final ClusterState state = clusterService.state(); + if (repository.isDistributedSnapshotDeletionEnabled() && + state.getNodes().getMinNodeVersion().before(Version.V_8_0_0)) { + // Repository does not support distributed delete or there are some nodes in the cluster which does + // not understand distributed delete + repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId); + logger.info("snapshot [{}] deleted", snapshot); + removeSnapshotDeletionFromClusterState(snapshot, null); + return; + } - removeSnapshotDeletionFromClusterState(snapshot, null, listener); + logger.info("All nodes have version {} or above, proceeding to distribute the snapshot delete", Version.V_8_0_0); + deleteSnapshotMetadataAndInitiateDistributedDelete(snapshot, repository, repositoryStateId); } catch (Exception ex) { - removeSnapshotDeletionFromClusterState(snapshot, ex, listener); + removeSnapshotDeletionFromClusterState(snapshot, ex); } }); } + private void deleteSnapshotMetadataAndInitiateDistributedDelete(final Snapshot snapshot, + final Repository repository, + final long repositoryStateId) { + final SnapshotId snapshotId = snapshot.getSnapshotId(); + final SnapshotInfo snapshotInfo = repository.getSnapshotInfoOrNull(snapshotId); + RepositoryData repositoryData = repository.getRepositoryData(); + // initiate snapshot delete - delete the metadata files from snapshot root level and return the new repository data + RepositoryData 
updatedRepositoryData = repository.initiateSnapshotDelete(snapshotId, snapshotInfo, repositoryStateId); + // cleanup indices that are no longer part of the repository + final Collection indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values()); + indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values()); + if (snapshotInfo != null) { + // fetches the list of shards that are part of current snapshot in deletion + final List> shards = repository.getShardsToDeleteForSnapshot( + snapshotId, snapshotInfo, repositoryData); + + if (shards.size() == 0) { + // No shards to delete in the snapshot + removeSnapshotDeletionFromClusterState(snapshot, null); + return; + } + + Set indexIds = shards.stream().map(shard -> shard.v2()).collect(Collectors.toSet()); + // delete the snapshot metadata at index level for the current snapshot in deletion + repository.deleteIndicesMetaData(snapshotInfo, indexIds); + // all snapshot shard deletions to data nodes + clusterService.submitStateUpdateTask("allocate nodes to delete shard snapshot", + new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) { + SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + if (deletionsInProgress.getEntry(snapshot) != null && + deletionsInProgress.getEntry(snapshot).getShards().isEmpty() == false) { + // Delete is already in progress + logger.info("Delete is already in progress"); + return currentState; + } + } + if(currentState.getNodes().getMinNodeVersion().before(Version.V_8_0_0)) { + // There are new nodes that joined the cluster with older version, delete snapshot in master + deleteSnapshotInMaster(snapshot, snapshotInfo, repository, shards, indicesToCleanUp); + return currentState; + } + deletionsInProgress = deletionsInProgress.withRemovedEntry(deletionsInProgress.getEntry(snapshot)); + deletionsInProgress = deletionsInProgress.withAddedEntry(createSnapshotDeletionEntry(snapshot, + snapshotInfo, repositoryStateId, shards, indicesToCleanUp, currentState)); + ClusterState state = ClusterState.builder(currentState) + .putCustom(SnapshotDeletionsInProgress.TYPE, deletionsInProgress) + .build(); + return state; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> + new ParameterizedMessage("[{}] failed to assign shard snapshot delete to data node", + snapshot), e); + removeSnapshotDeletionFromClusterState(snapshot, e); + } + + }); + } else { + removeSnapshotDeletionFromClusterState(snapshot, null); + } + } + + private void deleteSnapshotInMaster(final Snapshot snapshot, + final SnapshotInfo snapshotInfo, + final Repository repository, + final List> shards, + final Collection indicesToCleanUp) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + logger.info("There is at least one node with version older than {}, proceeding delete in master", Version.V_8_0_0); + shards.stream().forEach(shard -> { + repository.deleteShard(snapshot.getSnapshotId(), snapshotInfo.version().id, + shard.v2(), shard.v1()); + }); + repository.cleanUpIndices(new ArrayList<>(indicesToCleanUp)); + removeSnapshotDeletionFromClusterState(snapshot, null); + }); + } + + private SnapshotDeletionsInProgress.Entry createSnapshotDeletionEntry(final Snapshot snapshot, + final SnapshotInfo snapshotInfo, + final long repositoryStateId, + final List> shards, + final Collection indicesToCleanUp, + final 
+ private SnapshotDeletionsInProgress.Entry createSnapshotDeletionEntry(final Snapshot snapshot, + final SnapshotInfo snapshotInfo, + final long repositoryStateId, + final List<Tuple<ShardId, String>> shards, + final Collection<IndexId> indicesToCleanUp, + final ClusterState currentState) { + final List<String> nodes = new ArrayList<>(); + currentState.getNodes().getDataNodes().keysIt().forEachRemaining(nodes::add); + final int numberOfDataNodes = nodes.size(); + final ImmutableOpenMap.Builder<Tuple<ShardId, String>, ShardSnapshotDeletionStatus> builder = ImmutableOpenMap.builder(); + + for (int i = 0; i < shards.size(); i++) { + // assign nodes in a round-robin fashion: with four data nodes, shard 1 of index 1 goes to node 1, + // shard 2 of index 1 to node 2, shard 3 of index 1 to node 3, + // shard 1 of index 2 to node 4, shard 2 of index 2 wraps around to node 1, + // and so on + final String node = nodes.get(i % numberOfDataNodes); + logger.trace("assigning shard [{}] to node [{}]", shards.get(i), node); + builder.put(shards.get(i), new ShardSnapshotDeletionStatus(node, SnapshotDeletionsInProgress.State.INIT)); + } + + return new SnapshotDeletionsInProgress.Entry( + snapshot, + System.currentTimeMillis(), + repositoryStateId, + snapshotInfo.version().id, + new ArrayList<>(indicesToCleanUp), + builder.build() + ); + } + /** * Removes the snapshot deletion from {@link SnapshotDeletionsInProgress} in the cluster state. */ - private void removeSnapshotDeletionFromClusterState(final Snapshot snapshot, @Nullable final Exception failure, - @Nullable final ActionListener<Void> listener) { + private void removeSnapshotDeletionFromClusterState(final Snapshot snapshot, @Nullable final Exception failure) { clusterService.submitStateUpdateTask("remove snapshot deletion metadata", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -1348,19 +1560,27 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}] failed to remove snapshot deletion metadata", snapshot), e); - if (listener != null) { - listener.onFailure(e); + final List<ActionListener<Void>> listeners = deleteSnapshotCompletionListeners.get(snapshot); + if (listeners != null) { + listeners.forEach(listener -> listener.onFailure(e)); + deleteSnapshotCompletionListeners.remove(snapshot); } } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (listener != null) { - if (failure != null) { - listener.onFailure(failure); - } else { - listener.onResponse(null); - } + final List<ActionListener<Void>> listeners = deleteSnapshotCompletionListeners.get(snapshot); + if (listeners != null) { + listeners.forEach(listener -> { + if (failure != null) { + listener.onFailure(failure); + } else { + listener.onResponse(null); + } + }); + deleteSnapshotCompletionListeners.remove(snapshot); } } }); @@ -1487,6 +1707,16 @@ private void addListener(Snapshot snapshot, ActionListener<SnapshotInfo> listene snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>()).add(listener); } + /** + * Adds a listener that is notified once the given snapshot deletion completes + * + * @param snapshot Snapshot to listen for + * @param listener listener to notify when the deletion has completed or failed + */ + public void addDeleteSnapshotListener(Snapshot snapshot, ActionListener<Void> listener) { + deleteSnapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>()).add(listener); + } + @Override protected void doStart() { @@ -1506,3 +1736,4 @@ public RepositoriesService getRepositoriesService() { return repositoriesService; } } +
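The two callbacks above fan the outcome out to every registered listener and then drop the bookkeeping for that snapshot; note the fix from `listeners.remove(snapshot)`, which was a no-op `List.remove`, to removing the map entry. A minimal sketch of the notify-and-clear pattern, assuming the `deleteSnapshotCompletionListeners` map introduced in this patch (the helper name is hypothetical):

    // Sketch: notify all waiting listeners exactly once, then forget them.
    private void notifyDeleteSnapshotListeners(Snapshot snapshot, @Nullable Exception failure) {
        final List<ActionListener<Void>> listeners = deleteSnapshotCompletionListeners.remove(snapshot);
        if (listeners != null) {
            for (ActionListener<Void> listener : listeners) {
                if (failure != null) {
                    listener.onFailure(failure);
                } else {
                    listener.onResponse(null);
                }
            }
        }
    }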
diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 433cbe2fdaa6c..41504b512aab6 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -484,6 +484,8 @@ public void testSnapshotWithStuckNode() throws Exception { // completely stopping. In this case the retried delete snapshot operation on the new master can fail // with SnapshotMissingException } + // give the retried delete on the new master a moment to go through before verifying + Thread.sleep(100); logger.info("--> making sure that snapshot no longer exists"); assertThrows(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute(),
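A fixed `Thread.sleep(100)` can still race the retried delete on a slow machine; where that flakiness shows up, polling with `assertBusy` from `ESTestCase` is a common alternative (a sketch, not part of the patch):

    // Poll for up to 30 seconds instead of sleeping a fixed interval.
    assertBusy(() -> assertThrows(
        client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute(),
        SnapshotMissingException.class), 30, TimeUnit.SECONDS);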
diff --git a/server/src/test/java/org/elasticsearch/snapshots/DistributedSnapshotDeletionsIT.java b/server/src/test/java/org/elasticsearch/snapshots/DistributedSnapshotDeletionsIT.java new file mode 100644 index 0000000000000..b3e16a5ecd92b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/DistributedSnapshotDeletionsIT.java @@ -0,0 +1,387 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.snapshots.mockstore.MockRepository; +import org.elasticsearch.test.ESIntegTestCase; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) +public class DistributedSnapshotDeletionsIT extends AbstractSnapshotIntegTestCase { + + @Override + protected Collection<Class<? extends Plugin>> nodePlugins() { + return Arrays.asList(MockRepository.Plugin.class, org.elasticsearch.test.transport.MockTransportService.TestPlugin.class); + } + + public void testDeleteSnapshot() throws Exception { + logger.info("--> start 1 master and 3 data nodes"); + internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); // Start an additional data-node + internalCluster().startDataOnlyNode(); // Start an additional data-node + Client client = client(); + + String repoName = "test-repo"; + Path repo = randomRepoPath(); + logger.info("--> creating repository at {}", repo.toAbsolutePath()); + assertAcked(client.admin().cluster().preparePutRepository(repoName) + .setType("mock").setSettings(Settings.builder() + .put("location", repo) + .put("compress", false) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + int numberOfFilesBeforeSnapshot = numberOfFiles(repo); + + createIndex("test-idx-1", "test-idx-2", "test-idx-3"); + logger.info("--> indexing some data"); + indexRandom(true, + client().prepareIndex("test-idx-1", "_doc").setSource("foo", "bar"), + client().prepareIndex("test-idx-2", "_doc").setSource("foo", "bar"), + client().prepareIndex("test-idx-3", "_doc").setSource("foo", "bar")); + + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, "test-snap-1") + .setWaitForCompletion(true).setIndices("test-idx-*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + logger.info("--> delete snapshot"); + ActionFuture<AcknowledgedResponse> future = client.admin().cluster().prepareDeleteSnapshot(repoName, "test-snap-1").execute(); + future.actionGet(1, TimeUnit.MINUTES); + + logger.info("--> make sure snapshot doesn't exist"); + assertThrows(client.admin().cluster().prepareGetSnapshots(repoName).addSnapshots("test-snap-1"), SnapshotMissingException.class); + + // Subtract four files that will remain in the repository: + // (1) index-1 + // (2) index-0 (because we keep the previous version) + // (3) index.latest + // (4) incompatible-snapshots + assertThat("not all files were deleted during snapshot deletion", + numberOfFilesBeforeSnapshot, equalTo(numberOfFiles(repo) - 4)); + logger.info("--> done"); + } +
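The same four-file accounting recurs in every test in this class. As a sketch, these are the root-level blobs a blob-store repository keeps once its last snapshot is gone (names per the comment above; the exact set assumes one prior index generation is retained):

    // repo/
    //   index-0                  <- previous index-N generation (retained)
    //   index-1                  <- current, now-empty index generation
    //   index.latest             <- pointer to the newest index-N
    //   incompatible-snapshots   <- legacy marker file
    // hence numberOfFiles(repo) == numberOfFilesBeforeSnapshot + 4 after the delete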
+ public void testDeleteSnapshotWithoutData() throws Exception { + logger.info("--> start 1 master and 3 data nodes"); + internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); // Start an additional data-node + internalCluster().startDataOnlyNode(); // Start an additional data-node + Client client = client(); + + String repoName = "test-repo"; + Path repo = randomRepoPath(); + logger.info("--> creating repository at {}", repo.toAbsolutePath()); + assertAcked(client.admin().cluster().preparePutRepository(repoName) + .setType("mock").setSettings(Settings.builder() + .put("location", repo) + .put("compress", false) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + int numberOfFilesBeforeSnapshot = numberOfFiles(repo); + + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, "test-snap-1") + .setWaitForCompletion(true).setIndices("test-idx-*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0)); + + logger.info("--> delete snapshot"); + ActionFuture<AcknowledgedResponse> future = client.admin().cluster().prepareDeleteSnapshot(repoName, "test-snap-1").execute(); + future.actionGet(1, TimeUnit.MINUTES); + + logger.info("--> make sure snapshot doesn't exist"); + assertThrows(client.admin().cluster().prepareGetSnapshots(repoName).addSnapshots("test-snap-1"), SnapshotMissingException.class); + + // Subtract four files that will remain in the repository: + // (1) index-1 + // (2) index-0 (because we keep the previous version) + // (3) index.latest + // (4) incompatible-snapshots + assertThat("not all files were deleted during snapshot deletion", + numberOfFilesBeforeSnapshot, equalTo(numberOfFiles(repo) - 4)); + logger.info("--> done"); + } + + public void testDeleteSnapshotWithDataNodeOutage() throws Exception { + logger.info("--> start 1 master and 3 data nodes"); + internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); // Start an additional data-node + internalCluster().startDataOnlyNode(); // Start an additional data-node + Client client = client(); + + String repoName = "test-repo"; + Path repo = randomRepoPath(); + logger.info("--> creating repository at {}", repo.toAbsolutePath()); + assertAcked(client.admin().cluster().preparePutRepository(repoName) + .setType("mock").setSettings(Settings.builder() + .put("location", repo) + .put("compress", false) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + int numberOfFilesBeforeSnapshot = numberOfFiles(repo); + + createIndex("test-idx-1", "test-idx-2", "test-idx-3"); + logger.info("--> indexing some data"); + indexRandom(true, + client().prepareIndex("test-idx-1", "_doc").setSource("foo", "bar"), + client().prepareIndex("test-idx-2", "_doc").setSource("foo", "bar"), + client().prepareIndex("test-idx-3", "_doc").setSource("foo", "bar")); + + logger.info("--> creating snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, "test-snap-1") + .setWaitForCompletion(true).setIndices("test-idx-*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); +
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + ((MockRepository)internalCluster().getInstance(RepositoriesService.class, dataNode).repository(repoName)).blockOnDataFiles(true); + + logger.info("--> delete snapshot"); + ActionFuture<AcknowledgedResponse> future = client.admin().cluster().prepareDeleteSnapshot(repoName, "test-snap-1").execute(); + + logger.info("--> waiting for block to kick in on node [{}]", dataNode); + waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(10)); + + logger.info("--> stopping node [{}]", dataNode); + stopNode(dataNode); + + try { + future.actionGet(1, TimeUnit.MINUTES); + } catch (Exception e) { + logger.info("--> the node the client was connected to is down, sleep for 10 seconds before proceeding"); + Thread.sleep(10 * 1000); + } + + logger.info("--> make sure snapshot doesn't exist"); + assertThrows(client().admin().cluster().prepareGetSnapshots(repoName).addSnapshots("test-snap-1"), SnapshotMissingException.class); + + // Subtract four files that will remain in the repository: + // (1) index-1 + // (2) index-0 (because we keep the previous version) + // (3) index.latest + // (4) incompatible-snapshots + assertThat("not all files were deleted during snapshot deletion", + numberOfFilesBeforeSnapshot, equalTo(numberOfFiles(repo) - 4)); + logger.info("--> done"); + } + + public void testDeleteSnapshotWithMasterNodeOutage() throws Exception { + logger.info("--> start 3 master and 3 data nodes"); + internalCluster().startMasterOnlyNode(); + internalCluster().startMasterOnlyNode(); + internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); + Client client = client(); + + String repoName = "test-repo"; + Path repo = randomRepoPath(); + logger.info("--> creating repository at {}", repo.toAbsolutePath()); + assertAcked(client.admin().cluster().preparePutRepository(repoName) + .setType("mock").setSettings(Settings.builder() + .put("location", repo) + .put("compress", false) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + int numberOfFilesBeforeSnapshot = numberOfFiles(repo); + + createIndex("test-idx-1", "test-idx-2", "test-idx-3"); + logger.info("--> indexing some data"); + indexRandom(true, + client().prepareIndex("test-idx-1", "_doc").setSource("foo", "bar"), + client().prepareIndex("test-idx-2", "_doc").setSource("foo", "bar"), + client().prepareIndex("test-idx-3", "_doc").setSource("foo", "bar")); + + logger.info("--> creating snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, "test-snap-1") + .setWaitForCompletion(true).setIndices("test-idx-*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + ((MockRepository)internalCluster().getInstance(RepositoriesService.class, dataNode).repository(repoName)).blockOnDataFiles(true); + + logger.info("--> delete snapshot"); + ActionFuture<AcknowledgedResponse> future = client.admin().cluster().prepareDeleteSnapshot(repoName, "test-snap-1").execute(); + + logger.info("--> waiting for block to kick in on node [{}]", dataNode); + waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(10)); + + String masterNode = internalCluster().getMasterName(); +
logger.info("--> stopping current master [{}]", masterNode); + stopNode(masterNode); + + logger.info("--> unblocking blocked node [{}]", dataNode); + unblockNode(repoName, dataNode); + + try { + future.actionGet(1, TimeUnit.MINUTES); + } catch (Exception e) { + logger.info("--> new master may not be elected, sleep for 10 seconds"); + Thread.sleep(10 * 1000); + } + + logger.info("--> make sure snapshot doesn't exist"); + assertThrows(client().admin().cluster().prepareGetSnapshots(repoName).addSnapshots("test-snap-1"), SnapshotMissingException.class); + + // Subtract four files that will remain in the repository: + // (1) index-1 + // (2) index-0 (because we keep the previous version) and + // (3) index-latest + // (4) incompatible-snapshots + assertThat("not all files were deleted during snapshot cancellation", + numberOfFilesBeforeSnapshot, equalTo(numberOfFiles(repo) - 4)); + logger.info("--> done"); + } + + public void testDeleteSnapshotWithBothMasterAndDataNodeOutage() throws Exception { + logger.info("--> start 3 master and 3 data nodes"); + internalCluster().startMasterOnlyNode(); + internalCluster().startMasterOnlyNode(); + internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); + Client client = client(); + + String repoName = "test-repo"; + Path repo = randomRepoPath(); + logger.info("--> creating repository at {}", repo.toAbsolutePath()); + assertAcked(client.admin().cluster().preparePutRepository(repoName) + .setType("mock").setSettings(Settings.builder() + .put("location", repo) + .put("compress", false) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + int numberOfFilesBeforeSnapshot = numberOfFiles(repo); + + createIndex("test-idx-1", "test-idx-2", "test-idx-3"); + logger.info("--> indexing some data"); + indexRandom(true, + client().prepareIndex("test-idx-1", "_doc").setSource("foo", "bar"), + client().prepareIndex("test-idx-2", "_doc").setSource("foo", "bar"), + client().prepareIndex("test-idx-3", "_doc").setSource("foo", "bar")); + + logger.info("--> creating snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, "test-snap-1") + .setWaitForCompletion(true).setIndices("test-idx-*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + ((MockRepository)internalCluster().getInstance(RepositoriesService.class, dataNode).repository(repoName)).blockOnDataFiles(true); + + logger.info("--> delete snapshot"); + ActionFuture future = client.admin().cluster().prepareDeleteSnapshot(repoName,"test-snap-1").execute(); + + logger.info("--> waiting for block to kick in on node [{}]", dataNode); + waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(10)); + + String masterNode = internalCluster().getMasterName(); + logger.info("--> stopping current master [{}] and data node [{}]", masterNode, dataNode); + stopNode(masterNode); + stopNode(dataNode); + + try { + future.actionGet(1, TimeUnit.MINUTES); + } catch (Exception e) { + logger.info("--> new master may not be elected, sleep for 10 seconds"); + Thread.sleep(10 * 1000); + } + + logger.info("--> make sure snapshot doesn't exist"); + assertThrows(client().admin().cluster().prepareGetSnapshots(repoName).addSnapshots("test-snap-1"), 
SnapshotMissingException.class); + + // Subtract four files that will remain in the repository: + // (1) index-1 + // (2) index-0 (because we keep the previous version) + // (3) index.latest + // (4) incompatible-snapshots + assertThat("not all files were deleted during snapshot deletion", + numberOfFilesBeforeSnapshot, equalTo(numberOfFiles(repo) - 4)); + logger.info("--> done"); + } + + public void testDeleteSnapshotWithOneNode() throws Exception { + logger.info("--> start 1 node"); + internalCluster().startNode(Settings.builder() + .put("thread_pool.snapshot.core", 1) + .put("thread_pool.snapshot.max", 1) + .build()); + Client client = client(); + + String repoName = "test-repo"; + Path repo = randomRepoPath(); + logger.info("--> creating repository at {}", repo.toAbsolutePath()); + assertAcked(client.admin().cluster().preparePutRepository(repoName) + .setType("mock").setSettings(Settings.builder() + .put("location", repo) + .put("compress", false) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + int numberOfFilesBeforeSnapshot = numberOfFiles(repo); + + createIndex("test-idx-1", "test-idx-2", "test-idx-3"); + logger.info("--> indexing some data"); + indexRandom(true, + client().prepareIndex("test-idx-1", "_doc").setSource("foo", "bar"), + client().prepareIndex("test-idx-2", "_doc").setSource("foo", "bar"), + client().prepareIndex("test-idx-3", "_doc").setSource("foo", "bar")); + + logger.info("--> creating snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, "test-snap-1") + .setWaitForCompletion(true).setIndices("test-idx-*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + logger.info("--> delete snapshot"); + ActionFuture<AcknowledgedResponse> future = client.admin().cluster().prepareDeleteSnapshot(repoName, "test-snap-1").execute(); + future.actionGet(1, TimeUnit.MINUTES); + + logger.info("--> make sure snapshot doesn't exist"); + assertThrows(client.admin().cluster().prepareGetSnapshots(repoName).addSnapshots("test-snap-1"), SnapshotMissingException.class); + + // Subtract four files that will remain in the repository: + // (1) index-1 + // (2) index-0 (because we keep the previous version) + // (3) index.latest + // (4) incompatible-snapshots + assertThat("not all files were deleted during snapshot deletion", + numberOfFilesBeforeSnapshot, equalTo(numberOfFiles(repo) - 4)); + logger.info("--> done"); + } + +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java index 885baa883ed63..88b2a1842a2f9 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java @@ -81,7 +81,8 @@ public void testConcurrentSnapshotDeletionsNotAllowed() throws Exception { client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get(); String blockedNode = internalCluster().getMasterName(); - ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true); + ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)) +
.setBlockOnDeleteSnapFile(true); logger.info("--> start deletion of first snapshot"); ActionFuture<AcknowledgedResponse> future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute(); @@ -127,7 +128,8 @@ public void testSnapshottingWithInProgressDeletionNotAllowed() throws Exception client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setWaitForCompletion(true).get(); String blockedNode = internalCluster().getMasterName(); - ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true); + ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)) + .setBlockOnDeleteSnapFile(true); logger.info("--> start deletion of snapshot"); ActionFuture<AcknowledgedResponse> future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).execute(); logger.info("--> waiting for block to kick in on node [{}]", blockedNode); @@ -182,7 +184,8 @@ public void testRestoreWithInProgressDeletionsNotAllowed() throws Exception { client().admin().indices().prepareClose(index, index2).get(); String blockedNode = internalCluster().getMasterName(); - ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true); + ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)) + .setBlockOnDeleteSnapFile(true); logger.info("--> start deletion of snapshot"); ActionFuture<AcknowledgedResponse> future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute(); logger.info("--> waiting for block to kick in on node [{}]", blockedNode); diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 8a49324757f27..6be8cad0de733 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -110,6 +110,9 @@ public long getFailureCount() { /** Allows blocking on writing the snapshot file at the end of snapshot creation to simulate a died master node */ private volatile boolean blockAndFailOnWriteSnapFile; + /** Allows blocking on deleting the snapshot file */ + private volatile boolean blockOnDeleteSnapFile; + private volatile boolean blocked = false; public MockRepository(RepositoryMetaData metadata, Environment environment, @@ -123,6 +126,7 @@ public MockRepository(RepositoryMetaData metadata, Environment environment, blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false); blockOnInitialization = metadata.settings().getAsBoolean("block_on_init", false); blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false); + blockOnDeleteSnapFile = metadata.settings().getAsBoolean("block_on_delete_snap", false); randomPrefix = metadata.settings().get("random", "default"); waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L); logger.info("starting mock repository with random prefix {}", randomPrefix); @@ -172,6 +176,7 @@ public synchronized void unblock() { blockOnInitialization = false; blockOnWriteIndexFile = false; blockAndFailOnWriteSnapFile = false; + blockOnDeleteSnapFile = false; this.notifyAll(); } @@ -187,6 +192,10 @@ public void setBlockOnWriteIndexFile(boolean blocked) { blockOnWriteIndexFile = blocked; } + public void setBlockOnDeleteSnapFile(boolean blocked) { + blockOnDeleteSnapFile = blocked; + } + public 
boolean blocked() { return blocked; } @@ -196,7 +205,7 @@ private synchronized boolean blockExecution() { boolean wasBlocked = false; try { while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile || - blockAndFailOnWriteSnapFile) { + blockAndFailOnWriteSnapFile || blockOnDeleteSnapFile) { blocked = true; this.wait(); wasBlocked = true; @@ -320,7 +329,11 @@ public InputStream readBlob(String name) throws IOException { @Override public void deleteBlob(String blobName) throws IOException { - maybeIOExceptionOrBlock(blobName); + if (blobName.startsWith("snap-") && blockOnDeleteSnapFile) { + blockExecutionAndMaybeWait(blobName); + } else { + maybeIOExceptionOrBlock(blobName); + } super.deleteBlob(blobName); }
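For completeness, a minimal usage sketch of the new hook, mirroring the MinThreads tests above (`nodeName` and the repository and snapshot names are placeholders, not part of the patch):

    // Block the next deletion of a snap-* blob, then release it once the block is observed.
    MockRepository repository = (MockRepository) internalCluster()
        .getInstance(RepositoriesService.class, nodeName).repository("test-repo");
    repository.setBlockOnDeleteSnapFile(true);
    ActionFuture<AcknowledgedResponse> future =
        client().admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").execute();
    waitForBlock(nodeName, "test-repo", TimeValue.timeValueSeconds(10));
    repository.unblock(); // unblock() also clears blockOnDeleteSnapFile
    future.actionGet();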