diff --git a/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index f0a0fdec665d5..7088037353096 100644 --- a/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.Snapshot; import java.io.IOException; @@ -70,12 +71,12 @@ public static class Entry { private final boolean includeGlobalState; private final boolean partial; private final ImmutableOpenMap shards; - private final List indices; + private final List indices; private final ImmutableOpenMap> waitingIndices; private final long startTime; - public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, long startTime, - ImmutableOpenMap shards) { + public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, + long startTime, ImmutableOpenMap shards) { this.state = state; this.snapshot = snapshot; this.includeGlobalState = includeGlobalState; @@ -111,7 +112,7 @@ public State state() { return state; } - public List indices() { + public List indices() { return indices; } @@ -377,9 +378,9 @@ public SnapshotsInProgress readFrom(StreamInput in) throws IOException { boolean partial = in.readBoolean(); State state = State.fromValue(in.readByte()); int indices = in.readVInt(); - List indexBuilder = new ArrayList<>(); + List indexBuilder = new ArrayList<>(); for (int j = 0; j < indices; j++) { - indexBuilder.add(in.readString()); + indexBuilder.add(new IndexId(in.readString(), in.readString())); } long startTime = in.readLong(); ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); @@ -410,8 +411,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(entry.partial()); out.writeByte(entry.state().value()); out.writeVInt(entry.indices().size()); - for (String index : entry.indices()) { - out.writeString(index); + for (IndexId index : entry.indices()) { + index.writeTo(out); } out.writeLong(entry.startTime()); out.writeVInt(entry.shards().size()); @@ -458,8 +459,8 @@ public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params p builder.field(STATE, entry.state()); builder.startArray(INDICES); { - for (String index : entry.indices()) { - builder.value(index); + for (IndexId index : entry.indices()) { + index.toXContent(builder, params); } } builder.endArray(); diff --git a/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 024a882a7de62..4229ee954d402 100644 --- a/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -23,7 +23,7 @@ import java.io.IOException; import java.io.InputStream; -import java.util.Collection; +import java.nio.file.NoSuchFileException; import java.util.Map; /** @@ -53,7 +53,8 @@ public interface BlobContainer { * @param blobName * The name of the blob to get an {@link InputStream} for. * @return The {@code InputStream} to read the blob. - * @throws IOException if the blob does not exist or can not be read. + * @throws NoSuchFileException if the blob does not exist + * @throws IOException if the blob can not be read. */ InputStream readBlob(String blobName) throws IOException; @@ -95,7 +96,8 @@ public interface BlobContainer { * * @param blobName * The name of the blob to delete. - * @throws IOException if the blob does not exist, or if the blob exists but could not be deleted. + * @throws NoSuchFileException if the blob does not exist + * @throws IOException if the blob exists but could not be deleted. */ void deleteBlob(String blobName) throws IOException; diff --git a/core/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/core/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index 67ce298becdc1..02a5aa357dfa6 100644 --- a/core/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/core/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -27,13 +27,16 @@ import org.elasticsearch.common.io.Streams; import java.io.BufferedInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.file.DirectoryStream; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; import java.nio.file.attribute.BasicFileAttributes; import java.util.HashMap; import java.util.Map; @@ -95,14 +98,18 @@ public boolean blobExists(String blobName) { @Override public InputStream readBlob(String name) throws IOException { - return new BufferedInputStream(Files.newInputStream(path.resolve(name)), blobStore.bufferSizeInBytes()); + final Path resolvedPath = path.resolve(name); + try { + return new BufferedInputStream(Files.newInputStream(resolvedPath), blobStore.bufferSizeInBytes()); + } catch (FileNotFoundException fnfe) { + throw new NoSuchFileException("[" + name + "] blob not found"); + } } @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { final Path file = path.resolve(blobName); - // TODO: why is this not specifying CREATE_NEW? Do we really need to be able to truncate existing files? - try (OutputStream outputStream = Files.newOutputStream(file)) { + try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) { Streams.copy(inputStream, outputStream, new byte[blobStore.bufferSizeInBytes()]); } IOUtils.fsync(file, false); diff --git a/core/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java b/core/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java index 60be21127bfe0..1c4652c9f1095 100644 --- a/core/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java +++ b/core/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java @@ -20,14 +20,11 @@ package org.elasticsearch.common.blobstore.support; import org.elasticsearch.common.blobstore.BlobContainer; -import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.bytes.BytesReference; import java.io.IOException; import java.io.InputStream; -import java.util.Collection; -import java.util.Map; /** * A base abstract blob container that implements higher level container methods. diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index ff2d1d298a532..e08130d9d8d39 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import java.io.IOException; @@ -394,10 +395,12 @@ private void restore(final IndexShard indexShard, final Repository repository) { translogState.totalOperationsOnStart(0); indexShard.prepareForIndexRecovery(); ShardId snapshotShardId = shardId; - if (!shardId.getIndexName().equals(restoreSource.index())) { - snapshotShardId = new ShardId(restoreSource.index(), IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id()); + final String indexName = restoreSource.index(); + if (!shardId.getIndexName().equals(indexName)) { + snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id()); } - repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), snapshotShardId, indexShard.recoveryState()); + final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName); + repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState()); indexShard.skipTranslogRecovery(); indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); diff --git a/core/src/main/java/org/elasticsearch/repositories/IndexId.java b/core/src/main/java/org/elasticsearch/repositories/IndexId.java new file mode 100644 index 0000000000000..434582e61edb9 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/repositories/IndexId.java @@ -0,0 +1,110 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; + +import java.io.IOException; +import java.util.Objects; + +/** + * Represents a single snapshotted index in the repository. + */ +public final class IndexId implements Writeable, ToXContent { + protected static final String NAME = "name"; + protected static final String ID = "id"; + + private final String name; + private final String id; + + public IndexId(final String name, final String id) { + this.name = name; + this.id = id; + } + + public IndexId(final StreamInput in) throws IOException { + this.name = in.readString(); + this.id = in.readString(); + } + + /** + * The name of the index. + */ + public String getName() { + return name; + } + + /** + * The unique ID for the index within the repository. This is *not* the same as the + * index's UUID, but merely a unique file/URL friendly identifier that a repository can + * use to name blobs for the index. + * + * We could not use the index's actual UUID (See {@link Index#getUUID()}) because in the + * case of snapshot/restore, the index UUID in the snapshotted index will be different + * from the index UUID assigned to it when it is restored. Hence, the actual index UUID + * is not useful in the context of snapshot/restore for tying a snapshotted index to the + * index it was snapshot from, and so we are using a separate UUID here. + */ + public String getId() { + return id; + } + + @Override + public String toString() { + return "[" + name + "/" + id + "]"; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + @SuppressWarnings("unchecked") IndexId that = (IndexId) o; + return Objects.equals(name, that.name) && Objects.equals(id, that.id); + } + + @Override + public int hashCode() { + return Objects.hash(name, id); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeString(name); + out.writeString(id); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field(NAME, name); + builder.field(ID, id); + builder.endObject(); + return builder; + } +} diff --git a/core/src/main/java/org/elasticsearch/repositories/Repository.java b/core/src/main/java/org/elasticsearch/repositories/Repository.java index 11a060d73e817..544f757737ca8 100644 --- a/core/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/core/src/main/java/org/elasticsearch/repositories/Repository.java @@ -47,7 +47,7 @@ *
    *
  • Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)} * with list of indices that will be included into the snapshot
  • - *
  • Data nodes call {@link Repository#snapshotShard(IndexShard, SnapshotId, IndexCommit, IndexShardSnapshotStatus)} + *
  • Data nodes call {@link Repository#snapshotShard(IndexShard, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)} * for each shard
  • *
  • When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures
  • *
@@ -88,15 +88,14 @@ interface Factory { * @param indices list of indices * @return information about snapshot */ - MetaData getSnapshotMetaData(SnapshotInfo snapshot, List indices) throws IOException; + MetaData getSnapshotMetaData(SnapshotInfo snapshot, List indices) throws IOException; /** - * Returns the list of snapshots currently stored in the repository that match the given predicate on the snapshot name. - * To get all snapshots, the predicate filter should return true regardless of the input. - * - * @return snapshot list + * Returns a {@link RepositoryData} to describe the data in the repository, including the snapshots + * and the indices across all snapshots found in the repository. Throws a {@link RepositoryException} + * if there was an error in reading the data. */ - List getSnapshots(); + RepositoryData getRepositoryData(); /** * Starts snapshotting process @@ -105,7 +104,7 @@ interface Factory { * @param indices list of indices to be snapshotted * @param metaData cluster metadata */ - void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData); + void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData); /** * Finalizes snapshotting process @@ -113,12 +112,14 @@ interface Factory { * This method is called on master after all shards are snapshotted. * * @param snapshotId snapshot id + * @param indices list of indices in the snapshot + * @param startTime start time of the snapshot * @param failure global failure reason or null * @param totalShards total number of shards * @param shardFailures list of shard failures * @return snapshot description */ - SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures); + SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures); /** * Deletes snapshot @@ -181,10 +182,11 @@ interface Factory { * * @param shard shard to be snapshotted * @param snapshotId snapshot id + * @param indexId id for the index being snapshotted * @param snapshotIndexCommit commit point * @param snapshotStatus snapshot status */ - void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus); + void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus); /** * Restores snapshot of the shard. @@ -194,20 +196,22 @@ interface Factory { * @param shard the shard to restore the index into * @param snapshotId snapshot id * @param version version of elasticsearch that created this snapshot + * @param indexId id of the index in the repository from which the restore is occurring * @param snapshotShardId shard id (in the snapshot) * @param recoveryState recovery state */ - void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState); + void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState); /** * Retrieve shard snapshot status for the stored snapshot * * @param snapshotId snapshot id * @param version version of elasticsearch that created this snapshot + * @param indexId the snapshotted index id for the shard to get status for * @param shardId shard id * @return snapshot status */ - IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId); + IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId); } diff --git a/core/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/core/src/main/java/org/elasticsearch/repositories/RepositoryData.java new file mode 100644 index 0000000000000..4927e2b41b7f7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -0,0 +1,311 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.snapshots.SnapshotId; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * A class that represents the data in a repository, as captured in the + * repository's index blob. + */ +public final class RepositoryData implements ToXContent { + + public static final RepositoryData EMPTY = new RepositoryData(Collections.emptyList(), Collections.emptyMap()); + + /** + * The ids of the snapshots in the repository. + */ + private final List snapshotIds; + /** + * The indices found in the repository across all snapshots, as a name to {@link IndexId} mapping + */ + private final Map indices; + /** + * The snapshots that each index belongs to. + */ + private final Map> indexSnapshots; + + public RepositoryData(List snapshotIds, Map> indexSnapshots) { + this.snapshotIds = Collections.unmodifiableList(snapshotIds); + this.indices = Collections.unmodifiableMap(indexSnapshots.keySet() + .stream() + .collect(Collectors.toMap(IndexId::getName, Function.identity()))); + this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots); + } + + protected RepositoryData copy() { + return new RepositoryData(snapshotIds, indexSnapshots); + } + + /** + * Returns an unmodifiable list of the snapshot ids. + */ + public List getSnapshotIds() { + return snapshotIds; + } + + /** + * Returns an unmodifiable map of the index names to {@link IndexId} in the repository. + */ + public Map getIndices() { + return indices; + } + + /** + * Add a snapshot and its indices to the repository; returns a new instance. If the snapshot + * already exists in the repository data, this method throws an IllegalArgumentException. + */ + public RepositoryData addSnapshot(final SnapshotId snapshotId, final List snapshottedIndices) { + if (snapshotIds.contains(snapshotId)) { + throw new IllegalArgumentException("[" + snapshotId + "] already exists in the repository data"); + } + List snapshots = new ArrayList<>(snapshotIds); + snapshots.add(snapshotId); + Map> allIndexSnapshots = new HashMap<>(indexSnapshots); + for (final IndexId indexId : snapshottedIndices) { + if (allIndexSnapshots.containsKey(indexId)) { + Set ids = allIndexSnapshots.get(indexId); + if (ids == null) { + ids = new LinkedHashSet<>(); + allIndexSnapshots.put(indexId, ids); + } + ids.add(snapshotId); + } else { + Set ids = new LinkedHashSet<>(); + ids.add(snapshotId); + allIndexSnapshots.put(indexId, ids); + } + } + return new RepositoryData(snapshots, allIndexSnapshots); + } + + /** + * Initializes the indices in the repository metadata; returns a new instance. + */ + public RepositoryData initIndices(final Map> indexSnapshots) { + return new RepositoryData(snapshotIds, indexSnapshots); + } + + /** + * Remove a snapshot and remove any indices that no longer exist in the repository due to the deletion of the snapshot. + */ + public RepositoryData removeSnapshot(final SnapshotId snapshotId) { + List newSnapshotIds = snapshotIds + .stream() + .filter(id -> snapshotId.equals(id) == false) + .collect(Collectors.toList()); + Map> indexSnapshots = new HashMap<>(); + for (final IndexId indexId : indices.values()) { + Set set; + Set snapshotIds = this.indexSnapshots.get(indexId); + assert snapshotIds != null; + if (snapshotIds.contains(snapshotId)) { + if (snapshotIds.size() == 1) { + // removing the snapshot will mean no more snapshots have this index, so just skip over it + continue; + } + set = new LinkedHashSet<>(snapshotIds); + set.remove(snapshotId); + } else { + set = snapshotIds; + } + indexSnapshots.put(indexId, set); + } + + return new RepositoryData(newSnapshotIds, indexSnapshots); + } + + /** + * Returns an immutable collection of the snapshot ids for the snapshots that contain the given index. + */ + public Set getSnapshots(final IndexId indexId) { + Set snapshotIds = indexSnapshots.get(indexId); + if (snapshotIds == null) { + throw new IllegalArgumentException("unknown snapshot index " + indexId + ""); + } + return snapshotIds; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + @SuppressWarnings("unchecked") RepositoryData that = (RepositoryData) obj; + return snapshotIds.equals(that.snapshotIds) + && indices.equals(that.indices) + && indexSnapshots.equals(that.indexSnapshots); + } + + @Override + public int hashCode() { + return Objects.hash(snapshotIds, indices, indexSnapshots); + } + + /** + * Resolve the index name to the index id specific to the repository, + * throwing an exception if the index could not be resolved. + */ + public IndexId resolveIndexId(final String indexName) { + if (indices.containsKey(indexName)) { + return indices.get(indexName); + } else { + // on repositories created before 5.0, there was no indices information in the index + // blob, so if the repository hasn't been updated with new snapshots, no new index blob + // would have been written, so we only have old snapshots without the index information. + // in this case, the index id is just the index name + return new IndexId(indexName, indexName); + } + } + + /** + * Resolve the given index names to index ids. + */ + public List resolveIndices(final List indices) { + List resolvedIndices = new ArrayList<>(indices.size()); + for (final String indexName : indices) { + resolvedIndices.add(resolveIndexId(indexName)); + } + return resolvedIndices; + } + + /** + * Resolve the given index names to index ids, creating new index ids for + * new indices in the repository. + */ + public List resolveNewIndices(final List indicesToResolve) { + List snapshotIndices = new ArrayList<>(); + for (String index : indicesToResolve) { + final IndexId indexId; + if (indices.containsKey(index)) { + indexId = indices.get(index); + } else { + indexId = new IndexId(index, UUIDs.randomBase64UUID()); + } + snapshotIndices.add(indexId); + } + return snapshotIndices; + } + + private static final String SNAPSHOTS = "snapshots"; + private static final String INDICES = "indices"; + private static final String INDEX_ID = "id"; + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + // write the snapshots list + builder.startArray(SNAPSHOTS); + for (final SnapshotId snapshot : getSnapshotIds()) { + snapshot.toXContent(builder, params); + } + builder.endArray(); + // write the indices map + builder.startObject(INDICES); + for (final IndexId indexId : getIndices().values()) { + builder.startObject(indexId.getName()); + builder.field(INDEX_ID, indexId.getId()); + builder.startArray(SNAPSHOTS); + Set snapshotIds = indexSnapshots.get(indexId); + assert snapshotIds != null; + for (final SnapshotId snapshotId : snapshotIds) { + snapshotId.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + } + builder.endObject(); + builder.endObject(); + return builder; + } + + public static RepositoryData fromXContent(final XContentParser parser) throws IOException { + List snapshots = new ArrayList<>(); + Map> indexSnapshots = new HashMap<>(); + if (parser.nextToken() == XContentParser.Token.START_OBJECT) { + while (parser.nextToken() == XContentParser.Token.FIELD_NAME) { + String currentFieldName = parser.currentName(); + if (SNAPSHOTS.equals(currentFieldName)) { + if (parser.nextToken() == XContentParser.Token.START_ARRAY) { + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + snapshots.add(SnapshotId.fromXContent(parser)); + } + } else { + throw new ElasticsearchParseException("expected array for [" + currentFieldName + "]"); + } + } else if (INDICES.equals(currentFieldName)) { + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new ElasticsearchParseException("start object expected [indices]"); + } + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String indexName = parser.currentName(); + String indexId = null; + Set snapshotIds = new LinkedHashSet<>(); + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new ElasticsearchParseException("start object expected index[" + indexName + "]"); + } + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String indexMetaFieldName = parser.currentName(); + parser.nextToken(); + if (INDEX_ID.equals(indexMetaFieldName)) { + indexId = parser.text(); + } else if (SNAPSHOTS.equals(indexMetaFieldName)) { + if (parser.currentToken() != XContentParser.Token.START_ARRAY) { + throw new ElasticsearchParseException("start array expected [snapshots]"); + } + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + snapshotIds.add(SnapshotId.fromXContent(parser)); + } + } + } + assert indexId != null; + indexSnapshots.put(new IndexId(indexName, indexId), snapshotIds); + } + } else { + throw new ElasticsearchParseException("unknown field name [" + currentFieldName + "]"); + } + } + } else { + throw new ElasticsearchParseException("start object expected"); + } + return new RepositoryData(snapshots, indexSnapshots); + } + +} diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index df2d706e8d9cc..fe11a502c4290 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -45,6 +45,8 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.iterable.Iterables; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.snapshots.IndexShardSnapshotException; @@ -58,6 +60,8 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.Strings; @@ -103,6 +107,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static java.util.Collections.emptyMap; @@ -119,14 +124,14 @@ * {@code * STORE_ROOT * |- index-N - list of all snapshot name as JSON array, N is the generation of the file - * |- index-latest - contains the numeric value of the latest generation of the index file (i.e. N from above) - * |- snapshot-20131010 - JSON serialized Snapshot for snapshot "20131010" + * |- index.latest - contains the numeric value of the latest generation of the index file (i.e. N from above) + * |- snap-20131010 - JSON serialized Snapshot for snapshot "20131010" * |- meta-20131010.dat - JSON serialized MetaData for snapshot "20131010" (includes only global metadata) - * |- snapshot-20131011 - JSON serialized Snapshot for snapshot "20131011" + * |- snap-20131011 - JSON serialized Snapshot for snapshot "20131011" * |- meta-20131011.dat - JSON serialized MetaData for snapshot "20131011" * ..... * |- indices/ - data for all indices - * |- foo/ - data for index "foo" + * |- Ac1342-B_x/ - data for index "foo" which was assigned the unique id of Ac1342-B_x in the repository * | |- meta-20131010.dat - JSON Serialized IndexMetaData for index "foo" * | |- 0/ - data for shard "0" of index "foo" * | | |- __1 \ @@ -146,7 +151,7 @@ * | |-2/ * | ...... * | - * |- bar/ - data for index bar + * |- 1xB0D8_B3y/ - data for index "bar" which was assigned the unique id of 1xB0D8_B3y in the repository * ...... * } * @@ -163,13 +168,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private static final String SNAPSHOT_PREFIX = "snap-"; - protected static final String SNAPSHOT_CODEC = "snapshot"; + private static final String SNAPSHOT_CODEC = "snapshot"; static final String SNAPSHOTS_FILE = "index"; // package private for unit testing - private static final String SNAPSHOTS_FILE_PREFIX = "index-"; + private static final String INDEX_FILE_PREFIX = "index-"; - private static final String SNAPSHOTS_INDEX_LATEST_BLOB = "index.latest"; + private static final String INDEX_LATEST_BLOB = "index.latest"; private static final String TESTS_FILE = "tests-"; @@ -305,7 +310,7 @@ public RepositoryMetaData getMetadata() { } @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetadata) { + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetaData) { if (isReadOnly()) { throw new RepositoryException(metadata.name(), "cannot create snapshot in a readonly repository"); } @@ -315,28 +320,69 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Meta if (getSnapshots().stream().anyMatch(s -> s.getName().equals(snapshotName))) { throw new SnapshotCreationException(metadata.name(), snapshotId, "snapshot with the same name already exists"); } - if (snapshotFormat.exists(snapshotsBlobContainer, blobId(snapshotId)) || + if (snapshotFormat.exists(snapshotsBlobContainer, snapshotId.getUUID()) || snapshotLegacyFormat.exists(snapshotsBlobContainer, snapshotName)) { throw new SnapshotCreationException(metadata.name(), snapshotId, "snapshot with such name already exists"); } + // Write Global MetaData - globalMetaDataFormat.write(clusterMetadata, snapshotsBlobContainer, snapshotName); - for (String index : indices) { - final IndexMetaData indexMetaData = clusterMetadata.index(index); - final BlobPath indexPath = basePath().add("indices").add(index); + globalMetaDataFormat.write(clusterMetaData, snapshotsBlobContainer, snapshotId.getUUID()); + + // write the index metadata for each index in the snapshot + for (IndexId index : indices) { + final IndexMetaData indexMetaData = clusterMetaData.index(index.getName()); + final BlobPath indexPath = basePath().add("indices").add(index.getId()); final BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath); - indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, snapshotName); + indexMetaDataFormat.write(indexMetaData, indexMetaDataBlobContainer, snapshotId.getUUID()); } } catch (IOException ex) { throw new SnapshotCreationException(metadata.name(), snapshotId, ex); } } + // Older repository index files (index-N) only contain snapshot info, not indices info, + // so if the repository data is of the older format, populate it with the indices entries + // so we know which indices of snapshots have blob ids in the older format. + private RepositoryData upgradeRepositoryData(final RepositoryData repositoryData) throws IOException { + final Map> indexToSnapshots = new HashMap<>(); + for (final SnapshotId snapshotId : repositoryData.getSnapshotIds()) { + final SnapshotInfo snapshotInfo; + try { + snapshotInfo = getSnapshotInfo(snapshotId); + } catch (SnapshotException e) { + logger.warn("[{}] repository is on a pre-5.0 format with an index file that contains snapshot [{}] but " + + "the corresponding snap-{}.dat file cannot be read. The snapshot will no longer be included in " + + "the repository but its data directories will remain.", e, getMetadata().name(), + snapshotId, snapshotId.getUUID()); + continue; + } + for (final String indexName : snapshotInfo.indices()) { + final IndexId indexId = new IndexId(indexName, indexName); + if (indexToSnapshots.containsKey(indexId)) { + indexToSnapshots.get(indexId).add(snapshotId); + } else { + indexToSnapshots.put(indexId, Sets.newHashSet(snapshotId)); + } + } + } + try { + final RepositoryData updatedRepoData = repositoryData.initIndices(indexToSnapshots); + if (isReadOnly() == false) { + // write the new index gen file with the indices included + writeIndexGen(updatedRepoData); + } + return updatedRepoData; + } catch (IOException e) { + throw new RepositoryException(metadata.name(), "failed to update the repository index blob with indices data on startup", e); + } + } + @Override public void deleteSnapshot(SnapshotId snapshotId) { if (isReadOnly()) { throw new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"); } + final RepositoryData repositoryData = getRepositoryData(); List indices = Collections.emptyList(); SnapshotInfo snapshot = null; try { @@ -350,64 +396,29 @@ public void deleteSnapshot(SnapshotId snapshotId) { MetaData metaData = null; try { if (snapshot != null) { - metaData = readSnapshotMetaData(snapshotId, snapshot.version(), indices, true); + metaData = readSnapshotMetaData(snapshotId, snapshot.version(), repositoryData.resolveIndices(indices), true); } else { - metaData = readSnapshotMetaData(snapshotId, null, indices, true); + metaData = readSnapshotMetaData(snapshotId, null, repositoryData.resolveIndices(indices), true); } } catch (IOException | SnapshotException ex) { logger.warn("cannot read metadata for snapshot [{}]", ex, snapshotId); } try { - final String snapshotName = snapshotId.getName(); - // Delete snapshot file first so we wouldn't end up with partially deleted snapshot that looks OK - if (snapshot != null) { - try { - snapshotFormat(snapshot.version()).delete(snapshotsBlobContainer, blobId(snapshotId)); - } catch (IOException e) { - logger.debug("snapshotFormat failed to delete snapshot [{}]", snapshotId); - } - - try { - globalMetaDataFormat(snapshot.version()).delete(snapshotsBlobContainer, snapshotName); - } catch (IOException e) { - logger.debug("gloalMetaDataFormat failed to delete snapshot [{}]"); - } - } else { - // We don't know which version was the snapshot created with - try deleting both current and legacy formats - try { - snapshotFormat.delete(snapshotsBlobContainer, blobId(snapshotId)); - } catch (IOException e) { - logger.debug("snapshotFormat failed to delete snapshot [{}]"); - } + // Delete snapshot from the index file, since it is the maintainer of truth of active snapshots + writeIndexGen(repositoryData.removeSnapshot(snapshotId)); - try { - snapshotLegacyFormat.delete(snapshotsBlobContainer, snapshotName); - } catch (IOException e) { - logger.debug("snapshotLegacyFormat failed to delete snapshot [{}]"); - } - - try { - globalMetaDataLegacyFormat.delete(snapshotsBlobContainer, snapshotName); - } catch (IOException e) { - logger.debug("globalMetaDataLegacyFormat failed to delete snapshot [{}]"); - } - - try { - globalMetaDataFormat.delete(snapshotsBlobContainer, snapshotName); - } catch (IOException e) { - logger.debug("globalMetaDataFormat failed to delete snapshot [{}]"); - } - } - // Delete snapshot from the snapshot list - List snapshotIds = getSnapshots().stream().filter(id -> snapshotId.equals(id) == false).collect(Collectors.toList()); - writeSnapshotsToIndexGen(snapshotIds); + // delete the snapshot file + safeSnapshotBlobDelete(snapshot, snapshotId.getUUID()); + // delete the global metadata file + safeGlobalMetaDataBlobDelete(snapshot, snapshotId.getUUID()); // Now delete all indices for (String index : indices) { - BlobPath indexPath = basePath().add("indices").add(index); + final IndexId indexId = repositoryData.resolveIndexId(index); + BlobPath indexPath = basePath().add("indices").add(indexId.getId()); BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath); try { - indexMetaDataFormat(snapshot.version()).delete(indexMetaDataBlobContainer, snapshotId.getName()); + indexMetaDataFormat(snapshot.version()).delete(indexMetaDataBlobContainer, snapshotId.getUUID()); } catch (IOException ex) { logger.warn("[{}] failed to delete metadata for index [{}]", ex, snapshotId, index); } @@ -416,7 +427,7 @@ public void deleteSnapshot(SnapshotId snapshotId) { if (indexMetaData != null) { for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { try { - delete(snapshotId, snapshot.version(), new ShardId(indexMetaData.getIndex(), shardId)); + delete(snapshotId, snapshot.version(), indexId, new ShardId(indexMetaData.getIndex(), shardId)); } catch (SnapshotException ex) { logger.warn("[{}] failed to delete shard data for shard [{}][{}]", ex, snapshotId, index, shardId); } @@ -429,28 +440,77 @@ public void deleteSnapshot(SnapshotId snapshotId) { } } + private void safeSnapshotBlobDelete(final SnapshotInfo snapshotInfo, final String blobId) { + if (snapshotInfo != null) { + // we know the version the snapshot was created with + try { + snapshotFormat(snapshotInfo.version()).delete(snapshotsBlobContainer, blobId); + } catch (IOException e) { + logger.warn("[{}] Unable to delete snapshot file [{}]", e, snapshotInfo.snapshotId(), blobId); + } + } else { + // we don't know the version, first try the current format, then the legacy format + try { + snapshotFormat.delete(snapshotsBlobContainer, blobId); + } catch (IOException e) { + // now try legacy format + try { + snapshotLegacyFormat.delete(snapshotsBlobContainer, blobId); + } catch (IOException e2) { + // neither snapshot file could be deleted, log the error + logger.warn("Unable to delete snapshot file [{}]", e, blobId); + } + } + } + } + + private void safeGlobalMetaDataBlobDelete(final SnapshotInfo snapshotInfo, final String blobId) { + if (snapshotInfo != null) { + // we know the version the snapshot was created with + try { + globalMetaDataFormat(snapshotInfo.version()).delete(snapshotsBlobContainer, blobId); + } catch (IOException e) { + logger.warn("[{}] Unable to delete global metadata file [{}]", e, snapshotInfo.snapshotId(), blobId); + } + } else { + // we don't know the version, first try the current format, then the legacy format + try { + globalMetaDataFormat.delete(snapshotsBlobContainer, blobId); + } catch (IOException e) { + // now try legacy format + try { + globalMetaDataLegacyFormat.delete(snapshotsBlobContainer, blobId); + } catch (IOException e2) { + // neither global metadata file could be deleted, log the error + logger.warn("Unable to delete global metadata file [{}]", e, blobId); + } + } + } + } + + /** + * {@inheritDoc} + */ @Override public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId, - final List indices, + final List indices, final long startTime, final String failure, final int totalShards, final List shardFailures) { try { SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId, - indices, + indices.stream().map(IndexId::getName).collect(Collectors.toList()), startTime, failure, System.currentTimeMillis(), totalShards, shardFailures); - snapshotFormat.write(blobStoreSnapshot, snapshotsBlobContainer, blobId(snapshotId)); - List snapshotIds = getSnapshots(); + snapshotFormat.write(blobStoreSnapshot, snapshotsBlobContainer, snapshotId.getUUID()); + final RepositoryData repositoryData = getRepositoryData(); + List snapshotIds = repositoryData.getSnapshotIds(); if (!snapshotIds.contains(snapshotId)) { - snapshotIds = new ArrayList<>(snapshotIds); - snapshotIds.add(snapshotId); - snapshotIds = Collections.unmodifiableList(snapshotIds); - writeSnapshotsToIndexGen(snapshotIds); + writeIndexGen(repositoryData.addSnapshot(snapshotId, indices)); } return blobStoreSnapshot; } catch (IOException ex) { @@ -458,27 +518,19 @@ public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId, } } - @Override public List getSnapshots() { - try { - return Collections.unmodifiableList(readSnapshotsFromIndex()); - } catch (NoSuchFileException | FileNotFoundException e) { - // its a fresh repository, no index file exists, so return an empty list - return Collections.emptyList(); - } catch (IOException ioe) { - throw new RepositoryException(metadata.name(), "failed to list snapshots in repository", ioe); - } + return getRepositoryData().getSnapshotIds(); } @Override - public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List indices) throws IOException { + public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List indices) throws IOException { return readSnapshotMetaData(snapshot.snapshotId(), snapshot.version(), indices, false); } @Override public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) { try { - return snapshotFormat.read(snapshotsBlobContainer, blobId(snapshotId)); + return snapshotFormat.read(snapshotsBlobContainer, snapshotId.getUUID()); } catch (FileNotFoundException | NoSuchFileException ex) { // File is missing - let's try legacy format instead try { @@ -493,13 +545,13 @@ public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) { } } - private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVersion, List indices, boolean ignoreIndexErrors) throws IOException { + private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVersion, List indices, boolean ignoreIndexErrors) throws IOException { MetaData metaData; if (snapshotVersion == null) { // When we delete corrupted snapshots we might not know which version we are dealing with // We can try detecting the version based on the metadata file format assert ignoreIndexErrors; - if (globalMetaDataFormat.exists(snapshotsBlobContainer, snapshotId.getName())) { + if (globalMetaDataFormat.exists(snapshotsBlobContainer, snapshotId.getUUID())) { snapshotVersion = Version.CURRENT; } else if (globalMetaDataLegacyFormat.exists(snapshotsBlobContainer, snapshotId.getName())) { throw new SnapshotException(metadata.name(), snapshotId, "snapshot is too old"); @@ -508,21 +560,21 @@ private MetaData readSnapshotMetaData(SnapshotId snapshotId, Version snapshotVer } } try { - metaData = globalMetaDataFormat(snapshotVersion).read(snapshotsBlobContainer, snapshotId.getName()); + metaData = globalMetaDataFormat(snapshotVersion).read(snapshotsBlobContainer, snapshotId.getUUID()); } catch (FileNotFoundException | NoSuchFileException ex) { throw new SnapshotMissingException(metadata.name(), snapshotId, ex); } catch (IOException ex) { throw new SnapshotException(metadata.name(), snapshotId, "failed to get snapshots", ex); } MetaData.Builder metaDataBuilder = MetaData.builder(metaData); - for (String index : indices) { - BlobPath indexPath = basePath().add("indices").add(index); + for (IndexId index : indices) { + BlobPath indexPath = basePath().add("indices").add(index.getId()); BlobContainer indexMetaDataBlobContainer = blobStore().blobContainer(indexPath); try { - metaDataBuilder.put(indexMetaDataFormat(snapshotVersion).read(indexMetaDataBlobContainer, snapshotId.getName()), false); + metaDataBuilder.put(indexMetaDataFormat(snapshotVersion).read(indexMetaDataBlobContainer, snapshotId.getUUID()), false); } catch (ElasticsearchParseException | IOException ex) { if (ignoreIndexErrors) { - logger.warn("[{}] [{}] failed to read metadata for index", ex, snapshotId, index); + logger.warn("[{}] [{}] failed to read metadata for index", ex, snapshotId, index.getName()); } else { throw ex; } @@ -590,10 +642,6 @@ private BlobStoreFormat indexMetaDataFormat(Version version) { } } - private static final String SNAPSHOTS = "snapshots"; - private static final String NAME = "name"; - private static final String UUID = "uuid"; - @Override public long getSnapshotThrottleTimeInNanos() { return snapshotRateLimitingTimeInNanos.count(); @@ -637,6 +685,43 @@ public void endVerification(String seed) { } } + @Override + public RepositoryData getRepositoryData() { + try { + final long indexGen = latestIndexBlobId(); + final String snapshotsIndexBlobName; + final boolean legacyFormat; + if (indexGen == -1) { + // index-N file doesn't exist, either its a fresh repository, or its in the + // old format, so look for the older index file before returning an empty list + snapshotsIndexBlobName = SNAPSHOTS_FILE; + legacyFormat = true; + } else { + snapshotsIndexBlobName = INDEX_FILE_PREFIX + Long.toString(indexGen); + legacyFormat = false; + } + + RepositoryData repositoryData; + try (InputStream blob = snapshotsBlobContainer.readBlob(snapshotsIndexBlobName)) { + BytesStreamOutput out = new BytesStreamOutput(); + Streams.copy(blob, out); + try (XContentParser parser = XContentHelper.createParser(out.bytes())) { + repositoryData = RepositoryData.fromXContent(parser); + } + } + if (legacyFormat) { + // pre 5.0 repository data needs to be updated to include the indices + repositoryData = upgradeRepositoryData(repositoryData); + } + return repositoryData; + } catch (NoSuchFileException nsfe) { + // repository doesn't have an index blob, its a new blank repo + return RepositoryData.EMPTY; + } catch (IOException ioe) { + throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe); + } + } + public static String testBlobPrefix(String seed) { return TESTS_FILE + seed; } @@ -651,35 +736,30 @@ BlobContainer blobContainer() { return snapshotsBlobContainer; } - protected void writeSnapshotsToIndexGen(final List snapshots) throws IOException { + protected void writeIndexGen(final RepositoryData repositoryData) throws IOException { assert isReadOnly() == false; // can not write to a read only repository final BytesReference snapshotsBytes; try (BytesStreamOutput bStream = new BytesStreamOutput()) { try (StreamOutput stream = new OutputStreamStreamOutput(bStream)) { XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, stream); - builder.startObject(); - builder.startArray(SNAPSHOTS); - for (SnapshotId snapshot : snapshots) { - builder.startObject(); - builder.field(NAME, snapshot.getName()); - builder.field(UUID, snapshot.getUUID()); - builder.endObject(); - } - builder.endArray(); - builder.endObject(); + repositoryData.toXContent(builder, ToXContent.EMPTY_PARAMS); builder.close(); } snapshotsBytes = bStream.bytes(); } final long gen = latestIndexBlobId() + 1; // write the index file - writeAtomic(SNAPSHOTS_FILE_PREFIX + Long.toString(gen), snapshotsBytes); + writeAtomic(INDEX_FILE_PREFIX + Long.toString(gen), snapshotsBytes); // delete the N-2 index file if it exists, keep the previous one around as a backup if (isReadOnly() == false && gen - 2 >= 0) { - final String oldSnapshotIndexFile = SNAPSHOTS_FILE_PREFIX + Long.toString(gen - 2); + final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(gen - 2); if (snapshotsBlobContainer.blobExists(oldSnapshotIndexFile)) { snapshotsBlobContainer.deleteBlob(oldSnapshotIndexFile); } + // delete the old index file (non-generational) if it exists + if (snapshotsBlobContainer.blobExists(SNAPSHOTS_FILE)) { + snapshotsBlobContainer.deleteBlob(SNAPSHOTS_FILE); + } } // write the current generation to the index-latest file @@ -688,72 +768,10 @@ protected void writeSnapshotsToIndexGen(final List snapshots) throws bStream.writeLong(gen); genBytes = bStream.bytes(); } - if (snapshotsBlobContainer.blobExists(SNAPSHOTS_INDEX_LATEST_BLOB)) { - snapshotsBlobContainer.deleteBlob(SNAPSHOTS_INDEX_LATEST_BLOB); - } - writeAtomic(SNAPSHOTS_INDEX_LATEST_BLOB, genBytes); - } - - protected List readSnapshotsFromIndex() throws IOException { - final long indexGen = latestIndexBlobId(); - final String snapshotsIndexBlobName; - if (indexGen == -1) { - // index-N file doesn't exist, either its a fresh repository, or its in the - // old format, so look for the older index file before returning an empty list - snapshotsIndexBlobName = SNAPSHOTS_FILE; - } else { - snapshotsIndexBlobName = SNAPSHOTS_FILE_PREFIX + Long.toString(indexGen); - } - - try (InputStream blob = snapshotsBlobContainer.readBlob(snapshotsIndexBlobName)) { - BytesStreamOutput out = new BytesStreamOutput(); - Streams.copy(blob, out); - ArrayList snapshots = new ArrayList<>(); - try (XContentParser parser = XContentHelper.createParser(out.bytes())) { - if (parser.nextToken() == XContentParser.Token.START_OBJECT) { - if (parser.nextToken() == XContentParser.Token.FIELD_NAME) { - String currentFieldName = parser.currentName(); - if (SNAPSHOTS.equals(currentFieldName)) { - if (parser.nextToken() == XContentParser.Token.START_ARRAY) { - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - // the new format from 5.0 which contains the snapshot name and uuid - String name = null; - String uuid = null; - if (parser.currentToken() == XContentParser.Token.START_OBJECT) { - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - currentFieldName = parser.currentName(); - parser.nextToken(); - if (NAME.equals(currentFieldName)) { - name = parser.text(); - } else if (UUID.equals(currentFieldName)) { - uuid = parser.text(); - } - } - snapshots.add(new SnapshotId(name, uuid)); - } - // the old format pre 5.0 that only contains the snapshot name, use the name as the uuid too - else { - name = parser.text(); - snapshots.add(new SnapshotId(name, SnapshotId.UNASSIGNED_UUID)); - } - } - } - } - } - } - } - return Collections.unmodifiableList(snapshots); - } - } - - // Package private for testing - static String blobId(final SnapshotId snapshotId) { - final String uuid = snapshotId.getUUID(); - if (uuid.equals(SnapshotId.UNASSIGNED_UUID)) { - // the old snapshot blob naming - return snapshotId.getName(); + if (snapshotsBlobContainer.blobExists(INDEX_LATEST_BLOB)) { + snapshotsBlobContainer.deleteBlob(INDEX_LATEST_BLOB); } - return snapshotId.getName() + "-" + uuid; + writeAtomic(INDEX_LATEST_BLOB, genBytes); } /** @@ -790,7 +808,7 @@ long latestIndexBlobId() throws IOException { // package private for testing long readSnapshotIndexLatestBlob() throws IOException { - try (InputStream blob = snapshotsBlobContainer.readBlob(SNAPSHOTS_INDEX_LATEST_BLOB)) { + try (InputStream blob = snapshotsBlobContainer.readBlob(INDEX_LATEST_BLOB)) { BytesStreamOutput out = new BytesStreamOutput(); Streams.copy(blob, out); return Numbers.bytesToLong(out.bytes().toBytesRef()); @@ -798,7 +816,7 @@ long readSnapshotIndexLatestBlob() throws IOException { } private long listBlobsToGetLatestIndexId() throws IOException { - Map blobs = snapshotsBlobContainer.listBlobsByPrefix(SNAPSHOTS_FILE_PREFIX); + Map blobs = snapshotsBlobContainer.listBlobsByPrefix(INDEX_FILE_PREFIX); long latest = -1; if (blobs.isEmpty()) { // no snapshot index blobs have been written yet @@ -807,7 +825,7 @@ private long listBlobsToGetLatestIndexId() throws IOException { for (final BlobMetaData blobMetaData : blobs.values()) { final String blobName = blobMetaData.name(); try { - final long curr = Long.parseLong(blobName.substring(SNAPSHOTS_FILE_PREFIX.length())); + final long curr = Long.parseLong(blobName.substring(INDEX_FILE_PREFIX.length())); latest = Math.max(latest, curr); } catch (NumberFormatException nfe) { // the index- blob wasn't of the format index-N where N is a number, @@ -830,9 +848,11 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef) t } } + + @Override - public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { - SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, snapshotStatus); + public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { + SnapshotContext snapshotContext = new SnapshotContext(shard, snapshotId, indexId, snapshotStatus); snapshotStatus.startTime(System.currentTimeMillis()); try { @@ -852,8 +872,8 @@ public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit s } @Override - public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) { - final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, version, snapshotShardId, recoveryState); + public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { + final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, version, indexId, snapshotShardId, recoveryState); try { snapshotContext.restore(); } catch (Exception e) { @@ -862,8 +882,8 @@ public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version versio } @Override - public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) { - Context context = new Context(snapshotId, version, shardId); + public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { + Context context = new Context(snapshotId, version, indexId, shardId); BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot(); IndexShardSnapshotStatus status = new IndexShardSnapshotStatus(); status.updateStage(IndexShardSnapshotStatus.Stage.DONE); @@ -897,8 +917,8 @@ public void verify(String seed, DiscoveryNode localNode) { * @param snapshotId snapshot id * @param shardId shard id */ - public void delete(SnapshotId snapshotId, Version version, ShardId shardId) { - Context context = new Context(snapshotId, version, shardId, shardId); + private void delete(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { + Context context = new Context(snapshotId, version, indexId, shardId, shardId); context.delete(); } @@ -931,15 +951,15 @@ private class Context { protected final Version version; - public Context(SnapshotId snapshotId, Version version, ShardId shardId) { - this(snapshotId, version, shardId, shardId); + public Context(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { + this(snapshotId, version, indexId, shardId, shardId); } - public Context(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId) { + public Context(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId, ShardId snapshotShardId) { this.snapshotId = snapshotId; this.version = version; this.shardId = shardId; - blobContainer = blobStore().blobContainer(basePath().add("indices").add(snapshotShardId.getIndexName()).add(Integer.toString(snapshotShardId.getId()))); + blobContainer = blobStore().blobContainer(basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId()))); } /** @@ -958,7 +978,7 @@ public void delete() { int fileListGeneration = tuple.v2(); try { - indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getName()); + indexShardSnapshotFormat(version).delete(blobContainer, snapshotId.getUUID()); } catch (IOException e) { logger.debug("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId); } @@ -979,7 +999,7 @@ public void delete() { */ public BlobStoreIndexShardSnapshot loadSnapshot() { try { - return indexShardSnapshotFormat(version).read(blobContainer, snapshotId.getName()); + return indexShardSnapshotFormat(version).read(blobContainer, snapshotId.getUUID()); } catch (IOException ex) { throw new IndexShardRestoreFailedException(shardId, "failed to read shard snapshot file", ex); } @@ -1108,7 +1128,7 @@ protected Tuple buildBlobStoreIndexShardS try { BlobStoreIndexShardSnapshot snapshot = null; if (name.startsWith(SNAPSHOT_PREFIX)) { - snapshot = indexShardSnapshotFormat.readBlob(blobContainer, name); + snapshot = indexShardSnapshotFormat.readBlob(blobContainer, snapshotId.getUUID()); } else if (name.startsWith(LEGACY_SNAPSHOT_PREFIX)) { snapshot = indexShardSnapshotLegacyFormat.readBlob(blobContainer, name); } @@ -1137,10 +1157,11 @@ private class SnapshotContext extends Context { * * @param shard shard to be snapshotted * @param snapshotId snapshot id + * @param indexId the id of the index being snapshotted * @param snapshotStatus snapshot status to report progress */ - public SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexShardSnapshotStatus snapshotStatus) { - super(snapshotId, Version.CURRENT, shard.shardId()); + public SnapshotContext(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus) { + super(snapshotId, Version.CURRENT, indexId, shard.shardId()); this.snapshotStatus = snapshotStatus; this.store = shard.store(); } @@ -1248,7 +1269,7 @@ public void snapshot(IndexCommit snapshotIndexCommit) { //TODO: The time stored in snapshot doesn't include cleanup time. logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); try { - indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getName()); + indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getUUID()); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); } @@ -1424,11 +1445,12 @@ private class RestoreContext extends Context { * * @param shard shard to restore into * @param snapshotId snapshot id + * @param indexId id of the index being restored * @param snapshotShardId shard in the snapshot that data should be restored from * @param recoveryState recovery state to report progress */ - public RestoreContext(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) { - super(snapshotId, version, shard.shardId(), snapshotShardId); + public RestoreContext(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { + super(snapshotId, version, indexId, shard.shardId(), snapshotShardId); this.recoveryState = recoveryState; store = shard.store(); } @@ -1602,6 +1624,6 @@ private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) th } } } - } + } diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java index dedcc6d8d2163..7ab579aa455c2 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -63,8 +63,10 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.TransportChannel; @@ -185,7 +187,8 @@ public void restoreSnapshot(final RestoreRequest request, final ActionListener matchingSnapshotId = repository.getSnapshots().stream() + final RepositoryData repositoryData = repository.getRepositoryData(); + final Optional matchingSnapshotId = repositoryData.getSnapshotIds().stream() .filter(s -> request.snapshotName.equals(s.getName())).findFirst(); if (matchingSnapshotId.isPresent() == false) { throw new SnapshotRestoreException(request.repositoryName, request.snapshotName, "snapshot does not exist"); @@ -194,7 +197,7 @@ public void restoreSnapshot(final RestoreRequest request, final ActionListener filteredIndices = SnapshotUtils.filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions()); - MetaData metaDataIn = repository.getSnapshotMetaData(snapshotInfo, filteredIndices); + MetaData metaDataIn = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(filteredIndices)); final MetaData metaData; if (snapshotInfo.version().before(Version.V_2_0_0_beta1)) { diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotId.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotId.java index 16f371b28f76c..4866a79afb95a 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotId.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotId.java @@ -22,6 +22,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; import java.util.Objects; @@ -29,12 +32,10 @@ /** * SnapshotId - snapshot name + snapshot UUID */ -public final class SnapshotId implements Writeable { +public final class SnapshotId implements Writeable, ToXContent { - /** - * This value is for older snapshots that don't have a UUID. - */ - public static final String UNASSIGNED_UUID = "_na_"; + private static final String NAME = "name"; + private static final String UUID = "uuid"; private final String name; private final String uuid; @@ -115,4 +116,35 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(uuid); } + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(NAME, name); + builder.field(UUID, uuid); + builder.endObject(); + return builder; + } + + public static SnapshotId fromXContent(XContentParser parser) throws IOException { + // the new format from 5.0 which contains the snapshot name and uuid + if (parser.currentToken() == XContentParser.Token.START_OBJECT) { + String name = null; + String uuid = null; + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String currentFieldName = parser.currentName(); + parser.nextToken(); + if (NAME.equals(currentFieldName)) { + name = parser.text(); + } else if (UUID.equals(currentFieldName)) { + uuid = parser.text(); + } + } + return new SnapshotId(name, uuid); + } else { + // the old format pre 5.0 that only contains the snapshot name, use the name as the uuid too + final String name = parser.text(); + return new SnapshotId(name, name); + } + } + } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java index 2159fda22378e..ddcee4b035366 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java @@ -458,7 +458,7 @@ public static SnapshotInfo fromXContent(final XContentParser parser) throws IOEx } if (uuid == null) { // the old format where there wasn't a UUID - uuid = SnapshotId.UNASSIGNED_UUID; + uuid = name; } return new SnapshotInfo(new SnapshotId(name, uuid), indices, diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 7741ef1c0e6de..136f37eee7158 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -46,6 +46,7 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.EmptyTransportResponseHandler; @@ -66,6 +67,8 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; @@ -208,8 +211,11 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { Map> newSnapshots = new HashMap<>(); // Now go through all snapshots and update existing or create missing final String localNodeId = clusterService.localNode().getId(); + final Map> snapshotIndices = new HashMap<>(); if (snapshotsInProgress != null) { for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { + snapshotIndices.put(entry.snapshot(), + entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()))); if (entry.state() == SnapshotsInProgress.State.STARTED) { Map startedShards = new HashMap<>(); SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot()); @@ -289,14 +295,18 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { if (newSnapshots.isEmpty() == false) { Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); for (final Map.Entry> entry : newSnapshots.entrySet()) { + Map indicesMap = snapshotIndices.get(entry.getKey()); + assert indicesMap != null; for (final Map.Entry shardEntry : entry.getValue().entrySet()) { final ShardId shardId = shardEntry.getKey(); try { final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); + final IndexId indexId = indicesMap.get(shardId.getIndexName()); + assert indexId != null; executor.execute(new AbstractRunnable() { @Override public void doRun() { - snapshot(indexShard, entry.getKey(), shardEntry.getValue()); + snapshot(indexShard, entry.getKey(), indexId, shardEntry.getValue()); updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.SUCCESS)); } @@ -321,7 +331,7 @@ public void onFailure(Exception e) { * @param snapshot snapshot * @param snapshotStatus snapshot status */ - private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexShardSnapshotStatus snapshotStatus) { + private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexId indexId, final IndexShardSnapshotStatus snapshotStatus) { Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository()); ShardId shardId = indexShard.shardId(); if (!indexShard.routingEntry().primary()) { @@ -340,7 +350,7 @@ private void snapshot(final IndexShard indexShard, final Snapshot snapshot, fina // we flush first to make sure we get the latest writes snapshotted IndexCommit snapshotIndexCommit = indexShard.snapshotIndex(true); try { - repository.snapshotShard(indexShard, snapshot.getSnapshotId(), snapshotIndexCommit, snapshotStatus); + repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotIndexCommit, snapshotStatus); if (logger.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); sb.append(" index : version [").append(snapshotStatus.indexVersion()).append("], number_of_files [").append(snapshotStatus.numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.totalSize())).append("]\n"); diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index df56f2a24a687..1725536205fb6 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -56,8 +56,10 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.threadpool.ThreadPool; @@ -132,7 +134,7 @@ public SnapshotsService(Settings settings, ClusterService clusterService, IndexN public List snapshotIds(final String repositoryName) { Repository repository = repositoriesService.repository(repositoryName); assert repository != null; // should only be called once we've validated the repository exists - return repository.getSnapshots(); + return repository.getRepositoryData().getSnapshotIds(); } /** @@ -218,6 +220,7 @@ public void createSnapshot(final SnapshotRequest request, final CreateSnapshotLi final String snapshotName = request.snapshotName; validate(repositoryName, snapshotName); final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot + final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData(); clusterService.submitStateUpdateTask(request.cause(), new ClusterStateUpdateTask() { @@ -232,11 +235,12 @@ public ClusterState execute(ClusterState currentState) { // Store newSnapshot here to be processed in clusterStateProcessed List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request.indicesOptions(), request.indices())); logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); + List snapshotIndices = repositoryData.resolveNewIndices(indices); newSnapshot = new SnapshotsInProgress.Entry(new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), request.partial(), State.INIT, - indices, + snapshotIndices, System.currentTimeMillis(), null); snapshots = new SnapshotsInProgress(newSnapshot); @@ -334,8 +338,8 @@ private void beginSnapshot(final ClusterState clusterState, if (!snapshot.includeGlobalState()) { // Remove global state from the cluster state MetaData.Builder builder = MetaData.builder(); - for (String index : snapshot.indices()) { - builder.put(metaData.index(index), false); + for (IndexId index : snapshot.indices()) { + builder.put(metaData.index(index.getName()), false); } metaData = builder.build(); } @@ -473,7 +477,9 @@ private void cleanupAfterError(Exception exception) { } private SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) { - return new SnapshotInfo(entry.snapshot().getSnapshotId(), entry.indices(), entry.startTime()); + return new SnapshotInfo(entry.snapshot().getSnapshotId(), + entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()), + entry.startTime()); } /** @@ -546,8 +552,10 @@ public Map snapshotShards(final String reposi final SnapshotInfo snapshotInfo) throws IOException { Map shardStatus = new HashMap<>(); Repository repository = repositoriesService.repository(repositoryName); - MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, snapshotInfo.indices()); + RepositoryData repositoryData = repository.getRepositoryData(); + MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(snapshotInfo.indices())); for (String index : snapshotInfo.indices()) { + IndexId indexId = repositoryData.resolveIndexId(index); IndexMetaData indexMetaData = metaData.indices().get(index); if (indexMetaData != null) { int numberOfShards = indexMetaData.getNumberOfShards(); @@ -561,7 +569,7 @@ public Map snapshotShards(final String reposi shardStatus.put(shardId, shardSnapshotStatus); } else { IndexShardSnapshotStatus shardSnapshotStatus = - repository.getShardSnapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), shardId); + repository.getShardSnapshotStatus(snapshotInfo.snapshotId(), snapshotInfo.version(), indexId, shardId); shardStatus.put(shardId, shardSnapshotStatus); } } @@ -953,7 +961,10 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS public void deleteSnapshot(final String repositoryName, final String snapshotName, final DeleteSnapshotListener listener) { // First, look for the snapshot in the repository final Repository repository = repositoriesService.repository(repositoryName); - Optional matchedEntry = repository.getSnapshots().stream().filter(s -> s.getName().equals(snapshotName)).findFirst(); + Optional matchedEntry = repository.getRepositoryData().getSnapshotIds() + .stream() + .filter(s -> s.getName().equals(snapshotName)) + .findFirst(); // if nothing found by the same name, then look in the cluster state for current in progress snapshots if (matchedEntry.isPresent() == false) { matchedEntry = currentSnapshots(repositoryName, Collections.emptyList()).stream() @@ -1121,21 +1132,22 @@ private void deleteSnapshotFromRepository(final Snapshot snapshot, final DeleteS * @param indices list of indices to be snapshotted * @return list of shard to be included into current snapshot */ - private ImmutableOpenMap shards(ClusterState clusterState, List indices) { + private ImmutableOpenMap shards(ClusterState clusterState, List indices) { ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); MetaData metaData = clusterState.metaData(); - for (String index : indices) { - IndexMetaData indexMetaData = metaData.index(index); + for (IndexId index : indices) { + final String indexName = index.getName(); + IndexMetaData indexMetaData = metaData.index(indexName); if (indexMetaData == null) { // The index was deleted before we managed to start the snapshot - mark it as missing. - builder.put(new ShardId(index, IndexMetaData.INDEX_UUID_NA_VALUE, 0), new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index")); + builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0), new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index")); } else if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) { ShardId shardId = new ShardId(indexMetaData.getIndex(), i); builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "index is closed")); } } else { - IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(index); + IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(indexName); for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) { ShardId shardId = new ShardId(indexMetaData.getIndex(), i); if (indexRoutingTable != null) { @@ -1191,8 +1203,8 @@ private static Set indicesToFailForCloseOrDeletion(ClusterState currentSt for (final SnapshotsInProgress.Entry entry : snapshots.entries()) { if (entry.partial() == false) { if (entry.state() == State.INIT) { - for (String index : entry.indices()) { - IndexMetaData indexMetaData = currentState.metaData().index(index); + for (IndexId index : entry.indices()) { + IndexMetaData indexMetaData = currentState.metaData().index(index.getName()); if (indexMetaData != null && indices.contains(indexMetaData)) { if (indicesToFail == null) { indicesToFail = new HashSet<>(); diff --git a/core/src/test/java/org/elasticsearch/bwcompat/RepositoryUpgradabilityIT.java b/core/src/test/java/org/elasticsearch/bwcompat/RepositoryUpgradabilityIT.java index c29d83b4454e3..9bfcc554998b5 100644 --- a/core/src/test/java/org/elasticsearch/bwcompat/RepositoryUpgradabilityIT.java +++ b/core/src/test/java/org/elasticsearch/bwcompat/RepositoryUpgradabilityIT.java @@ -46,7 +46,7 @@ * as blob names and repository blob formats have changed between the snapshot versions. */ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) -// this test sometimes fails in recovery when the recovery is reset, increasing the logging level to help debug +// this test sometimes fails in recovery when the recovery is reset, increasing the logging level to help debug @TestLogging("indices.recovery:DEBUG") public class RepositoryUpgradabilityIT extends AbstractSnapshotIntegTestCase { @@ -70,7 +70,7 @@ public void testRepositoryWorksWithCrossVersions() throws Exception { final Set snapshotInfos = Sets.newHashSet(getSnapshots(repoName)); assertThat(snapshotInfos.size(), equalTo(1)); SnapshotInfo originalSnapshot = snapshotInfos.iterator().next(); - assertThat(originalSnapshot.snapshotId(), equalTo(new SnapshotId("test_1", SnapshotId.UNASSIGNED_UUID))); + assertThat(originalSnapshot.snapshotId(), equalTo(new SnapshotId("test_1", "test_1"))); assertThat(Sets.newHashSet(originalSnapshot.indices()), equalTo(indices)); logger.info("--> restore the original snapshot"); diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java index 68a0f73eb3467..9ffabec6fc00a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java @@ -53,7 +53,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.Snapshot; -import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.ESIntegTestCase; import java.util.Collections; @@ -659,7 +658,7 @@ public ClusterState.Custom randomCreate(String name) { randomBoolean(), randomBoolean(), SnapshotsInProgress.State.fromValue((byte) randomIntBetween(0, 6)), - Collections.emptyList(), + Collections.emptyList(), Math.abs(randomLong()), ImmutableOpenMap.of())); case 1: diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 95a705f8e27c0..a412d37c1112f 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -100,7 +100,9 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; @@ -121,8 +123,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; @@ -1184,9 +1188,9 @@ public void testRestoreShard() throws IOException { test_target_shard.updateRoutingEntry(routing); DiscoveryNode localNode = new DiscoveryNode("foo", LocalTransportAddress.buildUnique(), emptyMap(), emptySet(), Version.CURRENT); test_target_shard.markAsRecovering("store", new RecoveryState(routing.shardId(), routing.primary(), RecoveryState.Type.SNAPSHOT, routing.restoreSource(), localNode)); - assertTrue(test_target_shard.restoreFromRepository(new RestoreOnlyRepository() { + assertTrue(test_target_shard.restoreFromRepository(new RestoreOnlyRepository("test") { @Override - public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState) { + public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { try { cleanLuceneIndex(targetStore.directory()); for (String file : sourceStore.directory().listAll()) { @@ -1645,8 +1649,10 @@ public void testRecoverFromLocalShard() throws IOException { /** A dummy repository for testing which just needs restore overridden */ private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository { - public RestoreOnlyRepository() { + private final String indexName; + public RestoreOnlyRepository(String indexName) { super(Settings.EMPTY); + this.indexName = indexName; } @Override protected void doStart() {} @@ -1663,17 +1669,19 @@ public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { return null; } @Override - public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List indices) throws IOException { + public MetaData getSnapshotMetaData(SnapshotInfo snapshot, List indices) throws IOException { return null; } @Override - public List getSnapshots() { - return null; + public RepositoryData getRepositoryData() { + Map> map = new HashMap<>(); + map.put(new IndexId(indexName, "blah"), Collections.emptySet()); + return new RepositoryData(Collections.emptyList(), map); } @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) {} + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData) {} @Override - public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures) { + public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures) { return null; } @Override @@ -1697,9 +1705,9 @@ public boolean isReadOnly() { return false; } @Override - public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {} + public void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {} @Override - public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId) { + public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { return null; } @Override diff --git a/core/src/test/java/org/elasticsearch/repositories/IndexIdTests.java b/core/src/test/java/org/elasticsearch/repositories/IndexIdTests.java new file mode 100644 index 0000000000000..30002d54a6bf7 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/repositories/IndexIdTests.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +/** + * Tests for the {@link IndexId} class. + */ +public class IndexIdTests extends ESTestCase { + + public void testEqualsAndHashCode() { + // assert equals and hashcode + String name = randomAsciiOfLength(8); + String id = UUIDs.randomBase64UUID(); + IndexId indexId1 = new IndexId(name, id); + IndexId indexId2 = new IndexId(name, id); + assertEquals(indexId1, indexId2); + assertEquals(indexId1.hashCode(), indexId2.hashCode()); + // assert equals when using index name for id + id = name; + indexId1 = new IndexId(name, id); + indexId2 = new IndexId(name, id); + assertEquals(indexId1, indexId2); + assertEquals(indexId1.hashCode(), indexId2.hashCode()); + //assert not equals when name or id differ + indexId2 = new IndexId(randomAsciiOfLength(8), id); + assertNotEquals(indexId1, indexId2); + assertNotEquals(indexId1.hashCode(), indexId2.hashCode()); + indexId2 = new IndexId(name, UUIDs.randomBase64UUID()); + assertNotEquals(indexId1, indexId2); + assertNotEquals(indexId1.hashCode(), indexId2.hashCode()); + } + + public void testSerialization() throws IOException { + IndexId indexId = new IndexId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()); + BytesStreamOutput out = new BytesStreamOutput(); + indexId.writeTo(out); + assertEquals(indexId, new IndexId(out.bytes().streamInput())); + } + + public void testXContent() throws IOException { + IndexId indexId = new IndexId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()); + XContentBuilder builder = JsonXContent.contentBuilder(); + indexId.toXContent(builder, ToXContent.EMPTY_PARAMS); + XContentParser parser = XContentType.JSON.xContent().createParser(builder.bytes()); + assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken()); + String name = null; + String id = null; + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + final String currentFieldName = parser.currentName(); + parser.nextToken(); + if (currentFieldName.equals(IndexId.NAME)) { + name = parser.text(); + } else if (currentFieldName.equals(IndexId.ID)) { + id = parser.text(); + } + } + assertNotNull(name); + assertNotNull(id); + assertEquals(indexId, new IndexId(name, id)); + } +} diff --git a/core/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java b/core/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java new file mode 100644 index 0000000000000..1fb34249fd268 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java @@ -0,0 +1,171 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.Matchers.greaterThan; + +/** + * Tests for the {@link RepositoryData} class. + */ +public class RepositoryDataTests extends ESTestCase { + + public void testEqualsAndHashCode() { + RepositoryData repositoryData1 = generateRandomRepoData(); + RepositoryData repositoryData2 = repositoryData1.copy(); + assertEquals(repositoryData1, repositoryData2); + assertEquals(repositoryData1.hashCode(), repositoryData2.hashCode()); + } + + public void testXContent() throws IOException { + RepositoryData repositoryData = generateRandomRepoData(); + XContentBuilder builder = JsonXContent.contentBuilder(); + repositoryData.toXContent(builder, ToXContent.EMPTY_PARAMS); + XContentParser parser = XContentType.JSON.xContent().createParser(builder.bytes()); + assertEquals(repositoryData, RepositoryData.fromXContent(parser)); + } + + public void testAddSnapshots() { + RepositoryData repositoryData = generateRandomRepoData(); + // test that adding the same snapshot id to the repository data throws an exception + final SnapshotId snapshotId = repositoryData.getSnapshotIds().get(0); + Map indexIdMap = repositoryData.getIndices(); + expectThrows(IllegalArgumentException.class, + () -> repositoryData.addSnapshot(new SnapshotId(snapshotId.getName(), snapshotId.getUUID()), Collections.emptyList())); + // test that adding a snapshot and its indices works + SnapshotId newSnapshot = new SnapshotId(randomAsciiOfLength(7), UUIDs.randomBase64UUID()); + List indices = new ArrayList<>(); + Set newIndices = new HashSet<>(); + int numNew = randomIntBetween(1, 10); + for (int i = 0; i < numNew; i++) { + IndexId indexId = new IndexId(randomAsciiOfLength(7), UUIDs.randomBase64UUID()); + newIndices.add(indexId); + indices.add(indexId); + } + int numOld = randomIntBetween(1, indexIdMap.size()); + List indexNames = new ArrayList<>(indexIdMap.keySet()); + for (int i = 0; i < numOld; i++) { + indices.add(indexIdMap.get(indexNames.get(i))); + } + RepositoryData newRepoData = repositoryData.addSnapshot(newSnapshot, indices); + // verify that the new repository data has the new snapshot and its indices + assertTrue(newRepoData.getSnapshotIds().contains(newSnapshot)); + for (IndexId indexId : indices) { + Set snapshotIds = newRepoData.getSnapshots(indexId); + assertTrue(snapshotIds.contains(newSnapshot)); + if (newIndices.contains(indexId)) { + assertEquals(snapshotIds.size(), 1); // if it was a new index, only the new snapshot should be in its set + } + } + } + + public void testInitIndices() { + final int numSnapshots = randomIntBetween(1, 30); + final List snapshotIds = new ArrayList<>(numSnapshots); + for (int i = 0; i < numSnapshots; i++) { + snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID())); + } + RepositoryData repositoryData = new RepositoryData(snapshotIds, Collections.emptyMap()); + // test that initializing indices works + Map> indices = randomIndices(snapshotIds); + RepositoryData newRepoData = repositoryData.initIndices(indices); + assertEquals(repositoryData.getSnapshotIds(), newRepoData.getSnapshotIds()); + for (IndexId indexId : indices.keySet()) { + assertEquals(indices.get(indexId), newRepoData.getSnapshots(indexId)); + } + } + + public void testRemoveSnapshot() { + RepositoryData repositoryData = generateRandomRepoData(); + List snapshotIds = new ArrayList<>(repositoryData.getSnapshotIds()); + assertThat(snapshotIds.size(), greaterThan(0)); + SnapshotId removedSnapshotId = snapshotIds.remove(randomIntBetween(0, snapshotIds.size() - 1)); + RepositoryData newRepositoryData = repositoryData.removeSnapshot(removedSnapshotId); + // make sure the repository data's indices no longer contain the removed snapshot + for (final IndexId indexId : newRepositoryData.getIndices().values()) { + assertFalse(newRepositoryData.getSnapshots(indexId).contains(removedSnapshotId)); + } + } + + public void testResolveIndexId() { + RepositoryData repositoryData = generateRandomRepoData(); + Map indices = repositoryData.getIndices(); + Set indexNames = indices.keySet(); + assertThat(indexNames.size(), greaterThan(0)); + String indexName = indexNames.iterator().next(); + IndexId indexId = indices.get(indexName); + assertEquals(indexId, repositoryData.resolveIndexId(indexName)); + String notInRepoData = randomAsciiOfLength(5); + assertFalse(indexName.contains(notInRepoData)); + assertEquals(new IndexId(notInRepoData, notInRepoData), repositoryData.resolveIndexId(notInRepoData)); + } + + public static RepositoryData generateRandomRepoData() { + return generateRandomRepoData(new ArrayList<>()); + } + + public static RepositoryData generateRandomRepoData(final List origSnapshotIds) { + List snapshotIds = randomSnapshots(origSnapshotIds); + return new RepositoryData(snapshotIds, randomIndices(snapshotIds)); + } + + private static List randomSnapshots(final List origSnapshotIds) { + final int numSnapshots = randomIntBetween(1, 30); + final List snapshotIds = new ArrayList<>(origSnapshotIds); + for (int i = 0; i < numSnapshots; i++) { + snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID())); + } + return snapshotIds; + } + + private static Map> randomIndices(final List snapshotIds) { + final int totalSnapshots = snapshotIds.size(); + final int numIndices = randomIntBetween(1, 30); + final Map> indices = new HashMap<>(numIndices); + for (int i = 0; i < numIndices; i++) { + final IndexId indexId = new IndexId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()); + final Set indexSnapshots = new LinkedHashSet<>(); + final int numIndicesForSnapshot = randomIntBetween(1, numIndices); + for (int j = 0; j < numIndicesForSnapshot; j++) { + indexSnapshots.add(snapshotIds.get(randomIntBetween(0, totalSnapshots - 1))); + } + indices.put(indexId, indexSnapshots); + } + return indices; + } +} diff --git a/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index c9d8ff81aa256..6c4af1f773734 100644 --- a/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -28,11 +28,11 @@ import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESSingleNodeTestCase; @@ -44,7 +44,7 @@ import java.util.List; import java.util.stream.Collectors; -import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.blobId; +import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData; import static org.hamcrest.Matchers.equalTo; /** @@ -109,86 +109,56 @@ public void testRetrieveSnapshots() throws Exception { public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception { final BlobStoreRepository repository = setupRepo(); - // write to and read from a snapshot file with no entries + // write to and read from a index file with no entries assertThat(repository.getSnapshots().size(), equalTo(0)); - repository.writeSnapshotsToIndexGen(Collections.emptyList()); - assertThat(repository.getSnapshots().size(), equalTo(0)); - - // write to and read from a snapshot file with a random number of entries - final int numSnapshots = randomIntBetween(1, 1000); + final RepositoryData emptyData = RepositoryData.EMPTY; + repository.writeIndexGen(emptyData); + final RepositoryData readData = repository.getRepositoryData(); + assertEquals(readData, emptyData); + assertEquals(readData.getIndices().size(), 0); + assertEquals(readData.getSnapshotIds().size(), 0); + + // write to and read from an index file with snapshots but no indices + final int numSnapshots = randomIntBetween(1, 20); final List snapshotIds = new ArrayList<>(numSnapshots); for (int i = 0; i < numSnapshots; i++) { snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID())); } - repository.writeSnapshotsToIndexGen(snapshotIds); - assertThat(repository.getSnapshots(), equalTo(snapshotIds)); + RepositoryData repositoryData = new RepositoryData(snapshotIds, Collections.emptyMap()); + repository.writeIndexGen(repositoryData); + assertEquals(repository.getRepositoryData(), repositoryData); + + // write to and read from a index file with random repository data + repositoryData = generateRandomRepoData(); + repository.writeIndexGen(repositoryData); + assertThat(repository.getRepositoryData(), equalTo(repositoryData)); } public void testIndexGenerationalFiles() throws Exception { final BlobStoreRepository repository = setupRepo(); // write to index generational file - final int numSnapshots = randomIntBetween(1, 1000); - final List snapshotIds = new ArrayList<>(numSnapshots); - for (int i = 0; i < numSnapshots; i++) { - snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID())); - } - repository.writeSnapshotsToIndexGen(snapshotIds); - assertThat(Sets.newHashSet(repository.readSnapshotsFromIndex()), equalTo(Sets.newHashSet(snapshotIds))); + RepositoryData repositoryData = generateRandomRepoData(); + repository.writeIndexGen(repositoryData); + assertThat(repository.getRepositoryData(), equalTo(repositoryData)); assertThat(repository.latestIndexBlobId(), equalTo(0L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L)); // adding more and writing to a new index generational file - for (int i = 0; i < 10; i++) { - snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID())); - } - repository.writeSnapshotsToIndexGen(snapshotIds); - assertThat(Sets.newHashSet(repository.readSnapshotsFromIndex()), equalTo(Sets.newHashSet(snapshotIds))); + repositoryData = generateRandomRepoData(); + repository.writeIndexGen(repositoryData); + assertEquals(repository.getRepositoryData(), repositoryData); assertThat(repository.latestIndexBlobId(), equalTo(1L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L)); - // removing a snapshot adn writing to a new index generational file - snapshotIds.remove(0); - repository.writeSnapshotsToIndexGen(snapshotIds); - assertThat(Sets.newHashSet(repository.readSnapshotsFromIndex()), equalTo(Sets.newHashSet(snapshotIds))); + // removing a snapshot and writing to a new index generational file + repositoryData = repositoryData.removeSnapshot(repositoryData.getSnapshotIds().get(0)); + repository.writeIndexGen(repositoryData); + assertEquals(repository.getRepositoryData(), repositoryData); assertThat(repository.latestIndexBlobId(), equalTo(2L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L)); } - public void testOldIndexFileFormat() throws Exception { - final BlobStoreRepository repository = setupRepo(); - - // write old index file format - final int numOldSnapshots = randomIntBetween(1, 50); - final List snapshotIds = new ArrayList<>(); - for (int i = 0; i < numOldSnapshots; i++) { - snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), SnapshotId.UNASSIGNED_UUID)); - } - writeOldFormat(repository, snapshotIds.stream().map(SnapshotId::getName).collect(Collectors.toList())); - assertThat(Sets.newHashSet(repository.getSnapshots()), equalTo(Sets.newHashSet(snapshotIds))); - - // write to and read from a snapshot file with a random number of new entries added - final int numSnapshots = randomIntBetween(1, 1000); - for (int i = 0; i < numSnapshots; i++) { - snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID())); - } - repository.writeSnapshotsToIndexGen(snapshotIds); - assertThat(Sets.newHashSet(repository.getSnapshots()), equalTo(Sets.newHashSet(snapshotIds))); - } - - public void testBlobId() { - SnapshotId snapshotId = new SnapshotId("abc123", SnapshotId.UNASSIGNED_UUID); - assertThat(blobId(snapshotId), equalTo("abc123")); // just the snapshot name - snapshotId = new SnapshotId("abc-123", SnapshotId.UNASSIGNED_UUID); - assertThat(blobId(snapshotId), equalTo("abc-123")); // just the snapshot name - String uuid = UUIDs.randomBase64UUID(); - snapshotId = new SnapshotId("abc123", uuid); - assertThat(blobId(snapshotId), equalTo("abc123-" + uuid)); // snapshot name + '-' + uuid - uuid = UUIDs.randomBase64UUID(); - snapshotId = new SnapshotId("abc-123", uuid); - assertThat(blobId(snapshotId), equalTo("abc-123-" + uuid)); // snapshot name + '-' + uuid - } - private BlobStoreRepository setupRepo() { final Client client = client(); final Path location = ESIntegTestCase.randomRepoPath(node().settings()); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 313bf065b0af5..27d8dad294362 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -61,7 +61,9 @@ import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.indices.InvalidIndexNameException; +import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -884,7 +886,7 @@ public void testDeleteSnapshotWithMissingMetadata() throws Exception { assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); logger.info("--> delete index metadata and shard metadata"); - Path metadata = repo.resolve("meta-test-snap-1.dat"); + Path metadata = repo.resolve("meta-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat"); Files.delete(metadata); logger.info("--> delete snapshot"); @@ -917,7 +919,7 @@ public void testDeleteSnapshotWithCorruptedSnapshotFile() throws Exception { assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); logger.info("--> truncate snapshot file to make it unreadable"); - Path snapshotPath = repo.resolve("snap-test-snap-1-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat"); + Path snapshotPath = repo.resolve("snap-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat"); try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) { outChan.truncate(randomInt(10)); } @@ -2017,6 +2019,9 @@ public void testDeleteOrphanSnapshot() throws Exception { assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); logger.info("--> emulate an orphan snapshot"); + RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName()); + final RepositoryData repositoryData = repositoriesService.repository(repositoryName).getRepositoryData(); + final IndexId indexId = repositoryData.resolveIndexId(idxName); clusterService.submitStateUpdateTask("orphan snapshot test", new ClusterStateUpdateTask() { @@ -2033,7 +2038,7 @@ public ClusterState execute(ClusterState currentState) { true, false, State.ABORTED, - Collections.singletonList(idxName), + Collections.singletonList(indexId), System.currentTimeMillis(), shards.build())); return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries))).build(); @@ -2189,7 +2194,7 @@ public void testListCorruptedSnapshot() throws Exception { assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); logger.info("--> truncate snapshot file to make it unreadable"); - Path snapshotPath = repo.resolve("snap-test-snap-2-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat"); + Path snapshotPath = repo.resolve("snap-" + createSnapshotResponse.getSnapshotInfo().snapshotId().getUUID() + ".dat"); try(SeekableByteChannel outChan = Files.newByteChannel(snapshotPath, StandardOpenOption.WRITE)) { outChan.truncate(randomInt(10)); } diff --git a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index d17f0ea82c900..60c5e01482800 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -48,6 +48,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.snapshots.SnapshotId; @@ -112,8 +113,8 @@ public MockRepository(RepositoryMetaData metadata, Environment environment) thro } @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetadata) { - if (blockOnInitialization ) { + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetadata) { + if (blockOnInitialization) { blockExecution(); } super.initializeSnapshot(snapshotId, indices, clusterMetadata); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobContainer.java index 6117062fc2950..2ae6a26307d17 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/cloud/azure/blobstore/AzureBlobContainer.java @@ -30,12 +30,12 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.repositories.RepositoryException; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URISyntaxException; +import java.nio.file.NoSuchFileException; import java.util.Map; /** @@ -71,15 +71,11 @@ public boolean blobExists(String blobName) { public InputStream readBlob(String blobName) throws IOException { logger.trace("readBlob({})", blobName); - if (!blobExists(blobName)) { - throw new IOException("Blob [" + blobName + "] does not exist"); - } - try { return blobStore.getInputStream(blobStore.container(), buildKey(blobName)); } catch (StorageException e) { if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { - throw new FileNotFoundException(e.getMessage()); + throw new NoSuchFileException(e.getMessage()); } throw new IOException(e); } catch (URISyntaxException e) { @@ -108,7 +104,7 @@ private OutputStream createOutput(String blobName) throws IOException { return new AzureOutputStream(blobStore.getOutputStream(blobStore.container(), buildKey(blobName))); } catch (StorageException e) { if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { - throw new FileNotFoundException(e.getMessage()); + throw new NoSuchFileException(e.getMessage()); } throw new IOException(e); } catch (URISyntaxException e) { diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index 4b4f7d6ae8e98..1cf75780ce1cd 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -32,6 +32,8 @@ import org.elasticsearch.cloud.azure.storage.AzureStorageService.Storage; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; @@ -44,7 +46,6 @@ import org.elasticsearch.repositories.RepositoryVerificationException; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotCreationException; -import org.elasticsearch.snapshots.SnapshotId; import static org.elasticsearch.cloud.azure.storage.AzureStorageSettings.getEffectiveSetting; import static org.elasticsearch.cloud.azure.storage.AzureStorageSettings.getValue; @@ -153,7 +154,7 @@ protected ByteSizeValue chunkSize() { } @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetadata) { + public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetadata) { try { if (!blobStore.doesContainerExist(blobStore.container())) { logger.debug("container [{}] does not exist. Creating...", blobStore.container()); diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/cloud/azure/storage/AzureStorageServiceMock.java b/plugins/repository-azure/src/test/java/org/elasticsearch/cloud/azure/storage/AzureStorageServiceMock.java index 0f12a97b6dc34..51b5eae57ae05 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/cloud/azure/storage/AzureStorageServiceMock.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/cloud/azure/storage/AzureStorageServiceMock.java @@ -21,22 +21,19 @@ import com.microsoft.azure.storage.LocationMode; import com.microsoft.azure.storage.StorageException; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URISyntaxException; +import java.nio.file.NoSuchFileException; import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -82,7 +79,7 @@ public void deleteBlob(String account, LocationMode mode, String container, Stri @Override public InputStream getInputStream(String account, LocationMode mode, String container, String blob) throws IOException { if (!blobExists(account, mode, container, blob)) { - throw new FileNotFoundException("missing blob [" + blob + "]"); + throw new NoSuchFileException("missing blob [" + blob + "]"); } return new ByteArrayInputStream(blobs.get(blob).toByteArray()); } diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreContainerTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreContainerTests.java index 5b161613c9bbd..85ca44205aa94 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreContainerTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreContainerTests.java @@ -22,11 +22,10 @@ import com.microsoft.azure.storage.StorageException; import org.elasticsearch.cloud.azure.blobstore.AzureBlobStore; import org.elasticsearch.cloud.azure.storage.AzureStorageServiceMock; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.repositories.ESBlobStoreContainerTestCase; -import org.elasticsearch.repositories.RepositoryName; -import org.elasticsearch.repositories.RepositorySettings; import java.io.IOException; import java.net.URISyntaxException; @@ -35,11 +34,9 @@ public class AzureBlobStoreContainerTests extends ESBlobStoreContainerTestCase { @Override protected BlobStore newBlobStore() throws IOException { try { - RepositoryName repositoryName = new RepositoryName("azure", "ittest"); - RepositorySettings repositorySettings = new RepositorySettings( - Settings.EMPTY, Settings.EMPTY); - AzureStorageServiceMock client = new AzureStorageServiceMock(Settings.EMPTY); - return new AzureBlobStore(repositoryName, Settings.EMPTY, repositorySettings, client); + RepositoryMetaData repositoryMetaData = new RepositoryMetaData("azure", "ittest", Settings.EMPTY); + AzureStorageServiceMock client = new AzureStorageServiceMock(); + return new AzureBlobStore(repositoryMetaData, Settings.EMPTY, client); } catch (URISyntaxException | StorageException e) { throw new IOException(e); } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/common/blobstore/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/common/blobstore/gcs/GoogleCloudStorageBlobStore.java index 6ff5aa4181946..b63d03487b2a5 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/common/blobstore/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/common/blobstore/gcs/GoogleCloudStorageBlobStore.java @@ -41,9 +41,9 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.CountDown; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.nio.file.NoSuchFileException; import java.security.AccessController; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; @@ -196,7 +196,7 @@ InputStream readBlob(String blobName) throws IOException { } catch (GoogleJsonResponseException e) { GoogleJsonError error = e.getDetails(); if ((e.getStatusCode() == HTTP_NOT_FOUND) || ((error != null) && (error.getCode() == HTTP_NOT_FOUND))) { - throw new FileNotFoundException(e.getMessage()); + throw new NoSuchFileException(e.getMessage()); } throw e; } @@ -227,6 +227,9 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize) throws I * @param blobName name of the blob */ void deleteBlob(String blobName) throws IOException { + if (!blobExists(blobName)) { + throw new NoSuchFileException("Blob [" + blobName + "] does not exist"); + } doPrivileged(() -> client.objects().delete(bucket, blobName).execute()); } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index 17abf7adacdfe..eac97b97b818e 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -32,9 +32,9 @@ import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.nio.file.NoSuchFileException; import java.util.Collections; import java.util.EnumSet; import java.util.LinkedHashMap; @@ -69,7 +69,7 @@ public Boolean run(FileContext fileContext) throws IOException { @Override public void deleteBlob(String blobName) throws IOException { if (!blobExists(blobName)) { - throw new IOException("Blob [" + blobName + "] does not exist"); + throw new NoSuchFileException("Blob [" + blobName + "] does not exist"); } store.execute(new Operation() { @@ -93,6 +93,9 @@ public Void run(FileContext fileContext) throws IOException { @Override public InputStream readBlob(String blobName) throws IOException { + if (!blobExists(blobName)) { + throw new NoSuchFileException("Blob [" + blobName + "] does not exist"); + } // FSDataInputStream does buffering internally return store.execute(new Operation() { @Override diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java index a96a8183e58f0..cdc6dd968039d 100644 --- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.repositories.ESBlobStoreContainerTestCase; @@ -55,7 +56,8 @@ public HdfsBlobStore run() { }); } - public FileContext createContext(URI uri) { + @SuppressForbidden(reason = "lesser of two evils (the other being a bunch of JNI/classloader nightmares)") + private FileContext createContext(URI uri) { // mirrors HdfsRepository.java behaviour Configuration cfg = new Configuration(true); cfg.setClassLoader(HdfsRepository.class.getClassLoader()); @@ -85,8 +87,7 @@ public FileContext createContext(URI uri) { // set file system to TestingFs to avoid a bunch of security // checks, similar to what is done in HdfsTests.java - cfg.set(String.format("fs.AbstractFileSystem.%s.impl", uri.getScheme()), - TestingFs.class.getName()); + cfg.set("fs.AbstractFileSystem." + uri.getScheme() + ".impl", TestingFs.class.getName()); // create the FileContext with our user return Subject.doAs(subject, new PrivilegedAction() { diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java index 41604577eaedb..5659b2df1c88f 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java @@ -37,10 +37,10 @@ import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.io.Streams; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.file.NoSuchFileException; import java.security.AccessController; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; @@ -89,7 +89,7 @@ public InputStream readBlob(String blobName) throws IOException { } else { if (e instanceof AmazonS3Exception) { if (404 == ((AmazonS3Exception) e).getStatusCode()) { - throw new FileNotFoundException("Blob object [" + blobName + "] not found: " + e.getMessage()); + throw new NoSuchFileException("Blob object [" + blobName + "] not found: " + e.getMessage()); } } throw e; @@ -116,7 +116,7 @@ public void writeBlob(String blobName, BytesReference bytes) throws IOException @Override public void deleteBlob(String blobName) throws IOException { if (!blobExists(blobName)) { - throw new IOException("Blob [" + blobName + "] does not exist"); + throw new NoSuchFileException("Blob [" + blobName + "] does not exist"); } try { diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java index 67ad0eb73582f..aedbc946d7444 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java @@ -129,16 +129,17 @@ public void testDeleteBlob() throws IOException { } @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/15579") - public void testOverwriteFails() throws IOException { + public void testVerifyOverwriteFails() throws IOException { try (final BlobStore store = newBlobStore()) { final String blobName = "foobar"; final BlobContainer container = store.blobContainer(new BlobPath()); byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); final BytesArray bytesArray = new BytesArray(data); container.writeBlob(blobName, bytesArray); + // should not be able to overwrite existing blob expectThrows(IOException.class, () -> container.writeBlob(blobName, bytesArray)); container.deleteBlob(blobName); - container.writeBlob(blobName, bytesArray); // deleted it, so should be able to write it again + container.writeBlob(blobName, bytesArray); // after deleting the previous blob, we should be able to write to it again } }