Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
Expand All @@ -37,6 +41,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
Expand Down Expand Up @@ -1193,24 +1198,62 @@ public void restoreShard(Store store, SnapshotId snapshotId, Version version, In
final BlobContainer container = shardContainer(indexId, snapshotShardId);
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE) {
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) {
@Override
protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
final InputStream dataBlobCompositeStream = new SlicedInputStream(fileInfo.numberOfParts()) {
@Override
protected InputStream openSlice(long slice) throws IOException {
return container.readBlob(fileInfo.partName(slice));
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store) throws IOException {
// restore the files from the snapshot to the Lucene store
for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover, store);
}
}

private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException {
boolean success = false;

try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) {
@Override
protected InputStream openSlice(long slice) throws IOException {
return container.readBlob(fileInfo.partName(slice));
}
},
restoreRateLimiter, restoreRateLimitingTimeInNanos)) {
try (IndexOutput indexOutput =
store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
final byte[] buffer = new byte[BUFFER_SIZE];
int length;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, 0, length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
}
Store.verify(indexOutput);
indexOutput.close();
store.directory().sync(Collections.singleton(fileInfo.physicalName()));
success = true;
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
try {
store.markStoreCorrupted(ex);
} catch (IOException e) {
logger.warn("store cannot be marked as corrupted", e);
}
throw ex;
} finally {
if (success == false) {
store.deleteQuiet(fileInfo.physicalName());
}
}
};
return restoreRateLimiter == null ? dataBlobCompositeStream
: new RateLimitingInputStream(dataBlobCompositeStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc);
}
}
}.restore(snapshotFiles, store);
} catch (Exception e) {
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
}
}

private static InputStream maybeRateLimit(InputStream stream, @Nullable RateLimiter rateLimiter, CounterMetric metric) {
return rateLimiter == null ? stream : new RateLimitingInputStream(stream, rateLimiter, metric::inc);
}

@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(shardContainer(indexId, shardId), snapshotId);
Expand Down Expand Up @@ -1372,13 +1415,9 @@ private void snapshotFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, IndexId
for (int i = 0; i < fileInfo.numberOfParts(); i++) {
final long partBytes = fileInfo.partBytes(i);

InputStream inputStream = new InputStreamIndexInput(indexInput, partBytes);
if (snapshotRateLimiter != null) {
inputStream = new RateLimitingInputStream(inputStream, snapshotRateLimiter,
snapshotRateLimitingTimeInNanos::inc);
}
// Make reads abortable by mutating the snapshotStatus object
inputStream = new FilterInputStream(inputStream) {
final InputStream inputStream = new FilterInputStream(maybeRateLimit(
new InputStreamIndexInput(indexInput, partBytes), snapshotRateLimiter, snapshotRateLimitingTimeInNanos)) {
@Override
public int read() throws IOException {
checkAborted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -38,10 +33,8 @@
import org.elasticsearch.snapshots.SnapshotId;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -63,23 +56,19 @@ public abstract class FileRestoreContext {
protected final RecoveryState recoveryState;
protected final SnapshotId snapshotId;
protected final ShardId shardId;
protected final int bufferSize;

/**
* Constructs new restore context
*
* @param shardId shard id to restore into
* @param snapshotId snapshot id
* @param recoveryState recovery state to report progress
* @param bufferSize buffer size for restore
*/
protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState,
int bufferSize) {
protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState) {
this.repositoryName = repositoryName;
this.recoveryState = recoveryState;
this.snapshotId = snapshotId;
this.shardId = shardId;
this.bufferSize = bufferSize;
}

/**
Expand Down Expand Up @@ -194,54 +183,16 @@ public void restore(SnapshotFiles snapshotFiles, Store store) {
}
}

protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store) throws IOException {
// restore the files from the snapshot to the Lucene store
for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover, store);
}
}

protected abstract InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo);
/**
* Restores given list of {@link BlobStoreIndexShardSnapshot.FileInfo} to the given {@link Store}.
*
* @param filesToRecover List of files to restore
* @param store Store to restore into
*/
protected abstract void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store) throws IOException;

@SuppressWarnings("unchecked")
private static Iterable<StoreFileMetaData> concat(Store.RecoveryDiff diff) {
return Iterables.concat(diff.different, diff.missing);
}

/**
* Restores a file
*
* @param fileInfo file to be restored
*/
private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException {
boolean success = false;

try (InputStream stream = fileInputStream(fileInfo)) {
try (IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
final byte[] buffer = new byte[bufferSize];
int length;
while ((length = stream.read(buffer)) > 0) {
indexOutput.writeBytes(buffer, 0, length);
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
}
Store.verify(indexOutput);
indexOutput.close();
store.directory().sync(Collections.singleton(fileInfo.physicalName()));
success = true;
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
try {
store.markStoreCorrupted(ex);
} catch (IOException e) {
logger.warn("store cannot be marked as corrupted", e);
}
throw ex;
} finally {
if (success == false) {
store.deleteQuiet(fileInfo.physicalName());
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -458,7 +457,7 @@ private static class RestoreSession extends FileRestoreContext implements Closea
RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, ShardId shardId,
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion,
ThreadPool threadPool, CcrSettings ccrSettings, LongConsumer throttleListener) {
super(repositoryName, shardId, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes()));
super(repositoryName, shardId, SNAPSHOT_ID, recoveryState);
this.remoteClient = remoteClient;
this.sessionUUID = sessionUUID;
this.node = node;
Expand Down Expand Up @@ -573,11 +572,6 @@ private void handleError(Store store, Exception e) throws IOException {
}
}

@Override
protected InputStream fileInputStream(FileInfo fileInfo) {
throw new UnsupportedOperationException();
}

@Override
public void close() {
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node);
Expand Down