From 6a808652825dcc0bea57418d8e1166ce34023a7b Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 17 Oct 2019 09:11:00 +0200 Subject: [PATCH] Cleanup FileRestoreContext Abstractions This class is only used by the blob store repository and CCR and the abstractions didn't really make sense with CCR ignoring the concrete `restoreFiles` method completely and having a method used only by the blobstore overriden as unsupported. => Moved to a more fitting set of abstractions => Dried up the stream wrapping in `BlobStoreRepository` a little now that the `restoreFile` method could be simplified Relates #48110 as it makes changing the API of `FileRestoreContext` to what is needed for async restores simpler --- .../blobstore/BlobStoreRepository.java | 69 +++++++++++++++---- .../blobstore/FileRestoreContext.java | 65 +++-------------- .../xpack/ccr/repository/CcrRepository.java | 8 +-- 3 files changed, 63 insertions(+), 79 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 59c6a248ca0f4..72e6e2a2f3227 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -22,9 +22,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; @@ -37,6 +41,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Numbers; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -1149,17 +1154,51 @@ public void restoreShard(Store store, SnapshotId snapshotId, Version version, In final BlobContainer container = shardContainer(indexId, snapshotShardId); BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); - new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE) { + new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) { @Override - protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { - final InputStream dataBlobCompositeStream = new SlicedInputStream(fileInfo.numberOfParts()) { - @Override - protected InputStream openSlice(long slice) throws IOException { - return container.readBlob(fileInfo.partName(slice)); + protected void restoreFiles(List filesToRecover, Store store) throws IOException { + // restore the files from the snapshot to the Lucene store + for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { + logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); + restoreFile(fileToRecover, store); + } + } + + private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException { + boolean success = false; + + try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) { + @Override + protected InputStream openSlice(long slice) throws IOException { + return container.readBlob(fileInfo.partName(slice)); + } + }, + restoreRateLimiter, restoreRateLimitingTimeInNanos)) { + try (IndexOutput indexOutput = + store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { + final byte[] buffer = new byte[BUFFER_SIZE]; + int length; + while ((length = stream.read(buffer)) > 0) { + indexOutput.writeBytes(buffer, 0, length); + recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length); + } + Store.verify(indexOutput); + indexOutput.close(); + store.directory().sync(Collections.singleton(fileInfo.physicalName())); + success = true; + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { + try { + store.markStoreCorrupted(ex); + } catch (IOException e) { + logger.warn("store cannot be marked as corrupted", e); + } + throw ex; + } finally { + if (success == false) { + store.deleteQuiet(fileInfo.physicalName()); + } } - }; - return restoreRateLimiter == null ? dataBlobCompositeStream - : new RateLimitingInputStream(dataBlobCompositeStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc); + } } }.restore(snapshotFiles, store); } catch (Exception e) { @@ -1167,6 +1206,10 @@ protected InputStream openSlice(long slice) throws IOException { } } + private static InputStream maybeRateLimit(InputStream stream, @Nullable RateLimiter rateLimiter, CounterMetric metric) { + return rateLimiter == null ? stream : new RateLimitingInputStream(stream, rateLimiter, metric::inc); + } + @Override public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) { BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(shardContainer(indexId, shardId), snapshotId); @@ -1328,13 +1371,9 @@ private void snapshotFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, IndexId for (int i = 0; i < fileInfo.numberOfParts(); i++) { final long partBytes = fileInfo.partBytes(i); - InputStream inputStream = new InputStreamIndexInput(indexInput, partBytes); - if (snapshotRateLimiter != null) { - inputStream = new RateLimitingInputStream(inputStream, snapshotRateLimiter, - snapshotRateLimitingTimeInNanos::inc); - } // Make reads abortable by mutating the snapshotStatus object - inputStream = new FilterInputStream(inputStream) { + final InputStream inputStream = new FilterInputStream(maybeRateLimit( + new InputStreamIndexInput(indexInput, partBytes), snapshotRateLimiter, snapshotRateLimitingTimeInNanos)) { @Override public int read() throws IOException { checkAborted(); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java index 96402d0d1bbc6..8282f6f730e52 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java @@ -21,11 +21,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.index.shard.ShardId; @@ -38,10 +33,8 @@ import org.elasticsearch.snapshots.SnapshotId; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -63,7 +56,6 @@ public abstract class FileRestoreContext { protected final RecoveryState recoveryState; protected final SnapshotId snapshotId; protected final ShardId shardId; - protected final int bufferSize; /** * Constructs new restore context @@ -71,15 +63,12 @@ public abstract class FileRestoreContext { * @param shardId shard id to restore into * @param snapshotId snapshot id * @param recoveryState recovery state to report progress - * @param bufferSize buffer size for restore */ - protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState, - int bufferSize) { + protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState) { this.repositoryName = repositoryName; this.recoveryState = recoveryState; this.snapshotId = snapshotId; this.shardId = shardId; - this.bufferSize = bufferSize; } /** @@ -210,54 +199,16 @@ public void restore(SnapshotFiles snapshotFiles, Store store) throws IOException } } - protected void restoreFiles(List filesToRecover, Store store) throws IOException { - // restore the files from the snapshot to the Lucene store - for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { - logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); - restoreFile(fileToRecover, store); - } - } - - protected abstract InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo); - - @SuppressWarnings("unchecked") - private static Iterable concat(Store.RecoveryDiff diff) { - return Iterables.concat(diff.different, diff.missing); - } - /** - * Restores a file + * Restores given list of {@link BlobStoreIndexShardSnapshot.FileInfo} to the given {@link Store}. * - * @param fileInfo file to be restored + * @param filesToRecover List of files to restore + * @param store Store to restore into */ - private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException { - boolean success = false; + protected abstract void restoreFiles(List filesToRecover, Store store) throws IOException; - try (InputStream stream = fileInputStream(fileInfo)) { - try (IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { - final byte[] buffer = new byte[bufferSize]; - int length; - while ((length = stream.read(buffer)) > 0) { - indexOutput.writeBytes(buffer, 0, length); - recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length); - } - Store.verify(indexOutput); - indexOutput.close(); - store.directory().sync(Collections.singleton(fileInfo.physicalName())); - success = true; - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { - try { - store.markStoreCorrupted(ex); - } catch (IOException e) { - logger.warn("store cannot be marked as corrupted", e); - } - throw ex; - } finally { - if (success == false) { - store.deleteQuiet(fileInfo.physicalName()); - } - } - } + @SuppressWarnings("unchecked") + private static Iterable concat(Store.RecoveryDiff diff) { + return Iterables.concat(diff.different, diff.missing); } - } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 6ac65c3182061..a6f3b662a3947 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -81,7 +81,6 @@ import java.io.Closeable; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -454,7 +453,7 @@ private static class RestoreSession extends FileRestoreContext implements Closea RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, ShardId shardId, RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion, ThreadPool threadPool, CcrSettings ccrSettings, LongConsumer throttleListener) { - super(repositoryName, shardId, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes())); + super(repositoryName, shardId, SNAPSHOT_ID, recoveryState); this.remoteClient = remoteClient; this.sessionUUID = sessionUUID; this.node = node; @@ -569,11 +568,6 @@ private void handleError(Store store, Exception e) throws IOException { } } - @Override - protected InputStream fileInputStream(FileInfo fileInfo) { - throw new UnsupportedOperationException(); - } - @Override public void close() { ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node);