diff --git a/server/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java index c754d6b8eae9e..f15b3b9cf3295 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java @@ -21,7 +21,6 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; import org.elasticsearch.common.util.PageCacheRecycler; @@ -35,17 +34,15 @@ public class PagedBytesReference extends BytesReference { private static final int PAGE_SIZE = PageCacheRecycler.BYTE_PAGE_SIZE; - private final BigArrays bigarrays; - protected final ByteArray byteArray; + private final ByteArray byteArray; private final int offset; private final int length; - public PagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) { - this(bigarrays, byteArray, 0, length); + public PagedBytesReference(ByteArray byteArray, int length) { + this(byteArray, 0, length); } - public PagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int from, int length) { - this.bigarrays = bigarrays; + private PagedBytesReference(ByteArray byteArray, int from, int length) { this.byteArray = byteArray; this.offset = from; this.length = length; @@ -67,7 +64,7 @@ public BytesReference slice(int from, int length) { throw new IllegalArgumentException("can't slice a buffer with length [" + length() + "], with slice parameters from [" + from + "], length [" + length + "]"); } - return new PagedBytesReference(bigarrays, byteArray, offset + from, length); + return new PagedBytesReference(byteArray, offset + from, length); } @Override diff --git a/server/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java index ac90e546f7eb5..209a6edc5696a 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java @@ -21,7 +21,6 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; /** @@ -32,9 +31,8 @@ public final class ReleasablePagedBytesReference extends PagedBytesReference imp private final Releasable releasable; - public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length, - Releasable releasable) { - super(bigarrays, byteArray, length); + public ReleasablePagedBytesReference(ByteArray byteArray, int length, Releasable releasable) { + super(byteArray, length); this.releasable = releasable; } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java index 768dbd1d17775..ad9cde3abbc48 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java @@ -140,7 +140,7 @@ public int size() { @Override public BytesReference bytes() { - return new PagedBytesReference(bigArrays, bytes, count); + return new PagedBytesReference(bytes, count); } /** diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java index 5616da2db9636..725ecd1c3cc4f 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java @@ -56,7 +56,7 @@ public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) { */ @Override public ReleasablePagedBytesReference bytes() { - return new ReleasablePagedBytesReference(bigArrays, bytes, count, releasable); + return new ReleasablePagedBytesReference(bytes, count, releasable); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 99cb1db3e3652..9f2297b48775b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -22,15 +22,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.index.IndexNotFoundException; -import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; @@ -65,7 +59,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -111,17 +104,12 @@ import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import static java.util.Collections.emptyMap; -import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; /** @@ -864,9 +852,14 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, @Override public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { - final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, indexId, snapshotShardId, recoveryState); + final Context context = new Context(snapshotId, indexId, shard.shardId(), snapshotShardId); + BlobPath path = basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId())); + BlobContainer blobContainer = blobStore().blobContainer(path); + final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, recoveryState, blobContainer); try { - snapshotContext.restore(); + BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot(); + SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); + snapshotContext.restore(snapshotFiles); } catch (Exception e) { throw new IndexShardRestoreFailedException(shard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e); } @@ -1459,216 +1452,29 @@ protected InputStream openSlice(long slice) throws IOException { /** * Context for restore operations */ - private class RestoreContext extends Context { + private class RestoreContext extends FileRestoreContext { - private final IndexShard targetShard; - - private final RecoveryState recoveryState; + private final BlobContainer blobContainer; /** * Constructs new restore context - * - * @param shard shard to restore into - * @param snapshotId snapshot id - * @param indexId id of the index being restored - * @param snapshotShardId shard in the snapshot that data should be restored from - * @param recoveryState recovery state to report progress + * @param indexShard shard to restore into + * @param snapshotId snapshot id + * @param recoveryState recovery state to report progress + * @param blobContainer the blob container to read the files from */ - RestoreContext(IndexShard shard, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { - super(snapshotId, indexId, shard.shardId(), snapshotShardId); - this.recoveryState = recoveryState; - this.targetShard = shard; - } - - /** - * Performs restore operation - */ - public void restore() throws IOException { - final Store store = targetShard.store(); - store.incRef(); - try { - logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, metadata.name(), shardId); - BlobStoreIndexShardSnapshot snapshot = loadSnapshot(); - - if (snapshot.indexFiles().size() == 1 - && snapshot.indexFiles().get(0).physicalName().startsWith("segments_") - && snapshot.indexFiles().get(0).hasUnknownChecksum()) { - // If the shard has no documents, it will only contain a single segments_N file for the - // shard's snapshot. If we are restoring a snapshot created by a previous supported version, - // it is still possible that in that version, an empty shard has a segments_N file with an unsupported - // version (and no checksum), because we don't know the Lucene version to assign segments_N until we - // have written some data. Since the segments_N for an empty shard could have an incompatible Lucene - // version number and no checksum, even though the index itself is perfectly fine to restore, this - // empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty - // shard anyway, we just create the empty shard here and then exit. - store.createEmpty(targetShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion); - return; - } - - SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); - Store.MetadataSnapshot recoveryTargetMetadata; - try { - // this will throw an IOException if the store has no segments infos file. The - // store can still have existing files but they will be deleted just before being - // restored. - recoveryTargetMetadata = targetShard.snapshotStoreMetadata(); - } catch (IndexNotFoundException e) { - // happens when restore to an empty shard, not a big deal - logger.trace("[{}] [{}] restoring from to an empty shard", shardId, snapshotId); - recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; - } catch (IOException e) { - logger.warn(() -> new ParameterizedMessage("{} Can't read metadata from store, will not reuse any " + - "local file while restoring", shardId), e); - recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; - } - - final List filesToRecover = new ArrayList<>(); - final Map snapshotMetaData = new HashMap<>(); - final Map fileInfos = new HashMap<>(); - for (final BlobStoreIndexShardSnapshot.FileInfo fileInfo : snapshot.indexFiles()) { - try { - // in 1.3.3 we added additional hashes for .si / segments_N files - // to ensure we don't double the space in the repo since old snapshots - // don't have this hash we try to read that hash from the blob store - // in a bwc compatible way. - maybeRecalculateMetadataHash(blobContainer, fileInfo, recoveryTargetMetadata); - } catch (Exception e) { - // if the index is broken we might not be able to read it - logger.warn(() -> new ParameterizedMessage("{} Can't calculate hash from blog for file [{}] [{}]", - shardId, fileInfo.physicalName(), fileInfo.metadata()), e); - } - snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata()); - fileInfos.put(fileInfo.metadata().name(), fileInfo); - } - - final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetaData), emptyMap(), 0); - - final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile(); - if (restoredSegmentsFile == null) { - throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file"); - } - - final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata); - for (StoreFileMetaData md : diff.identical) { - BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name()); - recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true); - if (logger.isTraceEnabled()) { - logger.trace("[{}] [{}] not_recovering [{}] from [{}], exists in local store and is same", - shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); - } - } - - for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) { - BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name()); - filesToRecover.add(fileInfo); - recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false); - if (logger.isTraceEnabled()) { - logger.trace("[{}] [{}] recovering [{}] from [{}], exists in local store but is different", shardId, snapshotId, - fileInfo.physicalName(), fileInfo.name()); - } - } - - if (filesToRecover.isEmpty()) { - logger.trace("no files to recover, all exists within the local store"); - } - - try { - // list of all existing store files - final List deleteIfExistFiles = Arrays.asList(store.directory().listAll()); - - // restore the files from the snapshot to the Lucene store - for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { - // if a file with a same physical name already exist in the store we need to delete it - // before restoring it from the snapshot. We could be lenient and try to reuse the existing - // store files (and compare their names/length/checksum again with the snapshot files) but to - // avoid extra complexity we simply delete them and restore them again like StoreRecovery - // does with dangling indices. Any existing store file that is not restored from the snapshot - // will be clean up by RecoveryTarget.cleanFiles(). - final String physicalName = fileToRecover.physicalName(); - if (deleteIfExistFiles.contains(physicalName)) { - logger.trace("[{}] [{}] deleting pre-existing file [{}]", shardId, snapshotId, physicalName); - store.directory().deleteFile(physicalName); - } - - logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); - restoreFile(fileToRecover, store); - } - } catch (IOException ex) { - throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex); - } - - // read the snapshot data persisted - final SegmentInfos segmentCommitInfos; - try { - segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory()); - } catch (IOException e) { - throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); - } - recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion()); - - /// now, go over and clean files that are in the store, but were not in the snapshot - try { - for (String storeFile : store.directory().listAll()) { - if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) { - continue; //skip write.lock, checksum files and files that exist in the snapshot - } - try { - store.deleteQuiet("restore", storeFile); - store.directory().deleteFile(storeFile); - } catch (IOException e) { - logger.warn("[{}] failed to delete file [{}] during snapshot cleanup", snapshotId, storeFile); - } - } - } catch (IOException e) { - logger.warn("[{}] failed to list directory - some of files might not be deleted", snapshotId); - } - } finally { - store.decRef(); - } + RestoreContext(IndexShard indexShard, SnapshotId snapshotId, RecoveryState recoveryState, BlobContainer blobContainer) { + super(metadata.name(), indexShard, snapshotId, recoveryState, BUFFER_SIZE); + this.blobContainer = blobContainer; } - /** - * Restores a file - * This is asynchronous method. Upon completion of the operation latch is getting counted down and any failures are - * added to the {@code failures} list - * - * @param fileInfo file to be restored - */ - private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException { - boolean success = false; - - try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) { - final InputStream stream; - if (restoreRateLimiter == null) { - stream = partSliceStream; - } else { - stream = new RateLimitingInputStream(partSliceStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc); - } - - try (IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), - fileInfo.metadata(), IOContext.DEFAULT)) { - final byte[] buffer = new byte[BUFFER_SIZE]; - int length; - while ((length = stream.read(buffer)) > 0) { - indexOutput.writeBytes(buffer, 0, length); - recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length); - } - Store.verify(indexOutput); - indexOutput.close(); - store.directory().sync(Collections.singleton(fileInfo.physicalName())); - success = true; - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { - try { - store.markStoreCorrupted(ex); - } catch (IOException e) { - logger.warn("store cannot be marked as corrupted", e); - } - throw ex; - } finally { - if (success == false) { - store.deleteQuiet(fileInfo.physicalName()); - } - } + @Override + protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { + if (restoreRateLimiter == null) { + return new PartSliceStream(blobContainer, fileInfo); + } else { + RateLimitingInputStream.Listener listener = restoreRateLimitingTimeInNanos::inc; + return new RateLimitingInputStream(new PartSliceStream(blobContainer, fileInfo), restoreRateLimiter, listener); } } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java new file mode 100644 index 0000000000000..2f837812ae2e2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java @@ -0,0 +1,304 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.repositories.blobstore; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefBuilder; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.util.iterable.Iterables; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; +import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; +import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetaData; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.snapshots.SnapshotId; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.unmodifiableMap; + +/** + * This context will execute a file restore of the lucene files. It is primarily designed to be used to + * restore from some form of a snapshot. It will setup a new store, identify files that need to be copied + * for the source, and perform the copies. Implementers must implement the functionality of opening the + * underlying file streams for snapshotted lucene file. + */ +public abstract class FileRestoreContext { + + private static final Logger logger = LogManager.getLogger(FileRestoreContext.class); + + private final String repositoryName; + private final IndexShard indexShard; + private final RecoveryState recoveryState; + private final SnapshotId snapshotId; + private final ShardId shardId; + private final int bufferSize; + + /** + * Constructs new restore context + * + * @param indexShard shard to restore into + * @param snapshotId snapshot id + * @param recoveryState recovery state to report progress + * @param bufferSize buffer size for restore + */ + protected FileRestoreContext(String repositoryName, IndexShard indexShard, SnapshotId snapshotId, RecoveryState recoveryState, + int bufferSize) { + this.repositoryName = repositoryName; + this.recoveryState = recoveryState; + this.indexShard = indexShard; + this.snapshotId = snapshotId; + this.shardId = indexShard.shardId(); + this.bufferSize = bufferSize; + } + + /** + * Performs restore operation + */ + public void restore(SnapshotFiles snapshotFiles) throws IOException { + final Store store = indexShard.store(); + store.incRef(); + try { + logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId); + + if (snapshotFiles.indexFiles().size() == 1 + && snapshotFiles.indexFiles().get(0).physicalName().startsWith("segments_") + && snapshotFiles.indexFiles().get(0).hasUnknownChecksum()) { + // If the shard has no documents, it will only contain a single segments_N file for the + // shard's snapshot. If we are restoring a snapshot created by a previous supported version, + // it is still possible that in that version, an empty shard has a segments_N file with an unsupported + // version (and no checksum), because we don't know the Lucene version to assign segments_N until we + // have written some data. Since the segments_N for an empty shard could have an incompatible Lucene + // version number and no checksum, even though the index itself is perfectly fine to restore, this + // empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty + // shard anyway, we just create the empty shard here and then exit. + store.createEmpty(indexShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion); + return; + } + + Store.MetadataSnapshot recoveryTargetMetadata; + try { + // this will throw an IOException if the store has no segments infos file. The + // store can still have existing files but they will be deleted just before being + // restored. + recoveryTargetMetadata = indexShard.snapshotStoreMetadata(); + } catch (IndexNotFoundException e) { + // happens when restore to an empty shard, not a big deal + logger.trace("[{}] [{}] restoring from to an empty shard", shardId, snapshotId); + recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; + } catch (IOException e) { + logger.warn(new ParameterizedMessage("[{}] [{}] Can't read metadata from store, will not reuse local files during restore", + shardId, snapshotId), e); + recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; + } + + final List filesToRecover = new ArrayList<>(); + final Map snapshotMetaData = new HashMap<>(); + final Map fileInfos = new HashMap<>(); + for (final BlobStoreIndexShardSnapshot.FileInfo fileInfo : snapshotFiles.indexFiles()) { + try { + // in 1.3.3 we added additional hashes for .si / segments_N files + // to ensure we don't double the space in the repo since old snapshots + // don't have this hash we try to read that hash from the blob store + // in a bwc compatible way. + maybeRecalculateMetadataHash(fileInfo, recoveryTargetMetadata); + } catch (Exception e) { + // if the index is broken we might not be able to read it + logger.warn(new ParameterizedMessage("[{}] Can't calculate hash from blog for file [{}] [{}]", shardId, + fileInfo.physicalName(), fileInfo.metadata()), e); + } + snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata()); + fileInfos.put(fileInfo.metadata().name(), fileInfo); + } + + final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetaData), emptyMap(), 0); + + final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile(); + if (restoredSegmentsFile == null) { + throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file"); + } + + final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata); + for (StoreFileMetaData md : diff.identical) { + BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name()); + recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true); + if (logger.isTraceEnabled()) { + logger.trace("[{}] [{}] not_recovering file [{}] from [{}], exists in local store and is same", shardId, snapshotId, + fileInfo.physicalName(), fileInfo.name()); + } + } + + for (StoreFileMetaData md : concat(diff)) { + BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name()); + filesToRecover.add(fileInfo); + recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false); + if (logger.isTraceEnabled()) { + logger.trace("[{}] [{}] recovering [{}] from [{}]", shardId, snapshotId, + fileInfo.physicalName(), fileInfo.name()); + } + } + + if (filesToRecover.isEmpty()) { + logger.trace("[{}] [{}] no files to recover, all exist within the local store", shardId, snapshotId); + } + + try { + // list of all existing store files + final List deleteIfExistFiles = Arrays.asList(store.directory().listAll()); + + // restore the files from the snapshot to the Lucene store + for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { + // if a file with a same physical name already exist in the store we need to delete it + // before restoring it from the snapshot. We could be lenient and try to reuse the existing + // store files (and compare their names/length/checksum again with the snapshot files) but to + // avoid extra complexity we simply delete them and restore them again like StoreRecovery + // does with dangling indices. Any existing store file that is not restored from the snapshot + // will be clean up by RecoveryTarget.cleanFiles(). + final String physicalName = fileToRecover.physicalName(); + if (deleteIfExistFiles.contains(physicalName)) { + logger.trace("[{}] [{}] deleting pre-existing file [{}]", shardId, snapshotId, physicalName); + store.directory().deleteFile(physicalName); + } + + logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); + restoreFile(fileToRecover, store); + } + } catch (IOException ex) { + throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex); + } + + // read the snapshot data persisted + final SegmentInfos segmentCommitInfos; + try { + segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory()); + } catch (IOException e) { + throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); + } + recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion()); + + /// now, go over and clean files that are in the store, but were not in the snapshot + try { + for (String storeFile : store.directory().listAll()) { + if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) { + continue; //skip write.lock, checksum files and files that exist in the snapshot + } + try { + store.deleteQuiet("restore", storeFile); + store.directory().deleteFile(storeFile); + } catch (IOException e) { + logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile); + } + } + } catch (IOException e) { + logger.warn("[{}] [{}] failed to list directory - some of files might not be deleted", shardId, snapshotId); + } + } finally { + store.decRef(); + } + } + + protected abstract InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo); + + @SuppressWarnings("unchecked") + private Iterable concat(Store.RecoveryDiff diff) { + return Iterables.concat(diff.different, diff.missing); + } + + /** + * Restores a file + * + * @param fileInfo file to be restored + */ + private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException { + boolean success = false; + + try (InputStream stream = fileInputStream(fileInfo)) { + try (IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { + final byte[] buffer = new byte[bufferSize]; + int length; + while ((length = stream.read(buffer)) > 0) { + indexOutput.writeBytes(buffer, 0, length); + recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length); + } + Store.verify(indexOutput); + indexOutput.close(); + store.directory().sync(Collections.singleton(fileInfo.physicalName())); + success = true; + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { + try { + store.markStoreCorrupted(ex); + } catch (IOException e) { + logger.warn("store cannot be marked as corrupted", e); + } + throw ex; + } finally { + if (success == false) { + store.deleteQuiet(fileInfo.physicalName()); + } + } + } + } + + /** + * This is a BWC layer to ensure we update the snapshots metadata with the corresponding hashes before we compare them. + * The new logic for StoreFileMetaData reads the entire {@code .si} and {@code segments.n} files to strengthen the + * comparison of the files on a per-segment / per-commit level. + */ + private void maybeRecalculateMetadataHash(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store.MetadataSnapshot snapshot) + throws IOException { + final StoreFileMetaData metadata; + if (fileInfo != null && (metadata = snapshot.get(fileInfo.physicalName())) != null) { + if (metadata.hash().length > 0 && fileInfo.metadata().hash().length == 0) { + // we have a hash - check if our repo has a hash too otherwise we have + // to calculate it. + // we might have multiple parts even though the file is small... make sure we read all of it. + try (InputStream stream = fileInputStream(fileInfo)) { + BytesRefBuilder builder = new BytesRefBuilder(); + Store.MetadataSnapshot.hashFile(builder, stream, fileInfo.length()); + BytesRef hash = fileInfo.metadata().hash(); // reset the file infos metadata hash + assert hash.length == 0; + hash.bytes = builder.bytes(); + hash.offset = 0; + hash.length = builder.length(); + } + } + } + } +} diff --git a/server/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java b/server/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java index 0938b70906f52..40e02c560a9c0 100644 --- a/server/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java +++ b/server/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java @@ -120,8 +120,8 @@ public void testEquals() { } // get refs & compare - BytesReference pbr = new PagedBytesReference(bigarrays, ba1, length); - BytesReference pbr2 = new PagedBytesReference(bigarrays, ba2, length); + BytesReference pbr = new PagedBytesReference(ba1, length); + BytesReference pbr2 = new PagedBytesReference(ba2, length); assertEquals(pbr, pbr2); int offsetToFlip = randomIntBetween(0, length - 1); int value = ~Byte.toUnsignedInt(ba1.get(offsetToFlip)); diff --git a/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java b/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java index 1a00698325974..1a907feabe24a 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java @@ -528,7 +528,7 @@ public void testEquals() throws IOException { public void testSliceEquals() { int length = randomIntBetween(100, PAGE_SIZE * randomIntBetween(2, 5)); ByteArray ba1 = bigarrays.newByteArray(length, false); - BytesReference pbr = new PagedBytesReference(bigarrays, ba1, length); + BytesReference pbr = new PagedBytesReference(ba1, length); // test equality of slices int sliceFrom = randomIntBetween(0, pbr.length()); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 367e0ded60775..0ffc9f05ff265 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -448,6 +448,16 @@ protected IndexShard newStartedShard(final boolean primary) throws IOException { return newStartedShard(primary, Settings.EMPTY, new InternalEngineFactory()); } + /** + * Creates a new empty shard and starts it. + * + * @param primary controls whether the shard will be a primary or a replica. + * @param settings the settings to use for this shard + */ + protected IndexShard newStartedShard(final boolean primary, Settings settings) throws IOException { + return newStartedShard(primary, settings, new InternalEngineFactory()); + } + /** * Creates a new empty shard with the specified settings and engine factory and starts it. * diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index b3d2d05048211..370d017a4bde7 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -61,6 +61,7 @@ import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; +import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction; import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; @@ -193,6 +194,8 @@ public List> getPersistentTasksExecutor(ClusterServic PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class), new ActionHandler<>(ClearCcrRestoreSessionAction.INSTANCE, ClearCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class), + new ActionHandler<>(GetCcrRestoreFileChunkAction.INSTANCE, + GetCcrRestoreFileChunkAction.TransportGetCcrRestoreFileChunkAction.class), // stats action new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class), new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class), diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java new file mode 100644 index 0000000000000..3f473f25c2411 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java @@ -0,0 +1,129 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.ReleasablePagedBytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ByteArray; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportActionProxy; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; + +import java.io.IOException; + +public class GetCcrRestoreFileChunkAction extends Action { + + public static final GetCcrRestoreFileChunkAction INSTANCE = new GetCcrRestoreFileChunkAction(); + public static final String NAME = "internal:admin/ccr/restore/file_chunk/get"; + + private GetCcrRestoreFileChunkAction() { + super(NAME); + } + + @Override + public GetCcrRestoreFileChunkResponse newResponse() { + throw new UnsupportedOperationException(); + } + + @Override + public Writeable.Reader getResponseReader() { + return GetCcrRestoreFileChunkResponse::new; + } + + + public static class TransportGetCcrRestoreFileChunkAction + extends HandledTransportAction { + + private final CcrRestoreSourceService restoreSourceService; + private final ThreadPool threadPool; + private final BigArrays bigArrays; + + @Inject + public TransportGetCcrRestoreFileChunkAction(BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters, + CcrRestoreSourceService restoreSourceService) { + super(NAME, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new); + TransportActionProxy.registerProxyAction(transportService, NAME, GetCcrRestoreFileChunkResponse::new); + this.threadPool = transportService.getThreadPool(); + this.restoreSourceService = restoreSourceService; + this.bigArrays = bigArrays; + } + + @Override + protected void doExecute(Task task, GetCcrRestoreFileChunkRequest request, + ActionListener listener) { + threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + protected void doRun() throws Exception { + int bytesRequested = request.getSize(); + ByteArray array = bigArrays.newByteArray(bytesRequested, false); + String fileName = request.getFileName(); + String sessionUUID = request.getSessionUUID(); + // This is currently safe to do because calling `onResponse` will serialize the bytes to the network layer data + // structure on the same thread. So the bytes will be copied before the reference is released. + try (ReleasablePagedBytesReference reference = new ReleasablePagedBytesReference(array, bytesRequested, array)) { + try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) { + long offsetAfterRead = sessionReader.readFileBytes(fileName, reference); + long offsetBeforeRead = offsetAfterRead - reference.length(); + listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference)); + } + } + } + }); + } + } + + public static class GetCcrRestoreFileChunkResponse extends ActionResponse { + + private final long offset; + private final BytesReference chunk; + + GetCcrRestoreFileChunkResponse(StreamInput streamInput) throws IOException { + super(streamInput); + offset = streamInput.readVLong(); + chunk = streamInput.readBytesReference(); + } + + GetCcrRestoreFileChunkResponse(long offset, BytesReference chunk) { + this.offset = offset; + this.chunk = chunk; + } + + public long getOffset() { + return offset; + } + + public BytesReference getChunk() { + return chunk; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVLong(offset); + out.writeBytesReference(chunk); + } + + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequest.java new file mode 100644 index 0000000000000..5da0efcb372f5 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequest.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.RemoteClusterAwareRequest; + +import java.io.IOException; + +public class GetCcrRestoreFileChunkRequest extends ActionRequest implements RemoteClusterAwareRequest { + + private final DiscoveryNode node; + private final String sessionUUID; + private final String fileName; + private final int size; + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public GetCcrRestoreFileChunkRequest(DiscoveryNode node, String sessionUUID, String fileName, int size) { + this.node = node; + this.sessionUUID = sessionUUID; + this.fileName = fileName; + this.size = size; + assert size > -1 : "The file chunk request size must be positive. Found: [" + size + "]."; + } + + GetCcrRestoreFileChunkRequest(StreamInput in) throws IOException { + super(in); + node = null; + sessionUUID = in.readString(); + fileName = in.readString(); + size = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sessionUUID); + out.writeString(fileName); + out.writeVInt(size); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException(); + } + + String getSessionUUID() { + return sessionUUID; + } + + String getFileName() { + return fileName; + } + + int getSize() { + return size; + } + + @Override + public DiscoveryNode getPreferredTargetNode() { + assert node != null : "Target node is null"; + return node; + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index 2a1b354f5d8ea..07ee076135a1b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -70,8 +71,8 @@ protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionReques if (indexShard == null) { throw new ShardNotFoundException(shardId); } - ccrRestoreService.openSession(request.getSessionUUID(), indexShard); - return new PutCcrRestoreSessionResponse(clusterService.localNode()); + Store.MetadataSnapshot storeFileMetaData = ccrRestoreService.openSession(request.getSessionUUID(), indexShard); + return new PutCcrRestoreSessionResponse(clusterService.localNode(), storeFileMetaData); } @Override @@ -95,33 +96,42 @@ protected ShardsIterator shards(ClusterState state, InternalRequest request) { public static class PutCcrRestoreSessionResponse extends ActionResponse { private DiscoveryNode node; + private Store.MetadataSnapshot storeFileMetaData; PutCcrRestoreSessionResponse() { } - PutCcrRestoreSessionResponse(DiscoveryNode node) { + PutCcrRestoreSessionResponse(DiscoveryNode node, Store.MetadataSnapshot storeFileMetaData) { this.node = node; + this.storeFileMetaData = storeFileMetaData; } PutCcrRestoreSessionResponse(StreamInput in) throws IOException { super(in); node = new DiscoveryNode(in); + storeFileMetaData = new Store.MetadataSnapshot(in); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); node = new DiscoveryNode(in); + storeFileMetaData = new Store.MetadataSnapshot(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); node.writeTo(out); + storeFileMetaData.writeTo(out); } public DiscoveryNode getNode() { return node; } + + public Store.MetadataSnapshot getStoreFileMetaData() { + return storeFileMetaData; + } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java index 2b94193f674af..da0c43116ee76 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java @@ -11,7 +11,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.store.Store; import java.io.IOException; @@ -19,16 +18,14 @@ public class PutCcrRestoreSessionRequest extends SingleShardRequest ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID); ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId()); Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); - String sessionUUID = UUIDs.randomBase64UUID(); - PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, - new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId, recoveryMetadata)).actionGet(); - DiscoveryNode node = response.getNode(); - // TODO: Implement file restore - closeSession(remoteClient, node, sessionUUID); + // TODO: There should be some local timeout. And if the remote cluster returns an unknown session + // response, we should be able to retry by creating a new session. + String name = metadata.name(); + try (RestoreSession restoreSession = RestoreSession.openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) { + restoreSession.restoreFiles(); + } catch (Exception e) { + throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e); + } + maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings()); } @@ -278,9 +286,108 @@ private void maybeUpdateMappings(Client localClient, Client remoteClient, Index } } - private void closeSession(Client remoteClient, DiscoveryNode node, String sessionUUID) { - ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node); - ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response = - remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(); + private static class RestoreSession extends FileRestoreContext implements Closeable { + + private static final int BUFFER_SIZE = 1 << 16; + + private final Client remoteClient; + private final String sessionUUID; + private final DiscoveryNode node; + private final Store.MetadataSnapshot sourceMetaData; + + RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard, + RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData) { + super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE); + this.remoteClient = remoteClient; + this.sessionUUID = sessionUUID; + this.node = node; + this.sourceMetaData = sourceMetaData; + } + + static RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard, + RecoveryState recoveryState) { + String sessionUUID = UUIDs.randomBase64UUID(); + PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, + new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(); + return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState, + response.getStoreFileMetaData()); + } + + void restoreFiles() throws IOException { + ArrayList fileInfos = new ArrayList<>(); + for (StoreFileMetaData fileMetaData : sourceMetaData) { + ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length()); + fileInfos.add(new BlobStoreIndexShardSnapshot.FileInfo(fileMetaData.name(), fileMetaData, fileSize)); + } + SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos); + restore(snapshotFiles); + } + + @Override + protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { + return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata()); + } + + @Override + public void close() { + ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node); + ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response = + remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(); + } + } + + private static class RestoreFileInputStream extends InputStream { + + private final Client remoteClient; + private final String sessionUUID; + private final DiscoveryNode node; + private final StoreFileMetaData fileToRecover; + + private long pos = 0; + + private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover) { + this.remoteClient = remoteClient; + this.sessionUUID = sessionUUID; + this.node = node; + this.fileToRecover = fileToRecover; + } + + + @Override + public int read() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int read(byte[] bytes, int off, int len) throws IOException { + long remainingBytes = fileToRecover.length() - pos; + if (remainingBytes <= 0) { + return 0; + } + + int bytesRequested = (int) Math.min(remainingBytes, len); + String fileName = fileToRecover.name(); + GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested); + GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response = + remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet(); + BytesReference fileChunk = response.getChunk(); + + int bytesReceived = fileChunk.length(); + if (bytesReceived > bytesRequested) { + throw new IOException("More bytes [" + bytesReceived + "] received than requested [" + bytesRequested + "]"); + } + + long leaderOffset = response.getOffset(); + assert pos == leaderOffset : "Position [" + pos + "] should be equal to the leader file offset [" + leaderOffset + "]."; + + try (StreamInput streamInput = fileChunk.streamInput()) { + int bytesRead = streamInput.read(bytes, 0, bytesReceived); + assert bytesRead == bytesReceived : "Did not read the correct number of bytes"; + } + + pos += bytesReceived; + + return bytesReceived; + } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index 642036168ad7b..197d5ddbf38ff 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -8,10 +8,18 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexEventListener; @@ -23,9 +31,11 @@ import java.io.Closeable; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; @@ -33,7 +43,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen private static final Logger logger = LogManager.getLogger(CcrRestoreSourceService.class); - private final Map onGoingRestores = ConcurrentCollections.newConcurrentMap(); + private final Map onGoingRestores = ConcurrentCollections.newConcurrentMap(); private final Map> sessionsForShard = new HashMap<>(); private final CopyOnWriteArrayList> openSessionListeners = new CopyOnWriteArrayList<>(); private final CopyOnWriteArrayList> closeSessionListeners = new CopyOnWriteArrayList<>(); @@ -48,8 +58,9 @@ public synchronized void afterIndexShardClosed(ShardId shardId, @Nullable IndexS HashSet sessions = sessionsForShard.remove(indexShard); if (sessions != null) { for (String sessionUUID : sessions) { - RestoreContext restore = onGoingRestores.remove(sessionUUID); - IOUtils.closeWhileHandlingException(restore); + RestoreSession restore = onGoingRestores.remove(sessionUUID); + assert restore != null : "Session UUID [" + sessionUUID + "] registered for shard but not found in ongoing restores"; + restore.decRef(); } } } @@ -68,7 +79,7 @@ protected void doStop() { @Override protected synchronized void doClose() throws IOException { sessionsForShard.clear(); - IOUtils.closeWhileHandlingException(onGoingRestores.values()); + onGoingRestores.values().forEach(AbstractRefCounted::decRef); onGoingRestores.clear(); } @@ -88,7 +99,7 @@ synchronized HashSet getSessionsForShard(IndexShard indexShard) { } // default visibility for testing - synchronized RestoreContext getOngoingRestore(String sessionUUID) { + synchronized RestoreSession getOngoingRestore(String sessionUUID) { return onGoingRestores.get(sessionUUID); } @@ -96,7 +107,7 @@ synchronized RestoreContext getOngoingRestore(String sessionUUID) { // complete. Or it could be for session to have been touched. public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException { boolean success = false; - RestoreContext restore = null; + RestoreSession restore = null; try { if (onGoingRestores.containsKey(sessionUUID)) { logger.debug("not opening new session [{}] as it already exists", sessionUUID); @@ -106,46 +117,72 @@ public synchronized Store.MetadataSnapshot openSession(String sessionUUID, Index if (indexShard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(indexShard.shardId(), "cannot open ccr restore session if shard closed"); } - restore = new RestoreContext(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit()); + restore = new RestoreSession(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit()); onGoingRestores.put(sessionUUID, restore); openSessionListeners.forEach(c -> c.accept(sessionUUID)); - HashSet sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>()); + HashSet sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>()); sessions.add(sessionUUID); } Store.MetadataSnapshot metaData = restore.getMetaData(); success = true; return metaData; } finally { - if (success == false) { + if (success == false) { onGoingRestores.remove(sessionUUID); - IOUtils.closeWhileHandlingException(restore); + if (restore != null) { + restore.decRef(); + } + } + } + } + + public void closeSession(String sessionUUID) { + final RestoreSession restore; + synchronized (this) { + closeSessionListeners.forEach(c -> c.accept(sessionUUID)); + restore = onGoingRestores.remove(sessionUUID); + if (restore == null) { + logger.debug("could not close session [{}] because session not found", sessionUUID); + throw new IllegalArgumentException("session [" + sessionUUID + "] not found"); + } + HashSet sessions = sessionsForShard.get(restore.indexShard); + assert sessions != null : "No session UUIDs for shard even though one [" + sessionUUID + "] is active in ongoing restores"; + if (sessions != null) { + boolean removed = sessions.remove(sessionUUID); + assert removed : "No session found for UUID [" + sessionUUID +"]"; + if (sessions.isEmpty()) { + sessionsForShard.remove(restore.indexShard); + } } } + restore.decRef(); } - public synchronized void closeSession(String sessionUUID) { - closeSessionListeners.forEach(c -> c.accept(sessionUUID)); - RestoreContext restore = onGoingRestores.remove(sessionUUID); + public synchronized SessionReader getSessionReader(String sessionUUID) { + RestoreSession restore = onGoingRestores.get(sessionUUID); if (restore == null) { - logger.info("could not close session [{}] because session not found", sessionUUID); + logger.debug("could not get session [{}] because session not found", sessionUUID); throw new IllegalArgumentException("session [" + sessionUUID + "] not found"); } - IOUtils.closeWhileHandlingException(restore); + return new SessionReader(restore); } - private class RestoreContext implements Closeable { + private static class RestoreSession extends AbstractRefCounted { private final String sessionUUID; private final IndexShard indexShard; private final Engine.IndexCommitRef commitRef; + private final KeyedLock keyedLock = new KeyedLock<>(); + private final Map cachedInputs = new ConcurrentHashMap<>(); - private RestoreContext(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef) { + private RestoreSession(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef) { + super("restore-session"); this.sessionUUID = sessionUUID; this.indexShard = indexShard; this.commitRef = commitRef; } - Store.MetadataSnapshot getMetaData() throws IOException { + private Store.MetadataSnapshot getMetaData() throws IOException { indexShard.store().incRef(); try { return indexShard.store().getMetadata(commitRef.getIndexCommit()); @@ -154,22 +191,71 @@ Store.MetadataSnapshot getMetaData() throws IOException { } } - @Override - public void close() { - assert Thread.holdsLock(CcrRestoreSourceService.this); - removeSessionForShard(sessionUUID, indexShard); - IOUtils.closeWhileHandlingException(commitRef); - } + private long readFileBytes(String fileName, BytesReference reference) throws IOException { + Releasable lock = keyedLock.tryAcquire(fileName); + if (lock == null) { + throw new IllegalStateException("can't read from the same file on the same session concurrently"); + } + try (Releasable releasable = lock) { + final IndexInput indexInput = cachedInputs.computeIfAbsent(fileName, f -> { + try { + return commitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); - private void removeSessionForShard(String sessionUUID, IndexShard indexShard) { - logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard.shardId()); - HashSet sessions = sessionsForShard.get(indexShard); - if (sessions != null) { - sessions.remove(sessionUUID); - if (sessions.isEmpty()) { - sessionsForShard.remove(indexShard); + BytesRefIterator refIterator = reference.iterator(); + BytesRef ref; + while ((ref = refIterator.next()) != null) { + byte[] refBytes = ref.bytes; + indexInput.readBytes(refBytes, 0, refBytes.length); } + + long offsetAfterRead = indexInput.getFilePointer(); + + if (offsetAfterRead == indexInput.length()) { + cachedInputs.remove(fileName); + IOUtils.close(indexInput); + } + + return offsetAfterRead; } } + + @Override + protected void closeInternal() { + logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard.shardId()); + assert keyedLock.hasLockedKeys() == false : "Should not hold any file locks when closing"; + IOUtils.closeWhileHandlingException(cachedInputs.values()); + } + } + + public static class SessionReader implements Closeable { + + private final RestoreSession restoreSession; + + private SessionReader(RestoreSession restoreSession) { + this.restoreSession = restoreSession; + restoreSession.incRef(); + } + + /** + * Read bytes into the reference from the file. This method will return the offset in the file where + * the read completed. + * + * @param fileName to read + * @param reference to read bytes into + * @return the offset of the file after the read is complete + * @throws IOException if the read fails + */ + public long readFileBytes(String fileName, BytesReference reference) throws IOException { + return restoreSession.readFileBytes(fileName, reference); + } + + @Override + public void close() { + restoreSession.decRef(); + } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 2d3ca857ff848..36e1027dc5f87 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -21,7 +22,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.IndexSettings; @@ -40,8 +40,8 @@ import java.io.IOException; import java.util.Locale; import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.singletonMap; import static org.elasticsearch.snapshots.RestoreService.restoreInProgress; @@ -159,7 +159,7 @@ public void testThatRepositoryRecoversEmptyIndexBasedOnLeaderSettings() throws I assertNotEquals(leaderMetadata.getIndexUUID(), followerMetadata.getIndexUUID()); } - public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException { + public void testDocsAreRecovered() throws Exception { String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; String leaderIndex = "index1"; String followerIndex = "index2"; @@ -173,6 +173,45 @@ public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException { final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + final int firstBatchNumDocs = randomIntBetween(1, 64); + logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); + for (int i = 0; i < firstBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get(); + + AtomicBoolean isRunning = new AtomicBoolean(true); + + // Concurrently index new docs with mapping changes + Thread thread = new Thread(() -> { + char[] chars = "abcdeghijklmnopqrstuvwxyz".toCharArray(); + for (char c : chars) { + if (isRunning.get() == false) { + break; + } + final String source; + long l = randomLongBetween(0, 50000); + if (randomBoolean()) { + source = String.format(Locale.ROOT, "{\"%c\":%d}", c, l); + } else { + source = String.format(Locale.ROOT, "{\"%c\":\"%d\"}", c, l); + } + for (int i = 64; i < 150; i++) { + if (isRunning.get() == false) { + break; + } + leaderClient().prepareIndex("index1", "doc", Long.toString(i)).setSource(source, XContentType.JSON).get(); + if (rarely()) { + leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).get(); + } + } + leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get(); + } + }); + thread.start(); + Settings.Builder settingsBuilder = Settings.builder() .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); @@ -182,22 +221,18 @@ public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException { false, true, settingsBuilder.build(), new String[0], "restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]"); - Set sessionsOpened = ConcurrentCollections.newConcurrentSet(); - Set sessionsClosed = ConcurrentCollections.newConcurrentSet(); - for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) { - restoreSourceService.addOpenSessionListener(sessionsOpened::add); - restoreSourceService.addCloseSessionListener(sessionsClosed::add); - } - PlainActionFuture future = PlainActionFuture.newFuture(); restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); RestoreInfo restoreInfo = future.actionGet(); - assertEquals(numberOfPrimaryShards, sessionsOpened.size()); - assertEquals(numberOfPrimaryShards, sessionsClosed.size()); - assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards()); assertEquals(0, restoreInfo.failedShards()); + for (int i = 0; i < firstBatchNumDocs; ++i) { + assertExpectedDocument(followerIndex, i); + } + + isRunning.set(false); + thread.join(); } public void testFollowerMappingIsUpdated() throws IOException { @@ -254,6 +289,13 @@ public void testFollowerMappingIsUpdated() throws IOException { assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long")); } + private void assertExpectedDocument(String followerIndex, final int value) { + final GetResponse getResponse = followerClient().prepareGet(followerIndex, "doc", Integer.toString(value)).get(); + assertTrue("Doc with id [" + value + "] is missing", getResponse.isExists()); + assertTrue((getResponse.getSource().containsKey("f"))); + assertThat(getResponse.getSource().get("f"), equalTo(value)); + } + private ActionListener waitForRestore(ClusterService clusterService, ActionListener listener) { return new ActionListener() { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index dfa7e5ef12660..efcd93e90fd07 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -6,14 +6,20 @@ package org.elasticsearch.xpack.ccr.repository; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.store.StoreFileMetaData; import org.junit.Before; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; public class CcrRestoreSourceServiceTests extends IndexShardTestCase { @@ -122,4 +128,63 @@ public void testCloseShardListenerFunctionality() throws IOException { restoreSourceService.closeSession(sessionUUID3); closeShards(indexShard1, indexShard2); } + + public void testGetSessionReader() throws IOException { + IndexShard indexShard1 = newStartedShard(true); + final String sessionUUID1 = UUIDs.randomBase64UUID(); + + restoreSourceService.openSession(sessionUUID1, indexShard1); + + ArrayList files = new ArrayList<>(); + indexShard1.snapshotStoreMetadata().forEach(files::add); + + StoreFileMetaData fileMetaData = files.get(0); + String fileName = fileMetaData.name(); + + byte[] expectedBytes = new byte[(int) fileMetaData.length()]; + byte[] actualBytes = new byte[(int) fileMetaData.length()]; + Engine.IndexCommitRef indexCommitRef = indexShard1.acquireSafeIndexCommit(); + try (IndexInput indexInput = indexCommitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE)) { + indexInput.seek(0); + indexInput.readBytes(expectedBytes, 0, (int) fileMetaData.length()); + } + + BytesArray byteArray = new BytesArray(actualBytes); + try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID1)) { + long offset = sessionReader.readFileBytes(fileName, byteArray); + assertEquals(offset, fileMetaData.length()); + } + + assertArrayEquals(expectedBytes, actualBytes); + restoreSourceService.closeSession(sessionUUID1); + closeShards(indexShard1); + } + + public void testGetSessionDoesNotLeakFileIfClosed() throws IOException { + Settings settings = Settings.builder().put("index.merge.enabled", false).build(); + IndexShard indexShard = newStartedShard(true, settings); + for (int i = 0; i < 5; i++) { + indexDoc(indexShard, "_doc", Integer.toString(i)); + flushShard(indexShard, true); + } + final String sessionUUID1 = UUIDs.randomBase64UUID(); + + restoreSourceService.openSession(sessionUUID1, indexShard); + + ArrayList files = new ArrayList<>(); + indexShard.snapshotStoreMetadata().forEach(files::add); + + try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID1)) { + sessionReader.readFileBytes(files.get(0).name(), new BytesArray(new byte[10])); + } + + // Request a second file to ensure that original file is not leaked + try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID1)) { + sessionReader.readFileBytes(files.get(1).name(), new BytesArray(new byte[10])); + } + + restoreSourceService.closeSession(sessionUUID1); + closeShards(indexShard); + // Exception will be thrown if file is not closed. + } }