Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;

import java.io.IOException;
Expand Down Expand Up @@ -70,12 +71,12 @@ public static class Entry {
private final boolean includeGlobalState;
private final boolean partial;
private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
private final List<String> indices;
private final List<IndexId> indices;
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
private final long startTime;

public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<String> indices, long startTime,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
Expand Down Expand Up @@ -111,7 +112,7 @@ public State state() {
return state;
}

public List<String> indices() {
public List<IndexId> indices() {
return indices;
}

Expand Down Expand Up @@ -377,9 +378,9 @@ public SnapshotsInProgress readFrom(StreamInput in) throws IOException {
boolean partial = in.readBoolean();
State state = State.fromValue(in.readByte());
int indices = in.readVInt();
List<String> indexBuilder = new ArrayList<>();
List<IndexId> indexBuilder = new ArrayList<>();
for (int j = 0; j < indices; j++) {
indexBuilder.add(in.readString());
indexBuilder.add(new IndexId(in.readString(), in.readString()));
}
long startTime = in.readLong();
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
Expand Down Expand Up @@ -410,8 +411,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(entry.partial());
out.writeByte(entry.state().value());
out.writeVInt(entry.indices().size());
for (String index : entry.indices()) {
out.writeString(index);
for (IndexId index : entry.indices()) {
index.writeTo(out);
}
out.writeLong(entry.startTime());
out.writeVInt(entry.shards().size());
Expand Down Expand Up @@ -458,8 +459,8 @@ public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params p
builder.field(STATE, entry.state());
builder.startArray(INDICES);
{
for (String index : entry.indices()) {
builder.value(index);
for (IndexId index : entry.indices()) {
index.toXContent(builder, params);
}
}
builder.endArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.nio.file.NoSuchFileException;
import java.util.Map;

/**
Expand Down Expand Up @@ -53,7 +53,8 @@ public interface BlobContainer {
* @param blobName
* The name of the blob to get an {@link InputStream} for.
* @return The {@code InputStream} to read the blob.
* @throws IOException if the blob does not exist or can not be read.
* @throws NoSuchFileException if the blob does not exist
* @throws IOException if the blob can not be read.
*/
InputStream readBlob(String blobName) throws IOException;

Expand Down Expand Up @@ -95,7 +96,8 @@ public interface BlobContainer {
*
* @param blobName
* The name of the blob to delete.
* @throws IOException if the blob does not exist, or if the blob exists but could not be deleted.
* @throws NoSuchFileException if the blob does not exist
* @throws IOException if the blob exists but could not be deleted.
*/
void deleteBlob(String blobName) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
import org.elasticsearch.common.io.Streams;

import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -95,14 +98,18 @@ public boolean blobExists(String blobName) {

@Override
public InputStream readBlob(String name) throws IOException {
return new BufferedInputStream(Files.newInputStream(path.resolve(name)), blobStore.bufferSizeInBytes());
final Path resolvedPath = path.resolve(name);
try {
return new BufferedInputStream(Files.newInputStream(resolvedPath), blobStore.bufferSizeInBytes());
} catch (FileNotFoundException fnfe) {
throw new NoSuchFileException("[" + name + "] blob not found");
}
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
final Path file = path.resolve(blobName);
// TODO: why is this not specifying CREATE_NEW? Do we really need to be able to truncate existing files?
try (OutputStream outputStream = Files.newOutputStream(file)) {
try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
Streams.copy(inputStream, outputStream, new byte[blobStore.bufferSizeInBytes()]);
}
IOUtils.fsync(file, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@
package org.elasticsearch.common.blobstore.support;

import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.bytes.BytesReference;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Map;

/**
* A base abstract blob container that implements higher level container methods.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;

import java.io.IOException;
Expand Down Expand Up @@ -394,10 +395,12 @@ private void restore(final IndexShard indexShard, final Repository repository) {
translogState.totalOperationsOnStart(0);
indexShard.prepareForIndexRecovery();
ShardId snapshotShardId = shardId;
if (!shardId.getIndexName().equals(restoreSource.index())) {
snapshotShardId = new ShardId(restoreSource.index(), IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
final String indexName = restoreSource.index();
if (!shardId.getIndexName().equals(indexName)) {
snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
}
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), snapshotShardId, indexShard.recoveryState());
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
indexShard.skipTranslogRecovery();
indexShard.finalizeRecovery();
indexShard.postRecovery("restore done");
Expand Down
110 changes: 110 additions & 0 deletions core/src/main/java/org/elasticsearch/repositories/IndexId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.repositories;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;

import java.io.IOException;
import java.util.Objects;

/**
 * Represents a single snapshotted index in the repository.
 * <p>
 * An {@code IndexId} pairs the index name with a repository-unique identifier
 * that repositories use to name blobs for the index. Instances are immutable
 * and value-equal on both fields.
 */
public final class IndexId implements Writeable, ToXContent {
    protected static final String NAME = "name";
    protected static final String ID = "id";

    private final String name;
    private final String id;

    /**
     * Creates an index id from the index name and its repository-unique id.
     *
     * @param name the index name
     * @param id   the unique, blob-name-friendly identifier of the index within the repository
     */
    public IndexId(final String name, final String id) {
        this.name = name;
        this.id = id;
    }

    /**
     * Reads an index id from the stream. The read order (name, then id) must
     * mirror the write order in {@link #writeTo(StreamOutput)}.
     *
     * @param in the stream to read from
     * @throws IOException if the stream could not be read
     */
    public IndexId(final StreamInput in) throws IOException {
        this.name = in.readString();
        this.id = in.readString();
    }

    /**
     * The name of the index.
     */
    public String getName() {
        return name;
    }

    /**
     * The unique ID for the index within the repository. This is *not* the same as the
     * index's UUID, but merely a unique file/URL friendly identifier that a repository can
     * use to name blobs for the index.
     *
     * We could not use the index's actual UUID (See {@link Index#getUUID()}) because in the
     * case of snapshot/restore, the index UUID in the snapshotted index will be different
     * from the index UUID assigned to it when it is restored. Hence, the actual index UUID
     * is not useful in the context of snapshot/restore for tying a snapshotted index to the
     * index it was snapshot from, and so we are using a separate UUID here.
     */
    public String getId() {
        return id;
    }

    @Override
    public String toString() {
        return "[" + name + "/" + id + "]";
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        // Plain reference cast, statically guarded by the getClass() check above;
        // the original @SuppressWarnings("unchecked") was spurious (this is not an
        // unchecked generic cast) and has been removed.
        final IndexId that = (IndexId) o;
        return Objects.equals(name, that.name) && Objects.equals(id, that.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, id);
    }

    @Override
    public void writeTo(final StreamOutput out) throws IOException {
        // Write order (name, then id) must mirror the read order in IndexId(StreamInput).
        out.writeString(name);
        out.writeString(id);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
        builder.startObject();
        builder.field(NAME, name);
        builder.field(ID, id);
        builder.endObject();
        return builder;
    }
}
28 changes: 16 additions & 12 deletions core/src/main/java/org/elasticsearch/repositories/Repository.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
* <ul>
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link Repository#snapshotShard(IndexShard, SnapshotId, IndexCommit, IndexShardSnapshotStatus)}
* <li>Data nodes call {@link Repository#snapshotShard(IndexShard, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
* for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
* </ul>
Expand Down Expand Up @@ -88,15 +88,14 @@ interface Factory {
* @param indices list of indices
* @return information about snapshot
*/
MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<String> indices) throws IOException;
MetaData getSnapshotMetaData(SnapshotInfo snapshot, List<IndexId> indices) throws IOException;

/**
* Returns the list of snapshots currently stored in the repository that match the given predicate on the snapshot name.
* To get all snapshots, the predicate filter should return true regardless of the input.
*
* @return snapshot list
* Returns a {@link RepositoryData} to describe the data in the repository, including the snapshots
* and the indices across all snapshots found in the repository. Throws a {@link RepositoryException}
* if there was an error in reading the data.
*/
List<SnapshotId> getSnapshots();
RepositoryData getRepositoryData();

/**
* Starts snapshotting process
Expand All @@ -105,20 +104,22 @@ interface Factory {
* @param indices list of indices to be snapshotted
* @param metaData cluster metadata
*/
void initializeSnapshot(SnapshotId snapshotId, List<String> indices, MetaData metaData);
void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData);

/**
* Finalizes snapshotting process
* <p>
* This method is called on master after all shards are snapshotted.
*
* @param snapshotId snapshot id
* @param indices list of indices in the snapshot
* @param startTime start time of the snapshot
* @param failure global failure reason or null
* @param totalShards total number of shards
* @param shardFailures list of shard failures
* @return snapshot description
*/
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<String> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures);
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures);

/**
* Deletes snapshot
Expand Down Expand Up @@ -181,10 +182,11 @@ interface Factory {
*
* @param shard shard to be snapshotted
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
*/
void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);
void snapshotShard(IndexShard shard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus);

/**
* Restores snapshot of the shard.
Expand All @@ -194,20 +196,22 @@ interface Factory {
* @param shard the shard to restore the index into
* @param snapshotId snapshot id
* @param version version of elasticsearch that created this snapshot
* @param indexId id of the index in the repository from which the restore is occurring
* @param snapshotShardId shard id (in the snapshot)
* @param recoveryState recovery state
*/
void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, ShardId snapshotShardId, RecoveryState recoveryState);
void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState);

/**
* Retrieve shard snapshot status for the stored snapshot
*
* @param snapshotId snapshot id
* @param version version of elasticsearch that created this snapshot
* @param indexId the snapshotted index id for the shard to get status for
* @param shardId shard id
* @return snapshot status
*/
IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, ShardId shardId);
IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId);


}
Loading