Skip to content

Commit 9f88a81

Browse files
author
Ali Beyad
authored
Merge pull request #19706 from elastic/enhancement/snapshot-blob-handling
More resilient blob handling in snapshot repositories
2 parents 3869029 + 401edeb commit 9f88a81

File tree

33 files changed

+1540
-336
lines changed

33 files changed

+1540
-336
lines changed

core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.common.xcontent.ToXContent;
3030
import org.elasticsearch.common.xcontent.XContentBuilder;
3131
import org.elasticsearch.index.shard.ShardId;
32+
import org.elasticsearch.repositories.IndexId;
3233
import org.elasticsearch.snapshots.Snapshot;
3334

3435
import java.io.IOException;
@@ -70,12 +71,12 @@ public static class Entry {
7071
private final boolean includeGlobalState;
7172
private final boolean partial;
7273
private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
73-
private final List<String> indices;
74+
private final List<IndexId> indices;
7475
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
7576
private final long startTime;
7677

77-
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<String> indices, long startTime,
78-
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
78+
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
79+
long startTime, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
7980
this.state = state;
8081
this.snapshot = snapshot;
8182
this.includeGlobalState = includeGlobalState;
@@ -111,7 +112,7 @@ public State state() {
111112
return state;
112113
}
113114

114-
public List<String> indices() {
115+
public List<IndexId> indices() {
115116
return indices;
116117
}
117118

@@ -377,9 +378,9 @@ public SnapshotsInProgress readFrom(StreamInput in) throws IOException {
377378
boolean partial = in.readBoolean();
378379
State state = State.fromValue(in.readByte());
379380
int indices = in.readVInt();
380-
List<String> indexBuilder = new ArrayList<>();
381+
List<IndexId> indexBuilder = new ArrayList<>();
381382
for (int j = 0; j < indices; j++) {
382-
indexBuilder.add(in.readString());
383+
indexBuilder.add(new IndexId(in.readString(), in.readString()));
383384
}
384385
long startTime = in.readLong();
385386
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
@@ -410,8 +411,8 @@ public void writeTo(StreamOutput out) throws IOException {
410411
out.writeBoolean(entry.partial());
411412
out.writeByte(entry.state().value());
412413
out.writeVInt(entry.indices().size());
413-
for (String index : entry.indices()) {
414-
out.writeString(index);
414+
for (IndexId index : entry.indices()) {
415+
index.writeTo(out);
415416
}
416417
out.writeLong(entry.startTime());
417418
out.writeVInt(entry.shards().size());
@@ -458,8 +459,8 @@ public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params p
458459
builder.field(STATE, entry.state());
459460
builder.startArray(INDICES);
460461
{
461-
for (String index : entry.indices()) {
462-
builder.value(index);
462+
for (IndexId index : entry.indices()) {
463+
index.toXContent(builder, params);
463464
}
464465
}
465466
builder.endArray();

core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
import java.io.IOException;
2525
import java.io.InputStream;
26-
import java.util.Collection;
26+
import java.nio.file.NoSuchFileException;
2727
import java.util.Map;
2828

2929
/**
@@ -53,7 +53,8 @@ public interface BlobContainer {
5353
* @param blobName
5454
* The name of the blob to get an {@link InputStream} for.
5555
* @return The {@code InputStream} to read the blob.
56-
* @throws IOException if the blob does not exist or can not be read.
56+
* @throws NoSuchFileException if the blob does not exist
57+
* @throws IOException if the blob can not be read.
5758
*/
5859
InputStream readBlob(String blobName) throws IOException;
5960

@@ -95,7 +96,8 @@ public interface BlobContainer {
9596
*
9697
* @param blobName
9798
* The name of the blob to delete.
98-
* @throws IOException if the blob does not exist, or if the blob exists but could not be deleted.
99+
* @throws NoSuchFileException if the blob does not exist
100+
* @throws IOException if the blob exists but could not be deleted.
99101
*/
100102
void deleteBlob(String blobName) throws IOException;
101103

core/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@
2727
import org.elasticsearch.common.io.Streams;
2828

2929
import java.io.BufferedInputStream;
30+
import java.io.FileNotFoundException;
3031
import java.io.IOException;
3132
import java.io.InputStream;
3233
import java.io.OutputStream;
3334
import java.nio.file.DirectoryStream;
3435
import java.nio.file.Files;
36+
import java.nio.file.NoSuchFileException;
3537
import java.nio.file.Path;
3638
import java.nio.file.StandardCopyOption;
39+
import java.nio.file.StandardOpenOption;
3740
import java.nio.file.attribute.BasicFileAttributes;
3841
import java.util.HashMap;
3942
import java.util.Map;
@@ -85,7 +88,7 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws
8588
@Override
8689
public void deleteBlob(String blobName) throws IOException {
8790
Path blobPath = path.resolve(blobName);
88-
Files.deleteIfExists(blobPath);
91+
Files.delete(blobPath);
8992
}
9093

9194
@Override
@@ -95,14 +98,18 @@ public boolean blobExists(String blobName) {
9598

9699
@Override
97100
public InputStream readBlob(String name) throws IOException {
98-
return new BufferedInputStream(Files.newInputStream(path.resolve(name)), blobStore.bufferSizeInBytes());
101+
final Path resolvedPath = path.resolve(name);
102+
try {
103+
return new BufferedInputStream(Files.newInputStream(resolvedPath), blobStore.bufferSizeInBytes());
104+
} catch (FileNotFoundException fnfe) {
105+
throw new NoSuchFileException("[" + name + "] blob not found");
106+
}
99107
}
100108

101109
@Override
102110
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
103111
final Path file = path.resolve(blobName);
104-
// TODO: why is this not specifying CREATE_NEW? Do we really need to be able to truncate existing files?
105-
try (OutputStream outputStream = Files.newOutputStream(file)) {
112+
try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
106113
Streams.copy(inputStream, outputStream, new byte[blobStore.bufferSizeInBytes()]);
107114
}
108115
IOUtils.fsync(file, false);

core/src/main/java/org/elasticsearch/common/blobstore/support/AbstractBlobContainer.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,11 @@
2020
package org.elasticsearch.common.blobstore.support;
2121

2222
import org.elasticsearch.common.blobstore.BlobContainer;
23-
import org.elasticsearch.common.blobstore.BlobMetaData;
2423
import org.elasticsearch.common.blobstore.BlobPath;
2524
import org.elasticsearch.common.bytes.BytesReference;
2625

2726
import java.io.IOException;
2827
import java.io.InputStream;
29-
import java.util.Collection;
30-
import java.util.Map;
3128

3229
/**
3330
* A base abstract blob container that implements higher level container methods.

core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
4444
import org.elasticsearch.index.store.Store;
4545
import org.elasticsearch.indices.recovery.RecoveryState;
46+
import org.elasticsearch.repositories.IndexId;
4647
import org.elasticsearch.repositories.Repository;
4748

4849
import java.io.IOException;
@@ -394,10 +395,12 @@ private void restore(final IndexShard indexShard, final Repository repository) {
394395
translogState.totalOperationsOnStart(0);
395396
indexShard.prepareForIndexRecovery();
396397
ShardId snapshotShardId = shardId;
397-
if (!shardId.getIndexName().equals(restoreSource.index())) {
398-
snapshotShardId = new ShardId(restoreSource.index(), IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
398+
final String indexName = restoreSource.index();
399+
if (!shardId.getIndexName().equals(indexName)) {
400+
snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
399401
}
400-
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), snapshotShardId, indexShard.recoveryState());
402+
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
403+
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
401404
indexShard.skipTranslogRecovery();
402405
indexShard.finalizeRecovery();
403406
indexShard.postRecovery("restore done");
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.repositories;
21+
22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.StreamOutput;
24+
import org.elasticsearch.common.io.stream.Writeable;
25+
import org.elasticsearch.common.xcontent.ToXContent;
26+
import org.elasticsearch.common.xcontent.XContentBuilder;
27+
import org.elasticsearch.index.Index;
28+
29+
import java.io.IOException;
30+
import java.util.Objects;
31+
32+
/**
33+
* Represents a single snapshotted index in the repository.
34+
*/
35+
public final class IndexId implements Writeable, ToXContent {
36+
protected static final String NAME = "name";
37+
protected static final String ID = "id";
38+
39+
private final String name;
40+
private final String id;
41+
42+
public IndexId(final String name, final String id) {
43+
this.name = name;
44+
this.id = id;
45+
}
46+
47+
public IndexId(final StreamInput in) throws IOException {
48+
this.name = in.readString();
49+
this.id = in.readString();
50+
}
51+
52+
/**
53+
* The name of the index.
54+
*/
55+
public String getName() {
56+
return name;
57+
}
58+
59+
/**
60+
* The unique ID for the index within the repository. This is *not* the same as the
61+
* index's UUID, but merely a unique file/URL friendly identifier that a repository can
62+
* use to name blobs for the index.
63+
*
64+
* We could not use the index's actual UUID (See {@link Index#getUUID()}) because in the
65+
* case of snapshot/restore, the index UUID in the snapshotted index will be different
66+
* from the index UUID assigned to it when it is restored. Hence, the actual index UUID
67+
* is not useful in the context of snapshot/restore for tying a snapshotted index to the
68+
* index it was snapshot from, and so we are using a separate UUID here.
69+
*/
70+
public String getId() {
71+
return id;
72+
}
73+
74+
@Override
75+
public String toString() {
76+
return "[" + name + "/" + id + "]";
77+
}
78+
79+
@Override
80+
public boolean equals(Object o) {
81+
if (this == o) {
82+
return true;
83+
}
84+
if (o == null || getClass() != o.getClass()) {
85+
return false;
86+
}
87+
@SuppressWarnings("unchecked") IndexId that = (IndexId) o;
88+
return Objects.equals(name, that.name) && Objects.equals(id, that.id);
89+
}
90+
91+
@Override
92+
public int hashCode() {
93+
return Objects.hash(name, id);
94+
}
95+
96+
@Override
97+
public void writeTo(final StreamOutput out) throws IOException {
98+
out.writeString(name);
99+
out.writeString(id);
100+
}
101+
102+
@Override
103+
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
104+
builder.startObject();
105+
builder.field(NAME, name);
106+
builder.field(ID, id);
107+
builder.endObject();
108+
return builder;
109+
}
110+
}

core/src/main/java/org/elasticsearch/repositories/Repository.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
* <ul>
4848
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
4949
* with list of indices that will be included into the snapshot</li>
50-
* <li>Data nodes call {@link Repository#snapshotShard(IndexShard, SnapshotId, IndexCommit, IndexShardSnapshotStatus)}
50+
* <li>Data nodes call {@link Repository#snapshotShard(IndexShard, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
5151
* for each shard</li>
5252
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
5353
* </ul>
@@ -88,15 +88,14 @@ interface Factory {
8888
* @param indices list of indices
8989
* @return information about snapshot
9090
*/
91-
MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<String> indices) throws IOException;
91+
MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<IndexId> indices) throws IOException;
9292

9393
/**
94-
* Returns the list of snapshots currently stored in the repository that match the given predicate on the snapshot name.
95-
* To get all snapshots, the predicate filter should return true regardless of the input.
96-
*
97-
* @return snapshot list
94+
* Returns a {@link RepositoryData} to describe the data in the repository, including the snapshots
95+
* and the indices across all snapshots found in the repository. Throws a {@link RepositoryException}
96+
* if there was an error in reading the data.
9897
*/
99-
List<SnapshotId> getSnapshots();
98+
RepositoryData getRepositoryData();
10099

101100
/**
102101
* Starts snapshotting process
@@ -105,20 +104,22 @@ interface Factory {
105104
* @param indices list of indices to be snapshotted
106105
* @param metaData cluster metadata
107106
*/
108-
void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData);
107+
void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData);
109108

110109
/**
111110
* Finalizes snapshotting process
112111
* <p>
113112
* This method is called on master after all shards are snapshotted.
114113
*
115114
* @param snapshotId snapshot id
115+
* @param indices list of indices in the snapshot
116+
* @param startTime start time of the snapshot
116117
* @param failure global failure reason or null
117118
* @param totalShards total number of shards
118119
* @param shardFailures list of shard failures
119120
* @return snapshot description
120121
*/
121-
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<String> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures);
122+
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures);
122123

123124
/**
124125
* Deletes snapshot
@@ -181,10 +182,11 @@ interface Factory {
181182
*
182183
* @param shard shard to be snapshotted
183184
* @param snapshotId snapshot id
185+
* @param indexId id for the index being snapshotted
184186
* @param snapshotIndexCommit commit point
185187
* @param snapshotStatus snapshot status
186188
*/
187-
void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
189+
void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
188190

189191
/**
190192
* Restores snapshot of the shard.
@@ -194,20 +196,22 @@ interface Factory {
194196
* @param shard the shard to restore the index into
195197
* @param snapshotId snapshot id
196198
* @param version version of elasticsearch that created this snapshot
199+
* @param indexId id of the index in the repository from which the restore is occurring
197200
* @param snapshotShardId shard id (in the snapshot)
198201
* @param recoveryState recovery state
199202
*/
200-
void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState);
203+
void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState);
201204

202205
/**
203206
* Retrieve shard snapshot status for the stored snapshot
204207
*
205208
* @param snapshotId snapshot id
206209
* @param version version of elasticsearch that created this snapshot
210+
* @param indexId the snapshotted index id for the shard to get status for
207211
* @param shardId shard id
208212
* @return snapshot status
209213
*/
210-
IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId);
214+
IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId);
211215

212216

213217
}

0 commit comments

Comments
 (0)