From 2f118f5df502fd9766ce832d74ee8966b70f52cb Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 14 Jan 2019 13:07:55 -0700 Subject: [PATCH] Implement ccr file restore (#37130) This is related to #35975. It implements a file based restore in the CcrRepository. The restore transfers files from the leader cluster to the follower cluster. It does not implement any advanced resiliency features at the moment. Any request failure will end the restore. --- .../http/netty4/Netty4HttpChannelTests.java | 2 +- .../common/bytes/PagedBytesReference.java | 13 +- .../bytes/ReleasablePagedBytesReference.java | 6 +- .../common/io/stream/BytesStreamOutput.java | 2 +- .../stream/ReleasableBytesStreamOutput.java | 2 +- .../blobstore/BlobStoreRepository.java | 252 ++------------- .../blobstore/FileRestoreContext.java | 304 ++++++++++++++++++ .../bytes/PagedBytesReferenceTests.java | 4 +- .../bytes/AbstractBytesReferenceTestCase.java | 2 +- .../index/shard/IndexShardTestCase.java | 6 + .../java/org/elasticsearch/xpack/ccr/Ccr.java | 3 + .../GetCcrRestoreFileChunkAction.java | 138 ++++++++ .../GetCcrRestoreFileChunkRequest.java | 83 +++++ .../GetCcrRestoreFileChunkRequestBuilder.java | 18 ++ .../PutCcrRestoreSessionAction.java | 16 +- .../PutCcrRestoreSessionRequest.java | 15 +- .../xpack/ccr/repository/CcrRepository.java | 141 +++++++- .../repository/CcrRestoreSourceService.java | 148 +++++++-- .../xpack/ccr/CcrRepositoryIT.java | 68 +++- .../CcrRestoreSourceServiceTests.java | 65 ++++ 20 files changed, 966 insertions(+), 322 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequest.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequestBuilder.java diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java index 918e98fd2e7c0..d5a0e7f8ce855 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java @@ -590,7 +590,7 @@ private static class TestResponse extends RestResponse { } final ByteArray bigArray = bigArrays.newByteArray(bytes.length); bigArray.set(0, bytes, 0, bytes.length); - reference = new ReleasablePagedBytesReference(bigArrays, bigArray, bytes.length, Releasables.releaseOnce(bigArray)); + reference = new ReleasablePagedBytesReference(bigArray, bytes.length, Releasables.releaseOnce(bigArray)); } @Override diff --git a/server/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java index c754d6b8eae9e..f15b3b9cf3295 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java @@ -21,7 +21,6 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; import 
org.elasticsearch.common.util.PageCacheRecycler; @@ -35,17 +34,15 @@ public class PagedBytesReference extends BytesReference { private static final int PAGE_SIZE = PageCacheRecycler.BYTE_PAGE_SIZE; - private final BigArrays bigarrays; - protected final ByteArray byteArray; + private final ByteArray byteArray; private final int offset; private final int length; - public PagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) { - this(bigarrays, byteArray, 0, length); + public PagedBytesReference(ByteArray byteArray, int length) { + this(byteArray, 0, length); } - public PagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int from, int length) { - this.bigarrays = bigarrays; + private PagedBytesReference(ByteArray byteArray, int from, int length) { this.byteArray = byteArray; this.offset = from; this.length = length; @@ -67,7 +64,7 @@ public BytesReference slice(int from, int length) { throw new IllegalArgumentException("can't slice a buffer with length [" + length() + "], with slice parameters from [" + from + "], length [" + length + "]"); } - return new PagedBytesReference(bigarrays, byteArray, offset + from, length); + return new PagedBytesReference(byteArray, offset + from, length); } @Override diff --git a/server/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java index ac90e546f7eb5..209a6edc5696a 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java @@ -21,7 +21,6 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; /** @@ -32,9 +31,8 @@ public final class ReleasablePagedBytesReference extends PagedBytesReference imp private final Releasable releasable; - public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length, - Releasable releasable) { - super(bigarrays, byteArray, length); + public ReleasablePagedBytesReference(ByteArray byteArray, int length, Releasable releasable) { + super(byteArray, length); this.releasable = releasable; } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java index 768dbd1d17775..ad9cde3abbc48 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java @@ -140,7 +140,7 @@ public int size() { @Override public BytesReference bytes() { - return new PagedBytesReference(bigArrays, bytes, count); + return new PagedBytesReference(bytes, count); } /** diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java index 5616da2db9636..725ecd1c3cc4f 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java @@ -56,7 +56,7 @@ public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) { */ @Override public ReleasablePagedBytesReference bytes() { - return new ReleasablePagedBytesReference(bigArrays, bytes, 
count, releasable); + return new ReleasablePagedBytesReference(bytes, count, releasable); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 93bbafd61bce7..264d6a19e4073 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -22,17 +22,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.index.IndexNotFoundException; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; @@ -68,7 +60,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -114,17 +105,12 @@ import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import static java.util.Collections.emptyMap; -import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; /** @@ -866,9 +852,14 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, @Override public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { - final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, indexId, snapshotShardId, recoveryState); + final Context context = new Context(snapshotId, indexId, shard.shardId(), snapshotShardId); + BlobPath path = basePath().add("indices").add(indexId.getId()).add(Integer.toString(snapshotShardId.getId())); + BlobContainer blobContainer = blobStore().blobContainer(path); + final RestoreContext snapshotContext = new RestoreContext(shard, snapshotId, recoveryState, blobContainer); try { - snapshotContext.restore(); + BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot(); + SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); + snapshotContext.restore(snapshotFiles); } catch (Exception e) { throw new IndexShardRestoreFailedException(shard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e); } @@ -1217,6 +1208,7 @@ public void snapshot(final IndexCommit 
snapshotIndexCommit) { // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should final Collection fileNames; try { + logger.trace("[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit); metadata = store.getMetadata(snapshotIndexCommit); fileNames = snapshotIndexCommit.getFileNames(); } catch (IOException e) { @@ -1463,225 +1455,29 @@ protected InputStream openSlice(long slice) throws IOException { /** * Context for restore operations */ - private class RestoreContext extends Context { + private class RestoreContext extends FileRestoreContext { - private final IndexShard targetShard; - - private final RecoveryState recoveryState; + private final BlobContainer blobContainer; /** * Constructs new restore context - * - * @param shard shard to restore into - * @param snapshotId snapshot id - * @param indexId id of the index being restored - * @param snapshotShardId shard in the snapshot that data should be restored from - * @param recoveryState recovery state to report progress + * @param indexShard shard to restore into + * @param snapshotId snapshot id + * @param recoveryState recovery state to report progress + * @param blobContainer the blob container to read the files from */ - RestoreContext(IndexShard shard, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { - super(snapshotId, indexId, shard.shardId(), snapshotShardId); - this.recoveryState = recoveryState; - this.targetShard = shard; - } - - /** - * Performs restore operation - */ - public void restore() throws IOException { - final Store store = targetShard.store(); - store.incRef(); - try { - logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, metadata.name(), shardId); - BlobStoreIndexShardSnapshot snapshot = loadSnapshot(); - - if (snapshot.indexFiles().size() == 1 - && snapshot.indexFiles().get(0).physicalName().startsWith("segments_") - && snapshot.indexFiles().get(0).hasUnknownChecksum()) { - // If the shard has no documents, it will only contain a single segments_N file for the - // shard's snapshot. If we are restoring a snapshot created by a previous supported version, - // it is still possible that in that version, an empty shard has a segments_N file with an unsupported - // version (and no checksum), because we don't know the Lucene version to assign segments_N until we - // have written some data. Since the segments_N for an empty shard could have an incompatible Lucene - // version number and no checksum, even though the index itself is perfectly fine to restore, this - // empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty - // shard anyway, we just create the empty shard here and then exit. - IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(null) - .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD) - .setOpenMode(IndexWriterConfig.OpenMode.CREATE) - .setCommitOnClose(true)); - writer.close(); - return; - } - - SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); - Store.MetadataSnapshot recoveryTargetMetadata; - try { - // this will throw an IOException if the store has no segments infos file. The - // store can still have existing files but they will be deleted just before being - // restored. 
- recoveryTargetMetadata = targetShard.snapshotStoreMetadata(); - } catch (IndexNotFoundException e) { - // happens when restore to an empty shard, not a big deal - logger.trace("[{}] [{}] restoring from to an empty shard", shardId, snapshotId); - recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; - } catch (IOException e) { - logger.warn(() -> new ParameterizedMessage("{} Can't read metadata from store, will not reuse any " + - "local file while restoring", shardId), e); - recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY; - } - - final List filesToRecover = new ArrayList<>(); - final Map snapshotMetaData = new HashMap<>(); - final Map fileInfos = new HashMap<>(); - for (final BlobStoreIndexShardSnapshot.FileInfo fileInfo : snapshot.indexFiles()) { - try { - // in 1.3.3 we added additional hashes for .si / segments_N files - // to ensure we don't double the space in the repo since old snapshots - // don't have this hash we try to read that hash from the blob store - // in a bwc compatible way. - maybeRecalculateMetadataHash(blobContainer, fileInfo, recoveryTargetMetadata); - } catch (Exception e) { - // if the index is broken we might not be able to read it - logger.warn(() -> new ParameterizedMessage("{} Can't calculate hash from blog for file [{}] [{}]", - shardId, fileInfo.physicalName(), fileInfo.metadata()), e); - } - snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata()); - fileInfos.put(fileInfo.metadata().name(), fileInfo); - } - - final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetaData), emptyMap(), 0); - - final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile(); - if (restoredSegmentsFile == null) { - throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file"); - } - - final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata); - for (StoreFileMetaData md : diff.identical) { - BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name()); - recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true); - if (logger.isTraceEnabled()) { - logger.trace("[{}] [{}] not_recovering [{}] from [{}], exists in local store and is same", - shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); - } - } - - for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) { - BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name()); - filesToRecover.add(fileInfo); - recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false); - if (logger.isTraceEnabled()) { - if (md == null) { - logger.trace("[{}] [{}] recovering [{}] from [{}], does not exists in local store", - shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); - } else { - logger.trace("[{}] [{}] recovering [{}] from [{}], exists in local store but is different", - shardId, snapshotId, fileInfo.physicalName(), fileInfo.name()); - } - } - } - final RecoveryState.Index index = recoveryState.getIndex(); - if (filesToRecover.isEmpty()) { - logger.trace("no files to recover, all exists within the local store"); - } - - try { - // list of all existing store files - final List deleteIfExistFiles = Arrays.asList(store.directory().listAll()); - - // restore the files from the snapshot to the Lucene store - for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { - // if a file with a same physical name already exist in the store we need to delete it - // before restoring it from 
the snapshot. We could be lenient and try to reuse the existing - // store files (and compare their names/length/checksum again with the snapshot files) but to - // avoid extra complexity we simply delete them and restore them again like StoreRecovery - // does with dangling indices. Any existing store file that is not restored from the snapshot - // will be clean up by RecoveryTarget.cleanFiles(). - final String physicalName = fileToRecover.physicalName(); - if (deleteIfExistFiles.contains(physicalName)) { - logger.trace("[{}] [{}] deleting pre-existing file [{}]", shardId, snapshotId, physicalName); - store.directory().deleteFile(physicalName); - } - - logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); - restoreFile(fileToRecover, store); - } - } catch (IOException ex) { - throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex); - } - - // read the snapshot data persisted - final SegmentInfos segmentCommitInfos; - try { - segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory()); - } catch (IOException e) { - throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); - } - recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion()); - - /// now, go over and clean files that are in the store, but were not in the snapshot - try { - for (String storeFile : store.directory().listAll()) { - if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) { - continue; //skip write.lock, checksum files and files that exist in the snapshot - } - try { - store.deleteQuiet("restore", storeFile); - store.directory().deleteFile(storeFile); - } catch (IOException e) { - logger.warn("[{}] failed to delete file [{}] during snapshot cleanup", snapshotId, storeFile); - } - } - } catch (IOException e) { - logger.warn("[{}] failed to list directory - some of files might not be deleted", snapshotId); - } - } finally { - store.decRef(); - } + RestoreContext(IndexShard indexShard, SnapshotId snapshotId, RecoveryState recoveryState, BlobContainer blobContainer) { + super(metadata.name(), indexShard, snapshotId, recoveryState, BUFFER_SIZE); + this.blobContainer = blobContainer; } - /** - * Restores a file - * This is asynchronous method. 
Upon completion of the operation latch is getting counted down and any failures are - * added to the {@code failures} list - * - * @param fileInfo file to be restored - */ - private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException { - boolean success = false; - - try (InputStream partSliceStream = new PartSliceStream(blobContainer, fileInfo)) { - final InputStream stream; - if (restoreRateLimiter == null) { - stream = partSliceStream; - } else { - stream = new RateLimitingInputStream(partSliceStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc); - } - - try (IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), - fileInfo.metadata(), IOContext.DEFAULT)) { - final byte[] buffer = new byte[BUFFER_SIZE]; - int length; - while ((length = stream.read(buffer)) > 0) { - indexOutput.writeBytes(buffer, 0, length); - recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length); - } - Store.verify(indexOutput); - indexOutput.close(); - store.directory().sync(Collections.singleton(fileInfo.physicalName())); - success = true; - } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { - try { - store.markStoreCorrupted(ex); - } catch (IOException e) { - logger.warn("store cannot be marked as corrupted", e); - } - throw ex; - } finally { - if (success == false) { - store.deleteQuiet(fileInfo.physicalName()); - } - } + @Override + protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { + if (restoreRateLimiter == null) { + return new PartSliceStream(blobContainer, fileInfo); + } else { + RateLimitingInputStream.Listener listener = restoreRateLimitingTimeInNanos::inc; + return new RateLimitingInputStream(new PartSliceStream(blobContainer, fileInfo), restoreRateLimiter, listener); } } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java new file mode 100644 index 0000000000000..5a99c98db530b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java @@ -0,0 +1,304 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.elasticsearch.repositories.blobstore;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexFormatTooNewException;
+import org.apache.lucene.index.IndexFormatTooOldException;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.util.iterable.Iterables;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
+import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
+import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.store.StoreFileMetaData;
+import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.snapshots.SnapshotId;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.unmodifiableMap;
+
+/**
+ * This context will execute a file restore of the Lucene files. It is primarily designed to be used to
+ * restore from some form of a snapshot. It will set up a new store, identify files that need to be copied
+ * from the source, and perform the copies. Implementers must implement the functionality of opening the
+ * underlying file streams for the snapshotted Lucene files.
+ */
+public abstract class FileRestoreContext {
+
+    private static final Logger logger = LogManager.getLogger(FileRestoreContext.class);
+
+    private final String repositoryName;
+    private final IndexShard indexShard;
+    private final RecoveryState recoveryState;
+    private final SnapshotId snapshotId;
+    private final ShardId shardId;
+    private final int bufferSize;
+
+    /**
+     * Constructs a new restore context
+     *
+     * @param repositoryName the name of the repository the restore originates from
+     * @param indexShard     shard to restore into
+     * @param snapshotId     snapshot id
+     * @param recoveryState  recovery state to report progress
+     * @param bufferSize     buffer size for restore
+     */
+    protected FileRestoreContext(String repositoryName, IndexShard indexShard, SnapshotId snapshotId, RecoveryState recoveryState,
+                                 int bufferSize) {
+        this.repositoryName = repositoryName;
+        this.recoveryState = recoveryState;
+        this.indexShard = indexShard;
+        this.snapshotId = snapshotId;
+        this.shardId = indexShard.shardId();
+        this.bufferSize = bufferSize;
+    }
+
+    /**
+     * Performs the restore operation
+     */
+    public void restore(SnapshotFiles snapshotFiles) throws IOException {
+        final Store store = indexShard.store();
+        store.incRef();
+        try {
+            logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
+
+            if (snapshotFiles.indexFiles().size() == 1
+                && snapshotFiles.indexFiles().get(0).physicalName().startsWith("segments_")
+                && snapshotFiles.indexFiles().get(0).hasUnknownChecksum()) {
+                // If the shard has no documents, it will only contain a single segments_N file for the
+                // shard's snapshot. If we are restoring a snapshot created by a previous supported version,
+                // it is still possible that in that version, an empty shard has a segments_N file with an unsupported
+                // version (and no checksum), because we don't know the Lucene version to assign segments_N until we
+                // have written some data. Since the segments_N for an empty shard could have an incompatible Lucene
+                // version number and no checksum, even though the index itself is perfectly fine to restore, this
+                // empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty
+                // shard anyway, we just create the empty shard here and then exit.
+                store.createEmpty();
+                return;
+            }
+
+            Store.MetadataSnapshot recoveryTargetMetadata;
+            try {
+                // this will throw an IOException if the store has no segments infos file. The
+                // store can still have existing files but they will be deleted just before being
+                // restored.
+                recoveryTargetMetadata = indexShard.snapshotStoreMetadata();
+            } catch (IndexNotFoundException e) {
+                // happens when restoring to an empty shard, not a big deal
+                logger.trace("[{}] [{}] restoring to an empty shard", shardId, snapshotId);
+                recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
+            } catch (IOException e) {
+                logger.warn(new ParameterizedMessage("[{}] [{}] Can't read metadata from store, will not reuse local files during restore",
+                    shardId, snapshotId), e);
+                recoveryTargetMetadata = Store.MetadataSnapshot.EMPTY;
+            }
+
+            final List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover = new ArrayList<>();
+            final Map<String, StoreFileMetaData> snapshotMetaData = new HashMap<>();
+            final Map<String, BlobStoreIndexShardSnapshot.FileInfo> fileInfos = new HashMap<>();
+            for (final BlobStoreIndexShardSnapshot.FileInfo fileInfo : snapshotFiles.indexFiles()) {
+                try {
+                    // in 1.3.3 we added additional hashes for .si / segments_N files
+                    // to ensure we don't double the space in the repo since old snapshots
+                    // don't have this hash we try to read that hash from the blob store
+                    // in a bwc compatible way.
+                    maybeRecalculateMetadataHash(fileInfo, recoveryTargetMetadata);
+                } catch (Exception e) {
+                    // if the index is broken we might not be able to read it
+                    logger.warn(new ParameterizedMessage("[{}] Can't calculate hash from blob for file [{}] [{}]", shardId,
+                        fileInfo.physicalName(), fileInfo.metadata()), e);
+                }
+                snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata());
+                fileInfos.put(fileInfo.metadata().name(), fileInfo);
+            }
+
+            final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetaData), emptyMap(), 0);
+
+            final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile();
+            if (restoredSegmentsFile == null) {
+                throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file");
+            }
+
+            final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata);
+            for (StoreFileMetaData md : diff.identical) {
+                BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
+                recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), true);
+                if (logger.isTraceEnabled()) {
+                    logger.trace("[{}] [{}] not_recovering file [{}] from [{}], exists in local store and is same", shardId, snapshotId,
+                        fileInfo.physicalName(), fileInfo.name());
+                }
+            }
+
+            for (StoreFileMetaData md : concat(diff)) {
+                BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
+                filesToRecover.add(fileInfo);
+                recoveryState.getIndex().addFileDetail(fileInfo.name(), fileInfo.length(), false);
+                if (logger.isTraceEnabled()) {
+                    logger.trace("[{}] [{}] recovering [{}] from [{}]", shardId, snapshotId,
+                        fileInfo.physicalName(), fileInfo.name());
+                }
+            }
+
+            if (filesToRecover.isEmpty()) {
+                logger.trace("[{}] [{}] no files to recover, all exist within the local store", shardId, snapshotId);
+            }
+
+            try {
+                // list of all existing store files
+                final List<String> deleteIfExistFiles = Arrays.asList(store.directory().listAll());
+
+                // restore the files from the snapshot to the Lucene store
+                for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
+                    // if a file with the same physical name already exists in the store we need to delete it
+                    // before restoring it from the snapshot. We could be lenient and try to reuse the existing
+                    // store files (and compare their names/length/checksum again with the snapshot files) but to
+                    // avoid extra complexity we simply delete them and restore them again like StoreRecovery
+                    // does with dangling indices. Any existing store file that is not restored from the snapshot
+                    // will be cleaned up by RecoveryTarget.cleanFiles().
+                    final String physicalName = fileToRecover.physicalName();
+                    if (deleteIfExistFiles.contains(physicalName)) {
+                        logger.trace("[{}] [{}] deleting pre-existing file [{}]", shardId, snapshotId, physicalName);
+                        store.directory().deleteFile(physicalName);
+                    }
+
+                    logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
+                    restoreFile(fileToRecover, store);
+                }
+            } catch (IOException ex) {
+                throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
+            }
+
+            // read the snapshot data persisted
+            final SegmentInfos segmentCommitInfos;
+            try {
+                segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
+            } catch (IOException e) {
+                throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
+            }
+            recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion());
+
+            // now, go over and clean files that are in the store, but were not in the snapshot
+            try {
+                for (String storeFile : store.directory().listAll()) {
+                    if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) {
+                        continue; // skip write.lock, checksum files and files that exist in the snapshot
+                    }
+                    try {
+                        store.deleteQuiet("restore", storeFile);
+                        store.directory().deleteFile(storeFile);
+                    } catch (IOException e) {
+                        logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile);
+                    }
+                }
+            } catch (IOException e) {
+                logger.warn("[{}] [{}] failed to list directory - some of the files might not be deleted", shardId, snapshotId);
+            }
+        } finally {
+            store.decRef();
+        }
+    }
+
+    protected abstract InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo);
+
+    @SuppressWarnings("unchecked")
+    private Iterable<StoreFileMetaData> concat(Store.RecoveryDiff diff) {
+        return Iterables.concat(diff.different, diff.missing);
+    }
+
+    /**
+     * Restores a file
+     *
+     * @param fileInfo file to be restored
+     */
+    private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final Store store) throws IOException {
+        boolean success = false;
+
+        try (InputStream stream = fileInputStream(fileInfo)) {
+            try (IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
+                final byte[] buffer = new byte[bufferSize];
+                int length;
+                while ((length = stream.read(buffer)) > 0) {
+                    indexOutput.writeBytes(buffer, 0, length);
+                    recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.name(), length);
+                }
+                Store.verify(indexOutput);
+                indexOutput.close();
+                store.directory().sync(Collections.singleton(fileInfo.physicalName()));
+                success = true;
+            } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
+                try {
+                    store.markStoreCorrupted(ex);
+                } catch (IOException e) {
+                    logger.warn("store cannot be marked as corrupted", e);
+                }
+                throw ex;
+            } finally {
+                if (success == false) {
+                    store.deleteQuiet(fileInfo.physicalName());
+                }
+            }
+        }
+    }
+
+    /**
+     * This is a BWC layer to ensure we update the snapshots metadata with the corresponding hashes before we compare them.
+     * The new logic for StoreFileMetaData reads the entire {@code .si} and {@code segments.n} files to strengthen the
+     * comparison of the files on a per-segment / per-commit level.
+ */ + private void maybeRecalculateMetadataHash(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store.MetadataSnapshot snapshot) + throws IOException { + final StoreFileMetaData metadata; + if (fileInfo != null && (metadata = snapshot.get(fileInfo.physicalName())) != null) { + if (metadata.hash().length > 0 && fileInfo.metadata().hash().length == 0) { + // we have a hash - check if our repo has a hash too otherwise we have + // to calculate it. + // we might have multiple parts even though the file is small... make sure we read all of it. + try (InputStream stream = fileInputStream(fileInfo)) { + BytesRefBuilder builder = new BytesRefBuilder(); + Store.MetadataSnapshot.hashFile(builder, stream, fileInfo.length()); + BytesRef hash = fileInfo.metadata().hash(); // reset the file infos metadata hash + assert hash.length == 0; + hash.bytes = builder.bytes(); + hash.offset = 0; + hash.length = builder.length(); + } + } + } + } +} diff --git a/server/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java b/server/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java index 0938b70906f52..40e02c560a9c0 100644 --- a/server/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java +++ b/server/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java @@ -120,8 +120,8 @@ public void testEquals() { } // get refs & compare - BytesReference pbr = new PagedBytesReference(bigarrays, ba1, length); - BytesReference pbr2 = new PagedBytesReference(bigarrays, ba2, length); + BytesReference pbr = new PagedBytesReference(ba1, length); + BytesReference pbr2 = new PagedBytesReference(ba2, length); assertEquals(pbr, pbr2); int offsetToFlip = randomIntBetween(0, length - 1); int value = ~Byte.toUnsignedInt(ba1.get(offsetToFlip)); diff --git a/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java b/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java index 1a00698325974..1a907feabe24a 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/common/bytes/AbstractBytesReferenceTestCase.java @@ -528,7 +528,7 @@ public void testEquals() throws IOException { public void testSliceEquals() { int length = randomIntBetween(100, PAGE_SIZE * randomIntBetween(2, 5)); ByteArray ba1 = bigarrays.newByteArray(length, false); - BytesReference pbr = new PagedBytesReference(bigarrays, ba1, length); + BytesReference pbr = new PagedBytesReference(ba1, length); // test equality of slices int sliceFrom = randomIntBetween(0, pbr.length()); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index c3b9cd2190990..755f5cab88475 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -455,6 +455,12 @@ protected IndexShard newStartedShard(final boolean primary) throws IOException { return newStartedShard(primary, Settings.EMPTY); } + /** + * Creates a new empty shard and starts it. + * + * @param primary controls whether the shard will be a primary or a replica. 
+ * @param settings the settings to use for this shard + */ protected IndexShard newStartedShard(final boolean primary, Settings settings) throws IOException { return newStartedShard(primary, settings, new InternalEngineFactory()); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 54b3a1391246c..a68b20ac2a112 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -63,6 +63,7 @@ import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction; +import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkAction; import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryAction; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; @@ -203,6 +204,8 @@ public List> getPersistentTasksExecutor(ClusterServic PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class), new ActionHandler<>(ClearCcrRestoreSessionAction.INSTANCE, ClearCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class), + new ActionHandler<>(GetCcrRestoreFileChunkAction.INSTANCE, + GetCcrRestoreFileChunkAction.TransportGetCcrRestoreFileChunkAction.class), // stats action new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class), new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class), diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java new file mode 100644 index 0000000000000..a1bdc0da238fd --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java @@ -0,0 +1,138 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */
+
+package org.elasticsearch.xpack.ccr.action.repositories;
+
+import org.elasticsearch.action.Action;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.client.ElasticsearchClient;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ByteArray;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportActionProxy;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
+
+import java.io.IOException;
+
+public class GetCcrRestoreFileChunkAction extends Action<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> {
+
+    public static final GetCcrRestoreFileChunkAction INSTANCE = new GetCcrRestoreFileChunkAction();
+    public static final String NAME = "internal:admin/ccr/restore/file_chunk/get";
+
+    private GetCcrRestoreFileChunkAction() {
+        super(NAME);
+    }
+
+    @Override
+    public GetCcrRestoreFileChunkResponse newResponse() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Writeable.Reader<GetCcrRestoreFileChunkResponse> getResponseReader() {
+        return GetCcrRestoreFileChunkResponse::new;
+    }
+
+    @Override
+    public GetCcrRestoreFileChunkRequestBuilder newRequestBuilder(ElasticsearchClient client) {
+        return new GetCcrRestoreFileChunkRequestBuilder(client);
+    }
+
+    public static class TransportGetCcrRestoreFileChunkAction
+        extends HandledTransportAction<GetCcrRestoreFileChunkRequest, GetCcrRestoreFileChunkResponse> {
+
+        private final CcrRestoreSourceService restoreSourceService;
+        private final ThreadPool threadPool;
+        private final BigArrays bigArrays;
+
+        @Inject
+        public TransportGetCcrRestoreFileChunkAction(Settings settings, BigArrays bigArrays, TransportService transportService,
+                                                     IndexNameExpressionResolver resolver,
+                                                     ActionFilters actionFilters, CcrRestoreSourceService restoreSourceService) {
+            super(settings, NAME, transportService.getThreadPool(), transportService, actionFilters, resolver,
+                GetCcrRestoreFileChunkRequest::new);
+            TransportActionProxy.registerProxyAction(transportService, NAME, GetCcrRestoreFileChunkResponse::new);
+            this.threadPool = transportService.getThreadPool();
+            this.restoreSourceService = restoreSourceService;
+            this.bigArrays = bigArrays;
+        }
+
+        @Override
+        protected void doExecute(GetCcrRestoreFileChunkRequest request, ActionListener<GetCcrRestoreFileChunkResponse> listener) {
+            threadPool.generic().execute(new AbstractRunnable() {
+                @Override
+                public void onFailure(Exception e) {
+                    listener.onFailure(e);
+                }
+
+                @Override
+                protected void doRun() throws Exception {
+                    int bytesRequested = request.getSize();
+                    ByteArray array = bigArrays.newByteArray(bytesRequested, false);
+                    String fileName = request.getFileName();
+                    String sessionUUID = request.getSessionUUID();
+                    // This is currently safe to do because calling `onResponse` will serialize the bytes to the network layer data
+                    // structure on the same thread.
So the bytes will be copied before the reference is released. + try (ReleasablePagedBytesReference reference = new ReleasablePagedBytesReference(array, bytesRequested, array)) { + try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) { + long offsetAfterRead = sessionReader.readFileBytes(fileName, reference); + long offsetBeforeRead = offsetAfterRead - reference.length(); + listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference)); + } + } + } + }); + } + } + + public static class GetCcrRestoreFileChunkResponse extends ActionResponse { + + private final long offset; + private final BytesReference chunk; + + GetCcrRestoreFileChunkResponse(StreamInput streamInput) throws IOException { + super(streamInput); + offset = streamInput.readVLong(); + chunk = streamInput.readBytesReference(); + } + + GetCcrRestoreFileChunkResponse(long offset, BytesReference chunk) { + this.offset = offset; + this.chunk = chunk; + } + + public long getOffset() { + return offset; + } + + public BytesReference getChunk() { + return chunk; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVLong(offset); + out.writeBytesReference(chunk); + } + + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequest.java new file mode 100644 index 0000000000000..9360587cb60c1 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequest.java @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.RemoteClusterAwareRequest; + +import java.io.IOException; + +public class GetCcrRestoreFileChunkRequest extends ActionRequest implements RemoteClusterAwareRequest { + + private DiscoveryNode node; + private String sessionUUID; + private String fileName; + private int size; + + GetCcrRestoreFileChunkRequest() { + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public GetCcrRestoreFileChunkRequest(DiscoveryNode node, String sessionUUID, String fileName, int size) { + this.node = node; + this.sessionUUID = sessionUUID; + this.fileName = fileName; + this.size = size; + assert size > -1 : "The file chunk request size must be positive. 
Found: [" + size + "]."; + } + + GetCcrRestoreFileChunkRequest(StreamInput in) throws IOException { + super(in); + node = null; + sessionUUID = in.readString(); + fileName = in.readString(); + size = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sessionUUID); + out.writeString(fileName); + out.writeVInt(size); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + node = null; + sessionUUID = in.readString(); + fileName = in.readString(); + size = in.readVInt(); + } + + String getSessionUUID() { + return sessionUUID; + } + + String getFileName() { + return fileName; + } + + int getSize() { + return size; + } + + @Override + public DiscoveryNode getPreferredTargetNode() { + assert node != null : "Target node is null"; + return node; + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequestBuilder.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequestBuilder.java new file mode 100644 index 0000000000000..60aeb00829593 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkRequestBuilder.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action.repositories; + +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +class GetCcrRestoreFileChunkRequestBuilder extends ActionRequestBuilder { + + GetCcrRestoreFileChunkRequestBuilder(ElasticsearchClient client) { + super(client, GetCcrRestoreFileChunkAction.INSTANCE, new GetCcrRestoreFileChunkRequest()); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index dd0d523cea9c1..be116b523b3a0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -79,8 +80,8 @@ protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionReques if (indexShard == null) { throw new ShardNotFoundException(shardId); } - ccrRestoreService.openSession(request.getSessionUUID(), indexShard); - return new PutCcrRestoreSessionResponse(clusterService.localNode()); + Store.MetadataSnapshot storeFileMetaData = ccrRestoreService.openSession(request.getSessionUUID(), indexShard); + return new PutCcrRestoreSessionResponse(clusterService.localNode(), storeFileMetaData); } @Override @@ -104,33 +105,42 @@ protected ShardsIterator shards(ClusterState state, InternalRequest request) { public static 
class PutCcrRestoreSessionResponse extends ActionResponse { private DiscoveryNode node; + private Store.MetadataSnapshot storeFileMetaData; PutCcrRestoreSessionResponse() { } - PutCcrRestoreSessionResponse(DiscoveryNode node) { + PutCcrRestoreSessionResponse(DiscoveryNode node, Store.MetadataSnapshot storeFileMetaData) { this.node = node; + this.storeFileMetaData = storeFileMetaData; } PutCcrRestoreSessionResponse(StreamInput in) throws IOException { super(in); node = new DiscoveryNode(in); + storeFileMetaData = new Store.MetadataSnapshot(in); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); node = new DiscoveryNode(in); + storeFileMetaData = new Store.MetadataSnapshot(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); node.writeTo(out); + storeFileMetaData.writeTo(out); } public DiscoveryNode getNode() { return node; } + + public Store.MetadataSnapshot getStoreFileMetaData() { + return storeFileMetaData; + } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java index 2b94193f674af..da0c43116ee76 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionRequest.java @@ -11,7 +11,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.store.Store; import java.io.IOException; @@ -19,16 +18,14 @@ public class PutCcrRestoreSessionRequest extends SingleShardRequest ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID); ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId()); Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); - String sessionUUID = UUIDs.randomBase64UUID(); - PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, - new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId, recoveryMetadata)).actionGet(); - DiscoveryNode node = response.getNode(); - // TODO: Implement file restore - closeSession(remoteClient, node, sessionUUID); + // TODO: There should be some local timeout. And if the remote cluster returns an unknown session + // response, we should be able to retry by creating a new session. 
+ String name = metadata.name(); + try (RestoreSession restoreSession = RestoreSession.openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) { + restoreSession.restoreFiles(); + } catch (Exception e) { + throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e); + } + maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings()); } @@ -279,9 +287,108 @@ private void maybeUpdateMappings(Client localClient, Client remoteClient, Index } } - private void closeSession(Client remoteClient, DiscoveryNode node, String sessionUUID) { - ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node); - ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response = - remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(); + private static class RestoreSession extends FileRestoreContext implements Closeable { + + private static final int BUFFER_SIZE = 1 << 16; + + private final Client remoteClient; + private final String sessionUUID; + private final DiscoveryNode node; + private final Store.MetadataSnapshot sourceMetaData; + + RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard, + RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData) { + super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE); + this.remoteClient = remoteClient; + this.sessionUUID = sessionUUID; + this.node = node; + this.sourceMetaData = sourceMetaData; + } + + static RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard, + RecoveryState recoveryState) { + String sessionUUID = UUIDs.randomBase64UUID(); + PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, + new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(); + return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState, + response.getStoreFileMetaData()); + } + + void restoreFiles() throws IOException { + ArrayList fileInfos = new ArrayList<>(); + for (StoreFileMetaData fileMetaData : sourceMetaData) { + ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length()); + fileInfos.add(new BlobStoreIndexShardSnapshot.FileInfo(fileMetaData.name(), fileMetaData, fileSize)); + } + SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos); + restore(snapshotFiles); + } + + @Override + protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { + return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata()); + } + + @Override + public void close() { + ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node); + ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response = + remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(); + } + } + + private static class RestoreFileInputStream extends InputStream { + + private final Client remoteClient; + private final String sessionUUID; + private final DiscoveryNode node; + private final StoreFileMetaData fileToRecover; + + private long pos = 0; + + private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover) { + this.remoteClient = remoteClient; + 
this.sessionUUID = sessionUUID; + this.node = node; + this.fileToRecover = fileToRecover; + } + + + @Override + public int read() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int read(byte[] bytes, int off, int len) throws IOException { + long remainingBytes = fileToRecover.length() - pos; + if (remainingBytes <= 0) { + return 0; + } + + int bytesRequested = (int) Math.min(remainingBytes, len); + String fileName = fileToRecover.name(); + GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested); + GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response = + remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet(); + BytesReference fileChunk = response.getChunk(); + + int bytesReceived = fileChunk.length(); + if (bytesReceived > bytesRequested) { + throw new IOException("More bytes [" + bytesReceived + "] received than requested [" + bytesRequested + "]"); + } + + long leaderOffset = response.getOffset(); + assert pos == leaderOffset : "Position [" + pos + "] should be equal to the leader file offset [" + leaderOffset + "]."; + + try (StreamInput streamInput = fileChunk.streamInput()) { + int bytesRead = streamInput.read(bytes, 0, bytesReceived); + assert bytesRead == bytesReceived : "Did not read the correct number of bytes"; + } + + pos += bytesReceived; + + return bytesReceived; + } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index c7a022ae34569..785600dd5f8fc 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -8,10 +8,18 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexEventListener; @@ -23,9 +31,11 @@ import java.io.Closeable; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; @@ -33,7 +43,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen private static final Logger logger = LogManager.getLogger(CcrRestoreSourceService.class); - private final Map onGoingRestores = ConcurrentCollections.newConcurrentMap(); + private final Map onGoingRestores = ConcurrentCollections.newConcurrentMap(); private final Map> sessionsForShard = new HashMap<>(); 
     private final CopyOnWriteArrayList<Consumer<String>> openSessionListeners = new CopyOnWriteArrayList<>();
     private final CopyOnWriteArrayList<Consumer<String>> closeSessionListeners = new CopyOnWriteArrayList<>();
 
@@ -44,8 +54,9 @@ public synchronized void afterIndexShardClosed(ShardId shardId, @Nullable IndexS
         HashSet<String> sessions = sessionsForShard.remove(indexShard);
         if (sessions != null) {
             for (String sessionUUID : sessions) {
-                RestoreContext restore = onGoingRestores.remove(sessionUUID);
-                IOUtils.closeWhileHandlingException(restore);
+                RestoreSession restore = onGoingRestores.remove(sessionUUID);
+                assert restore != null : "Session UUID [" + sessionUUID + "] registered for shard but not found in ongoing restores";
+                restore.decRef();
             }
         }
     }
@@ -64,7 +75,7 @@ protected void doStop() {
 
     @Override
     protected synchronized void doClose() throws IOException {
         sessionsForShard.clear();
-        IOUtils.closeWhileHandlingException(onGoingRestores.values());
+        onGoingRestores.values().forEach(AbstractRefCounted::decRef);
         onGoingRestores.clear();
     }
 
@@ -84,7 +95,7 @@ synchronized HashSet<String> getSessionsForShard(IndexShard indexShard) {
     }
 
     // default visibility for testing
-    synchronized RestoreContext getOngoingRestore(String sessionUUID) {
+    synchronized RestoreSession getOngoingRestore(String sessionUUID) {
         return onGoingRestores.get(sessionUUID);
     }
 
@@ -92,7 +103,7 @@ synchronized RestoreContext getOngoingRestore(String sessionUUID) {
     // complete. Or it could be for session to have been touched.
     public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException {
         boolean success = false;
-        RestoreContext restore = null;
+        RestoreSession restore = null;
         try {
             if (onGoingRestores.containsKey(sessionUUID)) {
                 logger.debug("not opening new session [{}] as it already exists", sessionUUID);
@@ -102,46 +113,72 @@ public synchronized Store.MetadataSnapshot openSession(String sessionUUID, Index
                 if (indexShard.state() == IndexShardState.CLOSED) {
                     throw new IndexShardClosedException(indexShard.shardId(), "cannot open ccr restore session if shard closed");
                 }
-                restore = new RestoreContext(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit());
+                restore = new RestoreSession(sessionUUID, indexShard, indexShard.acquireSafeIndexCommit());
                 onGoingRestores.put(sessionUUID, restore);
                 openSessionListeners.forEach(c -> c.accept(sessionUUID));
-                HashSet<String> sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>());
+                HashSet<String> sessions = sessionsForShard.computeIfAbsent(indexShard, (s) -> new HashSet<>());
                 sessions.add(sessionUUID);
             }
             Store.MetadataSnapshot metaData = restore.getMetaData();
             success = true;
             return metaData;
         } finally {
-            if (success == false) {
+            if (success == false) {
                 onGoingRestores.remove(sessionUUID);
-                IOUtils.closeWhileHandlingException(restore);
+                if (restore != null) {
+                    restore.decRef();
+                }
+            }
+        }
+    }
+
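+    // (editor's note) Bookkeeping is cleaned up under the service mutex, but the final decRef is
+    // deferred until after the synchronized block so any IO performed by closeInternal never runs
+    // while holding the lock.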
+ assert removed : "No session found for UUID [" + sessionUUID +"]"; + if (sessions.isEmpty()) { + sessionsForShard.remove(restore.indexShard); + } } } + restore.decRef(); } - public synchronized void closeSession(String sessionUUID) { - closeSessionListeners.forEach(c -> c.accept(sessionUUID)); - RestoreContext restore = onGoingRestores.remove(sessionUUID); + public synchronized SessionReader getSessionReader(String sessionUUID) { + RestoreSession restore = onGoingRestores.get(sessionUUID); if (restore == null) { - logger.info("could not close session [{}] because session not found", sessionUUID); + logger.debug("could not get session [{}] because session not found", sessionUUID); throw new IllegalArgumentException("session [" + sessionUUID + "] not found"); } - IOUtils.closeWhileHandlingException(restore); + return new SessionReader(restore); } - private class RestoreContext implements Closeable { + private static class RestoreSession extends AbstractRefCounted { private final String sessionUUID; private final IndexShard indexShard; private final Engine.IndexCommitRef commitRef; + private final KeyedLock keyedLock = new KeyedLock<>(); + private final Map cachedInputs = new ConcurrentHashMap<>(); - private RestoreContext(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef) { + private RestoreSession(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef) { + super("restore-session"); this.sessionUUID = sessionUUID; this.indexShard = indexShard; this.commitRef = commitRef; } - Store.MetadataSnapshot getMetaData() throws IOException { + private Store.MetadataSnapshot getMetaData() throws IOException { indexShard.store().incRef(); try { return indexShard.store().getMetadata(commitRef.getIndexCommit()); @@ -150,22 +187,71 @@ Store.MetadataSnapshot getMetaData() throws IOException { } } - @Override - public void close() { - assert Thread.holdsLock(CcrRestoreSourceService.this); - removeSessionForShard(sessionUUID, indexShard); - IOUtils.closeWhileHandlingException(commitRef); - } + private long readFileBytes(String fileName, BytesReference reference) throws IOException { + Releasable lock = keyedLock.tryAcquire(fileName); + if (lock == null) { + throw new IllegalStateException("can't read from the same file on the same session concurrently"); + } + try (Releasable releasable = lock) { + final IndexInput indexInput = cachedInputs.computeIfAbsent(fileName, f -> { + try { + return commitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); - private void removeSessionForShard(String sessionUUID, IndexShard indexShard) { - logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard.shardId()); - HashSet sessions = sessionsForShard.get(indexShard); - if (sessions != null) { - sessions.remove(sessionUUID); - if (sessions.isEmpty()) { - sessionsForShard.remove(indexShard); + BytesRefIterator refIterator = reference.iterator(); + BytesRef ref; + while ((ref = refIterator.next()) != null) { + byte[] refBytes = ref.bytes; + indexInput.readBytes(refBytes, 0, refBytes.length); } + + long offsetAfterRead = indexInput.getFilePointer(); + + if (offsetAfterRead == indexInput.length()) { + cachedInputs.remove(fileName); + IOUtils.close(indexInput); + } + + return offsetAfterRead; } } + + @Override + protected void closeInternal() { + logger.debug("closing session [{}] for shard [{}]", sessionUUID, indexShard.shardId()); + assert keyedLock.hasLockedKeys() 
== false : "Should not hold any file locks when closing"; + IOUtils.closeWhileHandlingException(cachedInputs.values()); + } + } + + public static class SessionReader implements Closeable { + + private final RestoreSession restoreSession; + + private SessionReader(RestoreSession restoreSession) { + this.restoreSession = restoreSession; + restoreSession.incRef(); + } + + /** + * Read bytes into the reference from the file. This method will return the offset in the file where + * the read completed. + * + * @param fileName to read + * @param reference to read bytes into + * @return the offset of the file after the read is complete + * @throws IOException if the read fails + */ + public long readFileBytes(String fileName, BytesReference reference) throws IOException { + return restoreSession.readFileBytes(fileName, reference); + } + + @Override + public void close() { + restoreSession.decRef(); + } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 2d3ca857ff848..36e1027dc5f87 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -21,7 +22,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.IndexSettings; @@ -40,8 +40,8 @@ import java.io.IOException; import java.util.Locale; import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.singletonMap; import static org.elasticsearch.snapshots.RestoreService.restoreInProgress; @@ -159,7 +159,7 @@ public void testThatRepositoryRecoversEmptyIndexBasedOnLeaderSettings() throws I assertNotEquals(leaderMetadata.getIndexUUID(), followerMetadata.getIndexUUID()); } - public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException { + public void testDocsAreRecovered() throws Exception { String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; String leaderIndex = "index1"; String followerIndex = "index2"; @@ -173,6 +173,45 @@ public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException { final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + final int firstBatchNumDocs = randomIntBetween(1, 64); + logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs); + for (int i = 0; i < firstBatchNumDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + 
+        final int firstBatchNumDocs = randomIntBetween(1, 64);
+        logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
+        for (int i = 0; i < firstBatchNumDocs; i++) {
+            final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
+            leaderClient().prepareIndex(leaderIndex, "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
+        }
+
+        leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();
+
+        AtomicBoolean isRunning = new AtomicBoolean(true);
+
+        // Concurrently index new docs with mapping changes
+        Thread thread = new Thread(() -> {
+            // 'f' is deliberately absent so the writer never conflicts with the first batch's field
+            char[] chars = "abcdeghijklmnopqrstuvwxyz".toCharArray();
+            for (char c : chars) {
+                if (isRunning.get() == false) {
+                    break;
+                }
+                final String source;
+                long l = randomLongBetween(0, 50000);
+                if (randomBoolean()) {
+                    source = String.format(Locale.ROOT, "{\"%c\":%d}", c, l);
+                } else {
+                    source = String.format(Locale.ROOT, "{\"%c\":\"%d\"}", c, l);
+                }
+                for (int i = 64; i < 150; i++) {
+                    if (isRunning.get() == false) {
+                        break;
+                    }
+                    leaderClient().prepareIndex(leaderIndex, "doc", Long.toString(i)).setSource(source, XContentType.JSON).get();
+                    if (rarely()) {
+                        leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).get();
+                    }
+                }
+                leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();
+            }
+        });
+        thread.start();
+
         Settings.Builder settingsBuilder = Settings.builder()
             .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
             .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
@@ -182,22 +221,18 @@ public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException {
             false, true, settingsBuilder.build(), new String[0],
             "restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]");
 
-        Set<String> sessionsOpened = ConcurrentCollections.newConcurrentSet();
-        Set<String> sessionsClosed = ConcurrentCollections.newConcurrentSet();
-        for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) {
-            restoreSourceService.addOpenSessionListener(sessionsOpened::add);
-            restoreSourceService.addCloseSessionListener(sessionsClosed::add);
-        }
-
         PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
         restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
         RestoreInfo restoreInfo = future.actionGet();
 
-        assertEquals(numberOfPrimaryShards, sessionsOpened.size());
-        assertEquals(numberOfPrimaryShards, sessionsClosed.size());
-
         assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards());
         assertEquals(0, restoreInfo.failedShards());
+        for (int i = 0; i < firstBatchNumDocs; ++i) {
+            assertExpectedDocument(followerIndex, i);
+        }
+
+        isRunning.set(false);
+        thread.join();
     }
 
     public void testFollowerMappingIsUpdated() throws IOException {
@@ -254,6 +289,13 @@ public void testFollowerMappingIsUpdated() throws IOException {
         assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long"));
     }
 
+    private void assertExpectedDocument(String followerIndex, final int value) {
+        final GetResponse getResponse = followerClient().prepareGet(followerIndex, "doc", Integer.toString(value)).get();
+        assertTrue("Doc with id [" + value + "] is missing", getResponse.isExists());
+        assertTrue(getResponse.getSource().containsKey("f"));
+        assertThat(getResponse.getSource().get("f"), equalTo(value));
+    }
+
     private ActionListener<RestoreService.RestoreCompletionResponse> waitForRestore(ClusterService clusterService, ActionListener<RestoreInfo> listener) {
         return new ActionListener<RestoreService.RestoreCompletionResponse>() {
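[editor's note] The unit tests that follow drive the leader-side service directly. A minimal sketch
of the intended calling pattern, assuming `service` is a started CcrRestoreSourceService,
`indexShard` is a started primary, and `fileName`/`buffer` are supplied by the caller:

    String sessionUUID = UUIDs.randomBase64UUID();
    service.openSession(sessionUUID, indexShard);
    try (CcrRestoreSourceService.SessionReader reader = service.getSessionReader(sessionUUID)) {
        // the reader holds a reference, so a concurrent closeSession cannot release the
        // underlying commit while this read is in flight
        long offset = reader.readFileBytes(fileName, new BytesArray(buffer));
    }
    service.closeSession(sessionUUID);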
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java
index 0ab03a93995f8..c0b7863edf25a 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java
@@ -6,14 +6,20 @@
 package org.elasticsearch.xpack.ccr.repository;
 
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IllegalIndexShardStateException;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
+import org.elasticsearch.index.store.StoreFileMetaData;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
 
 public class CcrRestoreSourceServiceTests extends IndexShardTestCase {
@@ -122,4 +128,63 @@ public void testCloseShardListenerFunctionality() throws IOException {
         restoreSourceService.closeSession(sessionUUID3);
         closeShards(indexShard1, indexShard2);
     }
+
+    public void testGetSessionReader() throws IOException {
+        IndexShard indexShard1 = newStartedShard(true);
+        final String sessionUUID1 = UUIDs.randomBase64UUID();
+
+        restoreSourceService.openSession(sessionUUID1, indexShard1);
+
+        ArrayList<StoreFileMetaData> files = new ArrayList<>();
+        indexShard1.snapshotStoreMetadata().forEach(files::add);
+
+        StoreFileMetaData fileMetaData = files.get(0);
+        String fileName = fileMetaData.name();
+
+        byte[] expectedBytes = new byte[(int) fileMetaData.length()];
+        byte[] actualBytes = new byte[(int) fileMetaData.length()];
+        Engine.IndexCommitRef indexCommitRef = indexShard1.acquireSafeIndexCommit();
+        try (IndexInput indexInput = indexCommitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE)) {
+            indexInput.seek(0);
+            indexInput.readBytes(expectedBytes, 0, (int) fileMetaData.length());
+        }
+
+        BytesArray byteArray = new BytesArray(actualBytes);
+        try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID1)) {
+            long offset = sessionReader.readFileBytes(fileName, byteArray);
+            assertEquals(fileMetaData.length(), offset);
+        }
+
+        assertArrayEquals(expectedBytes, actualBytes);
+        restoreSourceService.closeSession(sessionUUID1);
+        closeShards(indexShard1);
+    }
+
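+    // (editor's note) Exercises the IndexInput cache in readFileBytes: the 10-byte read below
+    // leaves the first file's input open and cached, so only the session close path can release
+    // it. The test framework's directory wrapper is expected to fail the shard close if any
+    // input leaks.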
+    public void testGetSessionDoesNotLeakFileIfClosed() throws IOException {
+        Settings settings = Settings.builder().put("index.merge.enabled", false).build();
+        IndexShard indexShard = newStartedShard(true, settings);
+        for (int i = 0; i < 5; i++) {
+            indexDoc(indexShard, "_doc", Integer.toString(i));
+            flushShard(indexShard, true);
+        }
+        final String sessionUUID1 = UUIDs.randomBase64UUID();
+
+        restoreSourceService.openSession(sessionUUID1, indexShard);
+
+        ArrayList<StoreFileMetaData> files = new ArrayList<>();
+        indexShard.snapshotStoreMetadata().forEach(files::add);
+
+        try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID1)) {
+            sessionReader.readFileBytes(files.get(0).name(), new BytesArray(new byte[10]));
+        }
+
+        // Request a second file to ensure that original file is not leaked
+        try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID1)) {
+            sessionReader.readFileBytes(files.get(1).name(), new BytesArray(new byte[10]));
+        }
+
+        restoreSourceService.closeSession(sessionUUID1);
+        closeShards(indexShard);
+        // Exception will be thrown if file is not closed.
+    }
 }