From afbf6fda5a499887f5c2c6215df3d6dd88916a3c Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 9 Dec 2020 17:07:41 +0000 Subject: [PATCH 01/10] Snapshot of a searchable snapshot should be empty Today if you take a snapshot of a searchable snapshot index then we treat it like a normal index and copy (any unchanged parts of) its contents the the repository. This is often a complete copy, doubling the snapshot storage required, since a searchable snapshot index typically has a different name from the original index; it may also be that we are taking a snapshot into a different repository. The content of a searchable snapshot is already held in a snapshot, and its index metadata indicates how to find this content, so it is wasteful to copy anything new into the repository. This commit adjusts things so that a searchable snapshot shard presents itself to the snapshotter as if it contained no segments, and adjusts things to account for the consequent mismatch at restore time. Closes #66110 --- .../elasticsearch/index/engine/Engine.java | 8 ++ .../elasticsearch/index/shard/IndexShard.java | 13 +++ .../snapshots/blobstore/SnapshotFiles.java | 4 + .../store/ImmutableDirectoryException.java | 29 ++++++ .../blobstore/FileRestoreContext.java | 5 ++ .../snapshots/SnapshotShardsService.java | 3 +- .../SearchableSnapshotsIntegTests.java | 89 ++++++++++++++++++- .../store/InMemoryNoOpCommitDirectory.java | 3 +- .../SearchableSnapshots.java | 38 +++++++- 9 files changed, 186 insertions(+), 6 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/store/ImmutableDirectoryException.java diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 5259c1da80a48..6d92e07ef26d2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1076,6 +1076,14 @@ public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyE */ public abstract IndexCommitRef acquireSafeIndexCommit() throws EngineException; + /** + * Acquires the index commit that should be included in a snapshot. + */ + public IndexCommitRef acquireIndexCommitForSnapshot() throws EngineException { + // by default we flush first so that the snapshot is as up-to-date as possible. + return acquireLastIndexCommit(true); + } + /** * @return a summary of the contents of the current safe commit */ diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index dc7fb0ed6654d..86e0b5756350e 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1179,6 +1179,19 @@ public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws E } } + /** + * Acquires the {@link IndexCommit} which should be included in a snapshot. + */ + public Engine.IndexCommitRef acquireIndexCommitForSnapshot() throws EngineException { + final IndexShardState state = this.state; // one time volatile read + if (state == IndexShardState.STARTED) { + // unlike acquireLastIndexCommit(), there's no need to acquire a snapshot on a shard that is shutting down + return getEngine().acquireIndexCommitForSnapshot(); + } else { + throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed"); + } + } + /** * Snapshots the most recent safe index commit from the currently running engine. * All index files referenced by this index commit won't be freed until the commit/snapshot is closed. diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java index bde19406a5e80..a582e9cf3e3f9 100644 --- a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java +++ b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java @@ -128,4 +128,8 @@ private FileInfo findPhysicalIndexFile(String physicalName) { return physicalFiles.get(physicalName); } + @Override + public String toString() { + return "SnapshotFiles{" + indexFiles.toString() + "}"; + } } diff --git a/server/src/main/java/org/elasticsearch/index/store/ImmutableDirectoryException.java b/server/src/main/java/org/elasticsearch/index/store/ImmutableDirectoryException.java new file mode 100644 index 0000000000000..c49b22c69aa6a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/store/ImmutableDirectoryException.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.store; + +/** + * Exception thrown if trying to mutate files in an immutable directory. + */ +public class ImmutableDirectoryException extends IllegalArgumentException { + public ImmutableDirectoryException(String message) { + super(message); + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java index 4faea615a38ef..a18691814793d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles; +import org.elasticsearch.index.store.ImmutableDirectoryException; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.indices.recovery.RecoveryState; @@ -190,6 +191,10 @@ private void afterRestore(SnapshotFiles snapshotFiles, Store store, StoreFileMet try { store.deleteQuiet("restore", storeFile); store.directory().deleteFile(storeFile); + } catch (ImmutableDirectoryException e) { + // snapshots of immutable directories only contain an empty `segments_N` file since the data lives elsewhere, and if we + // restore such a snapshot then the real data is already present in the directory and cannot be removed. + assert snapshotFiles.indexFiles().size() == 1 : snapshotFiles; } catch (IOException e) { logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 1bd999d7bd37b..b2f010774dff8 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -339,8 +339,7 @@ private void snapshot(final ShardId shardId, final Snapshot snapshot, final Inde final Repository repository = repositoriesService.repository(snapshot.getRepository()); Engine.IndexCommitRef snapshotRef = null; try { - // we flush first to make sure we get the latest writes snapshotted - snapshotRef = indexShard.acquireLastIndexCommit(true); + snapshotRef = indexShard.acquireIndexCommitForSnapshot(); final IndexCommit snapshotIndexCommit = snapshotRef.getIndexCommit(); repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), getShardStateId(indexShard, snapshotIndexCommit), snapshotStatus, version, userMetadata, diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index f12a7d87df671..a4680d1464885 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -10,6 +10,8 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; @@ -40,6 +42,7 @@ import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider; @@ -51,7 +54,6 @@ import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsRequest; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsResponse; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; -import org.hamcrest.Matchers; import org.joda.time.Instant; import java.io.IOException; @@ -75,6 +77,7 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; +import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -86,6 +89,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.sameInstance; @@ -785,6 +789,87 @@ public void testSnapshotMountedIndexWithTimestampsRecordsTimestampRangeInIndexMe } } + public void testSnapshotOfSearchableSnapshotIncludesNoDataButCanBeRestored() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createAndPopulateIndex( + indexName, + Settings.builder().put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true) + ); + + final TotalHits originalAllHits = internalCluster().client() + .prepareSearch(indexName) + .setTrackTotalHits(true) + .get() + .getHits() + .getTotalHits(); + final TotalHits originalBarHits = internalCluster().client() + .prepareSearch(indexName) + .setTrackTotalHits(true) + .setQuery(matchQuery("foo", "bar")) + .get() + .getHits() + .getTotalHits(); + logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits); + + final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createRepository(repositoryName, "fs"); + + final SnapshotId snapshotOne = createSnapshot(repositoryName, "snapshot-1", List.of(indexName)).snapshotId(); + for (final SnapshotStatus snapshotStatus : client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots(snapshotOne.getName()) + .get() + .getSnapshots()) { + for (final SnapshotIndexShardStatus snapshotIndexShardStatus : snapshotStatus.getShards()) { + final SnapshotStats stats = snapshotIndexShardStatus.getStats(); + assertThat(stats.getIncrementalFileCount(), greaterThan(1)); + assertThat(stats.getProcessedFileCount(), greaterThan(1)); + assertThat(stats.getTotalFileCount(), greaterThan(1)); + } + } + assertAcked(client().admin().indices().prepareDelete(indexName)); + + final String restoredIndexName = randomValueOtherThan(indexName, () -> randomAlphaOfLength(10).toLowerCase(Locale.ROOT)); + mountSnapshot(repositoryName, snapshotOne.getName(), indexName, restoredIndexName, Settings.EMPTY); + ensureGreen(restoredIndexName); + + logger.info("--> starting to take snapshot-2"); + final SnapshotId snapshotTwo = createSnapshot(repositoryName, "snapshot-2", List.of(restoredIndexName)).snapshotId(); + logger.info("--> finished taking snapshot-2"); + for (final SnapshotStatus snapshotStatus : client().admin() + .cluster() + .prepareSnapshotStatus(repositoryName) + .setSnapshots(snapshotTwo.getName()) + .get() + .getSnapshots()) { + assertThat(snapshotStatus.getIndices().size(), equalTo(1)); + assertTrue(snapshotStatus.getIndices().containsKey(restoredIndexName)); + for (final SnapshotIndexShardStatus snapshotIndexShardStatus : snapshotStatus.getShards()) { + final SnapshotStats stats = snapshotIndexShardStatus.getStats(); + assertThat(stats.getIncrementalFileCount(), equalTo(1)); + assertThat(stats.getProcessedFileCount(), equalTo(1)); + assertThat(stats.getTotalFileCount(), equalTo(1)); + } + } + assertAcked(client().admin().indices().prepareDelete(restoredIndexName)); + + logger.info("--> starting to restore snapshot-2"); + assertThat( + client().admin() + .cluster() + .prepareRestoreSnapshot(repositoryName, snapshotTwo.getName()) + .setIndices(restoredIndexName) + .get() + .status(), + equalTo(RestStatus.ACCEPTED) + ); + ensureGreen(restoredIndexName); + logger.info("--> finished restoring snapshot-2"); + + assertTotalHits(restoredIndexName, originalAllHits, originalBarHits); + } + private void assertTotalHits(String indexName, TotalHits originalAllHits, TotalHits originalBarHits) throws Exception { final Thread[] threads = new Thread[between(1, 5)]; final AtomicArray allHits = new AtomicArray<>(threads.length); @@ -864,7 +949,7 @@ private void assertSearchableSnapshotStats(String indexName, boolean cacheEnable new SearchableSnapshotsStatsRequest(indexName) ).actionGet(); final NumShards restoredNumShards = getNumShards(indexName); - assertThat(statsResponse.getStats(), Matchers.hasSize(restoredNumShards.totalNumShards)); + assertThat(statsResponse.getStats(), hasSize(restoredNumShards.totalNumShards)); final long totalSize = statsResponse.getStats() .stream() diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/InMemoryNoOpCommitDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/InMemoryNoOpCommitDirectory.java index 609e872ebdb46..c6fa167f5ea62 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/InMemoryNoOpCommitDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/InMemoryNoOpCommitDirectory.java @@ -119,7 +119,8 @@ private static void ensureMutable(String name) { || name.startsWith("pending_segments_") || name.matches("^recovery\\..*\\.segments_.*$")) == false) { - throw new IllegalArgumentException("file [" + name + "] is not mutable"); + throw new ImmutableDirectoryException("file [" + name + "] is not mutable"); } } + } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index bf5a869d44fe6..5178983239676 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -5,7 +5,12 @@ */ package org.elasticsearch.xpack.searchablesnapshots; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.Directory; import org.apache.lucene.util.SetOnce; +import org.apache.lucene.util.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.blobstore.cache.BlobStoreCacheService; @@ -17,6 +22,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; @@ -30,6 +36,7 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.store.SearchableSnapshotDirectory; @@ -68,9 +75,13 @@ import org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction; import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsStatsAction; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -271,7 +282,32 @@ public Optional getEngineFactory(IndexSettings indexSettings) { if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings()) && indexSettings.getSettings().getAsBoolean("index.frozen", false) == false) { return Optional.of( - engineConfig -> new ReadOnlyEngine(engineConfig, null, new TranslogStats(), false, Function.identity(), false) + engineConfig -> new ReadOnlyEngine(engineConfig, null, new TranslogStats(), false, Function.identity(), false) { + + // present an empty IndexCommit to the snapshot mechanism so that we copy no shard data to the repository + private final IndexCommit emptyIndexCommit; + + { + try { + final Directory directory = engineConfig.getStore().directory(); + final String oldestSegmentsFile = Arrays.stream(directory.listAll()) + .filter(s -> s.startsWith(IndexFileNames.SEGMENTS + "_")) + .min(Comparator.naturalOrder()) + .orElseThrow(() -> new IOException("segments_N file not found")); + final SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major); + segmentInfos.updateGeneration(SegmentInfos.readCommit(directory, oldestSegmentsFile)); + emptyIndexCommit = Lucene.getIndexCommit(segmentInfos, directory); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public IndexCommitRef acquireIndexCommitForSnapshot() throws EngineException { + store.incRef(); + return new IndexCommitRef(emptyIndexCommit, store::decRef); + } + } ); } return Optional.empty(); From cc3403a199e26d288a5b0ac029678a24623bddc1 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 10 Dec 2020 12:45:50 +0000 Subject: [PATCH 02/10] Sort the segments files properly --- .../SearchableSnapshots.java | 4 +- .../SearchableSnapshotsTests.java | 39 +++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) create mode 100644 x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsTests.java diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 5178983239676..5cd2be64d0d43 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -277,6 +277,8 @@ public Map getDirectoryFactories() { }); } + static final Comparator SEGMENT_FILENAME_COMPARATOR = Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName); + @Override public Optional getEngineFactory(IndexSettings indexSettings) { if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings()) @@ -292,7 +294,7 @@ public Optional getEngineFactory(IndexSettings indexSettings) { final Directory directory = engineConfig.getStore().directory(); final String oldestSegmentsFile = Arrays.stream(directory.listAll()) .filter(s -> s.startsWith(IndexFileNames.SEGMENTS + "_")) - .min(Comparator.naturalOrder()) + .min(SEGMENT_FILENAME_COMPARATOR) .orElseThrow(() -> new IOException("segments_N file not found")); final SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major); segmentInfos.updateGeneration(SegmentInfos.readCommit(directory, oldestSegmentsFile)); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsTests.java new file mode 100644 index 0000000000000..e8926ed13cde2 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsTests.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.searchablesnapshots; + +import org.apache.lucene.index.IndexFileNames; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +public class SearchableSnapshotsTests extends ESTestCase { + + public void testSegmentFilenameComparator() { + + List> fileNames = new ArrayList<>(); + while (fileNames.size() < 100) { + final long generation = randomLongBetween(1, 10000); + fileNames.add(Tuple.tuple(generation, IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, "", generation))); + } + + fileNames.sort(Comparator.comparing(Tuple::v2, SearchableSnapshots.SEGMENT_FILENAME_COMPARATOR)); + long previousGeneration = 0; + for (Tuple fileName : fileNames) { + assertThat(fileNames + " should be sorted, found disorder at " + fileName, + fileName.v1(), greaterThanOrEqualTo(previousGeneration)); + previousGeneration = fileName.v1(); + } + + } + +} From 68971b58e418000398ceddae02c4f639165ded77 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 10 Dec 2020 12:49:36 +0000 Subject: [PATCH 03/10] Fix test that already tested this path --- .../searchablesnapshots/SearchableSnapshotsIntegTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index a4680d1464885..d49490ab632d7 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -703,9 +703,9 @@ public void testSnapshotMountedIndexLeavesBlobsUntouched() throws Exception { .get() .getSnapshots() .get(0); - assertThat(snapshotTwoStatus.getStats().getTotalFileCount(), equalTo(snapshotOneTotalFileCount)); - assertThat(snapshotTwoStatus.getStats().getIncrementalFileCount(), equalTo(numShards)); // one segment_N per shard - assertThat(snapshotTwoStatus.getStats().getProcessedFileCount(), equalTo(numShards)); // one segment_N per shard + assertThat(snapshotTwoStatus.getStats().getTotalFileCount(), equalTo(numShards)); // one segment_N per shard + assertThat(snapshotTwoStatus.getStats().getIncrementalFileCount(), equalTo(0)); + assertThat(snapshotTwoStatus.getStats().getProcessedFileCount(), equalTo(0)); } public void testSnapshotMountedIndexWithTimestampsRecordsTimestampRangeInIndexMetadata() throws Exception { From a04c04c2af47f68850a904a34456851475a04cf1 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 10 Dec 2020 13:03:56 +0000 Subject: [PATCH 04/10] Add comment on why we futz around with generations --- .../xpack/searchablesnapshots/SearchableSnapshots.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 5cd2be64d0d43..db229b69d6559 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -291,6 +291,12 @@ public Optional getEngineFactory(IndexSettings indexSettings) { { try { + // We have to use a generation number that corresponds to a real segments_N file since we read this file from + // the directory during the snapshotting process. The oldest segments_N file is the one in the snapshot + // (recalling that we may perform some no-op commits which make newer segments_N files too). The good thing + // about using the oldest segments_N file is that a restore will find that we already have this file "locally", + // avoid overwriting the real one with the bogus one, and then use the real one for the rest of the recovery. + final Directory directory = engineConfig.getStore().directory(); final String oldestSegmentsFile = Arrays.stream(directory.listAll()) .filter(s -> s.startsWith(IndexFileNames.SEGMENTS + "_")) From b0e3567100e31b4b6a774a90f7c8a1eeac0c7ba3 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 10 Dec 2020 13:04:06 +0000 Subject: [PATCH 05/10] Assert we don't overwrite the existing segments_N file --- .../index/store/InMemoryNoOpCommitDirectory.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/InMemoryNoOpCommitDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/InMemoryNoOpCommitDirectory.java index c6fa167f5ea62..e9488d0b94dda 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/InMemoryNoOpCommitDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/InMemoryNoOpCommitDirectory.java @@ -17,6 +17,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.NoSuchFileException; +import java.util.Arrays; import java.util.Collection; import java.util.Set; @@ -75,6 +76,7 @@ public void syncMetaData() {} @Override public IndexOutput createOutput(String name, IOContext context) throws IOException { ensureMutable(name); + assert notOverwritingRealSegmentsFile(name) : name; return super.createOutput(name, context); } @@ -82,6 +84,7 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti public void rename(String source, String dest) throws IOException { ensureMutable(source); ensureMutable(dest); + assert notOverwritingRealSegmentsFile(dest) : dest; super.rename(source, dest); } @@ -123,4 +126,8 @@ private static void ensureMutable(String name) { } } + private boolean notOverwritingRealSegmentsFile(String name) throws IOException { + return name.startsWith("segments_") == false || Arrays.stream(realDirectory.listAll()).noneMatch(s -> s.equals(name)); + } + } From 63383c77547a0760a274d83ab026dd62eb6fffc9 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 10 Dec 2020 13:05:27 +0000 Subject: [PATCH 06/10] More fields and less toString in toString --- .../elasticsearch/index/snapshots/blobstore/SnapshotFiles.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java index a582e9cf3e3f9..fcc92294b4a81 100644 --- a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java +++ b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java @@ -130,6 +130,7 @@ private FileInfo findPhysicalIndexFile(String physicalName) { @Override public String toString() { - return "SnapshotFiles{" + indexFiles.toString() + "}"; + return "SnapshotFiles{snapshot=[" + snapshot + "], shardStateIdentifier=[" + shardStateIdentifier + "], indexFiles=" + indexFiles + + "}"; } } From ea23c8dabe493a6442ae7cc2aeb1788683a0a4dd Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 10 Dec 2020 13:14:32 +0000 Subject: [PATCH 07/10] Spotless --- .../searchablesnapshots/SearchableSnapshotsTests.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsTests.java index e8926ed13cde2..9a72f4a5bac19 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsTests.java @@ -29,8 +29,11 @@ public void testSegmentFilenameComparator() { fileNames.sort(Comparator.comparing(Tuple::v2, SearchableSnapshots.SEGMENT_FILENAME_COMPARATOR)); long previousGeneration = 0; for (Tuple fileName : fileNames) { - assertThat(fileNames + " should be sorted, found disorder at " + fileName, - fileName.v1(), greaterThanOrEqualTo(previousGeneration)); + assertThat( + fileNames + " should be sorted, found disorder at " + fileName, + fileName.v1(), + greaterThanOrEqualTo(previousGeneration) + ); previousGeneration = fileName.v1(); } From f0ef5056b21c728aa3ec01bc1553e688472dec7d Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 10 Dec 2020 15:20:57 +0000 Subject: [PATCH 08/10] Extract method for constructing empty index commit --- .../SearchableSnapshotsConstants.java | 12 +---- .../SearchableSnapshotsUtils.java | 54 +++++++++++++++++++ .../SearchableSnapshotsUtilsTests.java} | 4 +- .../cache/CachedBlobContainerIndexInput.java | 2 +- .../ChecksumBlobContainerIndexInput.java | 2 +- .../direct/DirectBlobContainerIndexInput.java | 2 +- .../SearchableSnapshots.java | 35 +----------- .../cache/CacheService.java | 2 +- .../index/store/IndexInputStatsTests.java | 2 +- ...SearchableSnapshotDirectoryStatsTests.java | 2 +- .../SearchableSnapshotDirectoryTests.java | 2 +- .../store/cache/SparseFileTrackerTests.java | 2 +- .../index/store/cache/TestUtils.java | 2 +- .../DirectBlobContainerIndexInputTests.java | 2 +- 14 files changed, 69 insertions(+), 56 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsUtils.java rename x-pack/plugin/{searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsTests.java => core/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsUtilsTests.java} (92%) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java index 211f870dc2a11..919ef5a3ba721 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.searchablesnapshots; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; @@ -27,14 +26,5 @@ public static boolean isSearchableSnapshotStore(Settings indexSettings) { public static final String SNAPSHOT_BLOB_CACHE_INDEX = ".snapshot-blob-cache"; - /** - * We use {@code long} to represent offsets and lengths of files since they may be larger than 2GB, but {@code int} to represent - * offsets and lengths of arrays in memory which are limited to 2GB in size. We quite often need to convert from the file-based world - * of {@code long}s into the memory-based world of {@code int}s, knowing for certain that the result will not overflow. This method - * should be used to clarify that we're doing this. - */ - public static int toIntBytes(long l) { - return ByteSizeUnit.BYTES.toIntBytes(l); - } - } + diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsUtils.java new file mode 100644 index 0000000000000..784c92a2dcf6e --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsUtils.java @@ -0,0 +1,54 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.searchablesnapshots; + +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.Version; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.unit.ByteSizeUnit; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Comparator; + +public class SearchableSnapshotsUtils { + + static final Comparator SEGMENT_FILENAME_COMPARATOR = Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName); + + public static IndexCommit emptyIndexCommit(Directory directory) { + try { + // We have to use a generation number that corresponds to a real segments_N file since we read this file from + // the directory during the snapshotting process. The oldest segments_N file is the one in the snapshot + // (recalling that we may perform some no-op commits which make newer segments_N files too). The good thing + // about using the oldest segments_N file is that a restore will find that we already have this file "locally", + // avoid overwriting the real one with the bogus one, and then use the real one for the rest of the recovery. + final String oldestSegmentsFile = Arrays.stream(directory.listAll()) + .filter(s -> s.startsWith(IndexFileNames.SEGMENTS + "_")) + .min(SEGMENT_FILENAME_COMPARATOR) + .orElseThrow(() -> new IOException("segments_N file not found")); + final SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major); + segmentInfos.updateGeneration(SegmentInfos.readCommit(directory, oldestSegmentsFile)); + return Lucene.getIndexCommit(segmentInfos, directory); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** + * We use {@code long} to represent offsets and lengths of files since they may be larger than 2GB, but {@code int} to represent + * offsets and lengths of arrays in memory which are limited to 2GB in size. We quite often need to convert from the file-based world + * of {@code long}s into the memory-based world of {@code int}s, knowing for certain that the result will not overflow. This method + * should be used to clarify that we're doing this. + */ + public static int toIntBytes(long l) { + return ByteSizeUnit.BYTES.toIntBytes(l); + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsUtilsTests.java similarity index 92% rename from x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsTests.java rename to x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsUtilsTests.java index 9a72f4a5bac19..a2f2ade1cce7e 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsUtilsTests.java @@ -16,7 +16,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; -public class SearchableSnapshotsTests extends ESTestCase { +public class SearchableSnapshotsUtilsTests extends ESTestCase { public void testSegmentFilenameComparator() { @@ -26,7 +26,7 @@ public void testSegmentFilenameComparator() { fileNames.add(Tuple.tuple(generation, IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS, "", generation))); } - fileNames.sort(Comparator.comparing(Tuple::v2, SearchableSnapshots.SEGMENT_FILENAME_COMPARATOR)); + fileNames.sort(Comparator.comparing(Tuple::v2, SearchableSnapshotsUtils.SEGMENT_FILENAME_COMPARATOR)); long previousGeneration = 0; for (Tuple fileName : fileNames) { assertThat( diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index 09b692a178dc4..aeba64ee12468 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -45,7 +45,7 @@ import java.util.stream.IntStream; import static org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput.checksumToBytesArray; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInput.java index 254a79750d339..b34f5c1ce7dac 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/checksum/ChecksumBlobContainerIndexInput.java @@ -17,7 +17,7 @@ import java.util.Arrays; import java.util.Objects; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; /** * A {@link IndexInput} that can only be used to verify footer checksums. diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java index 11cb7ebc813f2..a4eaff0eee49e 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java @@ -25,7 +25,7 @@ import java.util.Objects; import java.util.concurrent.atomic.LongAdder; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; /** * A {@link DirectBlobContainerIndexInput} instance corresponds to a single file from a Lucene directory that has been snapshotted. Because diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index db229b69d6559..0b5ee5964a3a8 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -6,11 +6,7 @@ package org.elasticsearch.xpack.searchablesnapshots; import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.IndexFileNames; -import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.store.Directory; import org.apache.lucene.util.SetOnce; -import org.apache.lucene.util.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.blobstore.cache.BlobStoreCacheService; @@ -22,7 +18,6 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; @@ -75,13 +70,9 @@ import org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction; import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsStatsAction; -import java.io.IOException; -import java.io.UncheckedIOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -95,6 +86,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_BLOB_CACHE_INDEX; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_RECOVERY_STATE_FACTORY_KEY; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.emptyIndexCommit; /** * Plugin for Searchable Snapshots feature @@ -277,8 +269,6 @@ public Map getDirectoryFactories() { }); } - static final Comparator SEGMENT_FILENAME_COMPARATOR = Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName); - @Override public Optional getEngineFactory(IndexSettings indexSettings) { if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings()) @@ -287,28 +277,7 @@ public Optional getEngineFactory(IndexSettings indexSettings) { engineConfig -> new ReadOnlyEngine(engineConfig, null, new TranslogStats(), false, Function.identity(), false) { // present an empty IndexCommit to the snapshot mechanism so that we copy no shard data to the repository - private final IndexCommit emptyIndexCommit; - - { - try { - // We have to use a generation number that corresponds to a real segments_N file since we read this file from - // the directory during the snapshotting process. The oldest segments_N file is the one in the snapshot - // (recalling that we may perform some no-op commits which make newer segments_N files too). The good thing - // about using the oldest segments_N file is that a restore will find that we already have this file "locally", - // avoid overwriting the real one with the bogus one, and then use the real one for the rest of the recovery. - - final Directory directory = engineConfig.getStore().directory(); - final String oldestSegmentsFile = Arrays.stream(directory.listAll()) - .filter(s -> s.startsWith(IndexFileNames.SEGMENTS + "_")) - .min(SEGMENT_FILENAME_COMPARATOR) - .orElseThrow(() -> new IOException("segments_N file not found")); - final SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major); - segmentInfos.updateGeneration(SegmentInfos.readCommit(directory, oldestSegmentsFile)); - emptyIndexCommit = Lucene.getIndexCommit(segmentInfos, directory); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } + private final IndexCommit emptyIndexCommit = emptyIndexCommit(engineConfig.getStore().directory()); @Override public IndexCommitRef acquireIndexCommitForSnapshot() throws EngineException { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index b3bdaafd96cc0..8c674e8936045 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; /** * {@link CacheService} maintains a cache entry for all files read from searchable snapshot directories (see diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/IndexInputStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/IndexInputStatsTests.java index 03ce4d3c6d2f6..8a77b32331f55 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/IndexInputStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/IndexInputStatsTests.java @@ -11,7 +11,7 @@ import static org.elasticsearch.index.store.IndexInputStats.SEEKING_THRESHOLD; import static org.elasticsearch.index.store.cache.TestUtils.assertCounter; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; public class IndexInputStatsTests extends ESTestCase { diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java index 1d45ccd3f0d0d..6eaf15baa2e5f 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java @@ -50,7 +50,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index 462f741edf859..6b45332338d6f 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -124,7 +124,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java index c63ab172dd896..cb54762d34f76 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/SparseFileTrackerTests.java @@ -27,7 +27,7 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; import static org.elasticsearch.index.store.cache.TestUtils.mergeContiguousRanges; import static org.elasticsearch.index.store.cache.TestUtils.randomRanges; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index 6073e040bf668..8f6da5eac6d11 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -52,7 +52,7 @@ import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.test.ESTestCase.between; import static org.elasticsearch.test.ESTestCase.randomLongBetween; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertTrue; diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java index f9bfb50312f5a..65b3bef0d65b3 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java @@ -24,7 +24,7 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicInteger; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; From 794e93534b2edc211487725f841da7c4c902b41b Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 10 Dec 2020 16:36:57 +0000 Subject: [PATCH 09/10] Add REST test for snapshotting a searchable snapshot --- .../SearchableSnapshotsIntegTests.java | 6 + ...stractSearchableSnapshotsRestTestCase.java | 104 ++++++++++++------ 2 files changed, 78 insertions(+), 32 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index d49490ab632d7..2e293d75448fe 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -834,6 +834,12 @@ public void testSnapshotOfSearchableSnapshotIncludesNoDataButCanBeRestored() thr mountSnapshot(repositoryName, snapshotOne.getName(), indexName, restoredIndexName, Settings.EMPTY); ensureGreen(restoredIndexName); + if (randomBoolean() && false) { + // NB skipped as it doesn't work today + logger.info("--> closing index before snapshot"); + assertAcked(client().admin().indices().prepareClose(restoredIndexName)); + } + logger.info("--> starting to take snapshot-2"); final SnapshotId snapshotTwo = createSnapshot(repositoryName, "snapshot-2", List.of(restoredIndexName)).snapshotId(); logger.info("--> finished taking snapshot-2"); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java index 0dd84a588c00c..2d3728f16ad50 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java @@ -14,7 +14,6 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -32,9 +31,13 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; public abstract class AbstractSearchableSnapshotsRestTestCase extends ESRestTestCase { + private static final String REPOSITORY_NAME = "repository"; + private static final String SNAPSHOT_NAME = "searchable-snapshot"; + protected abstract String repositoryType(); protected abstract Settings repositorySettings(); @@ -43,9 +46,8 @@ private void runSearchableSnapshotsTest(SearchableSnapshotsTestCaseBody testCase final String repositoryType = repositoryType(); final Settings repositorySettings = repositorySettings(); - final String repository = "repository"; - logger.info("creating repository [{}] of type [{}]", repository, repositoryType); - registerRepository(repository, repositoryType, true, repositorySettings); + logger.info("creating repository [{}] of type [{}]", REPOSITORY_NAME, repositoryType); + registerRepository(REPOSITORY_NAME, repositoryType, true, repositorySettings); final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); final int numberOfShards = randomIntBetween(1, 5); @@ -90,21 +92,19 @@ private void runSearchableSnapshotsTest(SearchableSnapshotsTestCaseBody testCase logger.info("force merging index [{}]", indexName); forceMerge(indexName, randomBoolean(), randomBoolean()); - final String snapshot = "searchable-snapshot"; - // Remove the snapshots, if a previous test failed to delete them. This is // useful for third party tests that runs the test against a real external service. - deleteSnapshot(repository, snapshot, true); + deleteSnapshot(SNAPSHOT_NAME, true); - logger.info("creating snapshot [{}]", snapshot); - createSnapshot(repository, snapshot, true); + logger.info("creating snapshot [{}]", SNAPSHOT_NAME); + createSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME, true); logger.info("deleting index [{}]", indexName); deleteIndex(indexName); final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - logger.info("restoring index [{}] from snapshot [{}] as [{}]", indexName, snapshot, restoredIndexName); - mountSnapshot(repository, snapshot, true, indexName, restoredIndexName, Settings.EMPTY); + logger.info("restoring index [{}] from snapshot [{}] as [{}]", indexName, SNAPSHOT_NAME, restoredIndexName); + mountSnapshot(indexName, restoredIndexName); ensureGreen(restoredIndexName); @@ -113,8 +113,8 @@ private void runSearchableSnapshotsTest(SearchableSnapshotsTestCaseBody testCase testCaseBody.runTest(restoredIndexName, numDocs); - logger.info("deleting snapshot [{}]", snapshot); - deleteSnapshot(repository, snapshot, false); + logger.info("deleting snapshot [{}]", SNAPSHOT_NAME); + deleteSnapshot(SNAPSHOT_NAME, false); } public void testSearchResults() throws Exception { @@ -202,6 +202,58 @@ public void testClearCache() throws Exception { }); } + public void testSnapshotOfSearchableSnapshot() throws Exception { + runSearchableSnapshotsTest((restoredIndexName, numDocs) -> { + + if (randomBoolean() && false) { + // NB skipped as it doesn't work today + logger.info("--> freezing index [{}]", restoredIndexName); + final Request freezeRequest = new Request(HttpPost.METHOD_NAME, restoredIndexName + "/_freeze"); + assertOK(client().performRequest(freezeRequest)); + } + + if (randomBoolean() && false) { + // NB skipped as it doesn't work today + logger.info("--> closing index [{}]", restoredIndexName); + final Request closeRequest = new Request(HttpPost.METHOD_NAME, restoredIndexName + "/_close"); + assertOK(client().performRequest(closeRequest)); + } + + ensureGreen(restoredIndexName); + + final String snapshot2Name = "snapshotception"; + + // Remove the snapshots, if a previous test failed to delete them. This is + // useful for third party tests that runs the test against a real external service. + deleteSnapshot(snapshot2Name, true); + createSnapshot(REPOSITORY_NAME, snapshot2Name, true); + + final List>> snapshotShardsStats = extractValue( + responseAsMap( + client().performRequest( + new Request(HttpGet.METHOD_NAME, "/_snapshot/" + REPOSITORY_NAME + "/" + snapshot2Name + "/_status") + ) + ), + "snapshots.indices." + restoredIndexName + ".shards" + ); + + assertThat(snapshotShardsStats.size(), equalTo(1)); + for (Map value : snapshotShardsStats.get(0).values()) { + assertThat(extractValue(value, "stats.total.file_count"), equalTo(1)); + assertThat(extractValue(value, "stats.incremental.file_count"), lessThanOrEqualTo(1)); + } + + deleteIndex(restoredIndexName); + + restoreSnapshot(REPOSITORY_NAME, snapshot2Name, true); + ensureGreen(restoredIndexName); + + deleteSnapshot(snapshot2Name, false); + + assertSearchResults(restoredIndexName, numDocs, randomFrom(Boolean.TRUE, Boolean.FALSE, null)); + }); + } + private void clearCache(String restoredIndexName) throws IOException { final Request request = new Request(HttpPost.METHOD_NAME, restoredIndexName + "/_searchable_snapshots/cache/clear"); assertOK(client().performRequest(request)); @@ -242,11 +294,11 @@ public void assertSearchResults(String indexName, int numDocs, Boolean ignoreThr } } - protected static void deleteSnapshot(String repository, String snapshot, boolean ignoreMissing) throws IOException { - final Request request = new Request(HttpDelete.METHOD_NAME, "_snapshot/" + repository + '/' + snapshot); + protected static void deleteSnapshot(String snapshot, boolean ignoreMissing) throws IOException { + final Request request = new Request(HttpDelete.METHOD_NAME, "_snapshot/" + REPOSITORY_NAME + '/' + snapshot); try { final Response response = client().performRequest(request); - assertAcked("Failed to delete snapshot [" + snapshot + "] in repository [" + repository + "]: " + response, response); + assertAcked("Failed to delete snapshot [" + snapshot + "] in repository [" + REPOSITORY_NAME + "]: " + response, response); } catch (IOException e) { if (ignoreMissing && e instanceof ResponseException) { Response response = ((ResponseException) e).getResponse(); @@ -257,32 +309,20 @@ protected static void deleteSnapshot(String repository, String snapshot, boolean } } - protected static void mountSnapshot( - String repository, - String snapshot, - boolean waitForCompletion, - String snapshotIndexName, - String mountIndexName, - Settings indexSettings - ) throws IOException { - final Request request = new Request(HttpPost.METHOD_NAME, "/_snapshot/" + repository + "/" + snapshot + "/_mount"); - request.addParameter("wait_for_completion", Boolean.toString(waitForCompletion)); + protected static void mountSnapshot(String snapshotIndexName, String mountIndexName) throws IOException { + final Request request = new Request(HttpPost.METHOD_NAME, "/_snapshot/" + REPOSITORY_NAME + "/" + SNAPSHOT_NAME + "/_mount"); + request.addParameter("wait_for_completion", Boolean.toString(true)); final XContentBuilder builder = JsonXContent.contentBuilder().startObject().field("index", snapshotIndexName); if (snapshotIndexName.equals(mountIndexName) == false || randomBoolean()) { builder.field("renamed_index", mountIndexName); } - if (indexSettings.isEmpty() == false) { - builder.startObject("index_settings"); - indexSettings.toXContent(builder, ToXContent.EMPTY_PARAMS); - builder.endObject(); - } builder.endObject(); request.setJsonEntity(Strings.toString(builder)); final Response response = client().performRequest(request); assertThat( - "Failed to restore snapshot [" + snapshot + "] in repository [" + repository + "]: " + response, + "Failed to restore snapshot [" + SNAPSHOT_NAME + "] in repository [" + REPOSITORY_NAME + "]: " + response, response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus()) ); From 0018295a037d3e4d3873a0aecf0f02d15f3f1057 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 14 Dec 2020 09:25:06 +0000 Subject: [PATCH 10/10] Allow plugins to customise the snapshot commit independently of the engine impl --- .../index/shard/IndexShardIT.java | 4 +- .../indices/IndexingMemoryControllerIT.java | 2 +- .../org/elasticsearch/index/IndexModule.java | 19 +++++++- .../org/elasticsearch/index/IndexService.java | 9 ++-- .../elasticsearch/index/engine/Engine.java | 5 +-- .../index/engine/EngineConfig.java | 43 +++++++++++++------ .../elasticsearch/index/shard/IndexShard.java | 32 +++++++++++--- .../elasticsearch/indices/IndicesService.java | 8 +++- .../java/org/elasticsearch/node/Node.java | 10 ++++- .../plugins/IndexStorePlugin.java | 24 +++++++++++ .../elasticsearch/index/IndexModuleTests.java | 3 +- .../index/engine/InternalEngineTests.java | 6 ++- .../index/shard/IndexShardTests.java | 3 +- .../index/shard/RefreshListenersTests.java | 4 +- .../IndexingMemoryControllerTests.java | 2 +- .../snapshots/SnapshotResiliencyTests.java | 4 +- .../index/engine/EngineTestCase.java | 12 +++--- .../index/shard/IndexShardTestCase.java | 4 +- .../index/engine/FollowingEngineTests.java | 4 +- .../SearchableSnapshotsConstants.java | 1 - .../core/LocalStateCompositeXPackPlugin.java | 7 +++ .../SearchableSnapshotsIntegTests.java | 3 +- .../SearchableSnapshots.java | 25 ++++++----- ...stractSearchableSnapshotsRestTestCase.java | 24 ++++++++--- 24 files changed, 189 insertions(+), 69 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index a91fdc07e2d44..459676087a370 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -56,6 +56,7 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; @@ -653,7 +654,8 @@ public static final IndexShard newIndexShard( Arrays.asList(listeners), () -> {}, RetentionLeaseSyncer.EMPTY, - cbs); + cbs, + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); } private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index ff13d7d1d02d0..45903ee9da253 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -68,7 +68,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier(), config.getSnapshotCommitSupplier()); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/IndexModule.java b/server/src/main/java/org/elasticsearch/index/IndexModule.java index ad6fb2205e4ae..b240440ab8439 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/server/src/main/java/org/elasticsearch/index/IndexModule.java @@ -414,13 +414,16 @@ public IndexService newIndexService(IndexService.IndexCreationContext indexCreat NamedWriteableRegistry namedWriteableRegistry, BooleanSupplier idFieldDataEnabled, ValuesSourceRegistry valuesSourceRegistry, - IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener) throws IOException { + IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener, + Map snapshotCommitSuppliers) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper.get() == null ? (shard) -> null : indexReaderWrapper.get(); eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings()); final IndexStorePlugin.DirectoryFactory directoryFactory = getDirectoryFactory(indexSettings, directoryFactories); final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories); + final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier + = getSnapshotCommitSupplier(indexSettings, snapshotCommitSuppliers); QueryCache queryCache = null; IndexAnalyzers indexAnalyzers = null; boolean success = false; @@ -443,7 +446,7 @@ public IndexService newIndexService(IndexService.IndexCreationContext indexCreat engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, client, queryCache, directoryFactory, eventListener, readerWrapperFactory, mapperRegistry, indicesFieldDataCache, searchOperationListeners, indexOperationListeners, namedWriteableRegistry, idFieldDataEnabled, allowExpensiveQueries, expressionResolver, - valuesSourceRegistry, recoveryStateFactory, indexFoldersDeletionListener); + valuesSourceRegistry, recoveryStateFactory, indexFoldersDeletionListener, snapshotCommitSupplier); success = true; return indexService; } finally { @@ -498,6 +501,18 @@ private static IndexStorePlugin.RecoveryStateFactory getRecoveryStateFactory( return factory; } + public static final IndexStorePlugin.SnapshotCommitSupplier DEFAULT_SNAPSHOT_COMMIT_SUPPLIER + = e -> e.acquireLastIndexCommit(true); // by default we flush first so that the snapshot is as up-to-date as possible. + + private static IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier( + final IndexSettings indexSettings, + final Map snapshotCommitSuppliers) { + final String storeType = indexSettings.getValue(INDEX_STORE_TYPE_SETTING); + // we check that storeType refers to a valid store type in getDirectoryFactory() so there's no need for strictness here too. + final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier = snapshotCommitSuppliers.get(storeType); + return snapshotCommitSupplier == null ? DEFAULT_SNAPSHOT_COMMIT_SUPPLIER : snapshotCommitSupplier; + } + /** * creates a new mapper service to do administrative work like mapping updates. This *should not* be used for document parsing. * doing so will result in an exception. diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 785c760b0e5c2..6438bfdd8abd8 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -118,6 +118,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener; private final IndexStorePlugin.DirectoryFactory directoryFactory; private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory; + private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier; private final CheckedFunction readerWrapper; private final IndexCache indexCache; private final MapperService mapperService; @@ -180,8 +181,8 @@ public IndexService( IndexNameExpressionResolver expressionResolver, ValuesSourceRegistry valuesSourceRegistry, IndexStorePlugin.RecoveryStateFactory recoveryStateFactory, - IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener - ) { + IndexStorePlugin.IndexFoldersDeletionListener indexFoldersDeletionListener, + IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; this.indexSettings = indexSettings; @@ -191,6 +192,7 @@ public IndexService( this.circuitBreakerService = circuitBreakerService; this.expressionResolver = expressionResolver; this.valuesSourceRegistry = valuesSourceRegistry; + this.snapshotCommitSupplier = snapshotCommitSupplier; if (needsMapperService(indexSettings, indexCreationContext)) { assert indexAnalyzers != null; this.mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry, @@ -484,7 +486,8 @@ public synchronized IndexShard createShard( indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId), retentionLeaseSyncer, - circuitBreakerService); + circuitBreakerService, + snapshotCommitSupplier); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); shards = Maps.copyMapWithAddedEntry(shards, shardId.id(), indexShard); diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 6bbeafde6c688..f12f4245eac9c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1079,9 +1079,8 @@ public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyE /** * Acquires the index commit that should be included in a snapshot. */ - public IndexCommitRef acquireIndexCommitForSnapshot() throws EngineException { - // by default we flush first so that the snapshot is as up-to-date as possible. - return acquireLastIndexCommit(true); + public final IndexCommitRef acquireIndexCommitForSnapshot() throws EngineException { + return engineConfig.getSnapshotCommitSupplier().acquireIndexCommitForSnapshot(this); } /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index b601ca5985885..c5be47eea25d6 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -41,6 +41,7 @@ import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.IndexingMemoryController; import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.threadpool.ThreadPool; import java.util.List; @@ -60,6 +61,7 @@ public final class EngineConfig { private volatile boolean enableGcDeletes = true; private final TimeValue flushMergesAfter; private final String codecName; + private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier; private final ThreadPool threadPool; private final Engine.Warmer warmer; private final Store store; @@ -120,18 +122,30 @@ public Supplier retentionLeasesSupplier() { /** * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} */ - public EngineConfig(ShardId shardId, ThreadPool threadPool, - IndexSettings indexSettings, Engine.Warmer warmer, Store store, - MergePolicy mergePolicy, Analyzer analyzer, - Similarity similarity, CodecService codecService, Engine.EventListener eventListener, - QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, - TranslogConfig translogConfig, TimeValue flushMergesAfter, - List externalRefreshListener, - List internalRefreshListener, Sort indexSort, - CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier, - Supplier retentionLeasesSupplier, - LongSupplier primaryTermSupplier, - TombstoneDocSupplier tombstoneDocSupplier) { + public EngineConfig( + ShardId shardId, + ThreadPool threadPool, + IndexSettings indexSettings, + Engine.Warmer warmer, + Store store, + MergePolicy mergePolicy, + Analyzer analyzer, + Similarity similarity, + CodecService codecService, + Engine.EventListener eventListener, + QueryCache queryCache, + QueryCachingPolicy queryCachingPolicy, + TranslogConfig translogConfig, + TimeValue flushMergesAfter, + List externalRefreshListener, + List internalRefreshListener, + Sort indexSort, + CircuitBreakerService circuitBreakerService, + LongSupplier globalCheckpointSupplier, + Supplier retentionLeasesSupplier, + LongSupplier primaryTermSupplier, + TombstoneDocSupplier tombstoneDocSupplier, + IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) { this.shardId = shardId; this.indexSettings = indexSettings; this.threadPool = threadPool; @@ -169,6 +183,7 @@ public EngineConfig(ShardId shardId, ThreadPool threadPool, this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); this.primaryTermSupplier = primaryTermSupplier; this.tombstoneDocSupplier = tombstoneDocSupplier; + this.snapshotCommitSupplier = snapshotCommitSupplier; } /** @@ -370,4 +385,8 @@ public interface TombstoneDocSupplier { public TombstoneDocSupplier getTombstoneDocSupplier() { return tombstoneDocSupplier; } + + public IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier() { + return snapshotCommitSupplier; + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d5ce10f198d41..9211af62ac2a7 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -151,6 +151,7 @@ import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; +import org.elasticsearch.plugins.IndexStorePlugin; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestStatus; @@ -225,6 +226,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final GlobalCheckpointListeners globalCheckpointListeners; private final PendingReplicationActions pendingReplicationActions; private final ReplicationTracker replicationTracker; + private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier; protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; @@ -305,7 +307,8 @@ public IndexShard( final List listeners, final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final CircuitBreakerService circuitBreakerService) throws IOException { + final CircuitBreakerService circuitBreakerService, + final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); this.shardRouting = shardRouting; @@ -315,6 +318,7 @@ public IndexShard( this.similarityService = similarityService; Objects.requireNonNull(store, "Store must be provided to the index shard"); this.engineFactory = Objects.requireNonNull(engineFactory); + this.snapshotCommitSupplier = Objects.requireNonNull(snapshotCommitSupplier); this.store = store; this.indexSortSupplier = indexSortSupplier; this.indexEventListener = indexEventListener; @@ -2849,16 +2853,30 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { this.warmer.warm(reader); } }; - return new EngineConfig(shardId, - threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(), + return new EngineConfig( + shardId, + threadPool, + indexSettings, + warmer, + store, + indexSettings.getMergePolicy(), buildIndexAnalyzer(mapperService), - similarityService.similarity(mapperService == null ? null : mapperService::fieldType), codecService, shardEventListener, - indexCache != null ? indexCache.query() : null, cachingPolicy, translogConfig, + similarityService.similarity(mapperService == null ? null : mapperService::fieldType), + codecService, + shardEventListener, + indexCache != null ? indexCache.query() : null, + cachingPolicy, + translogConfig, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), List.of(refreshListeners, refreshPendingLocationListener), Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), - indexSort, circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases, - () -> getOperationPrimaryTerm(), tombstoneDocSupplier()); + indexSort, + circuitBreakerService, + globalCheckpointSupplier, + replicationTracker::getRetentionLeases, + this::getOperationPrimaryTerm, + tombstoneDocSupplier(), + snapshotCommitSupplier); } /** diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 9b0ff7828057b..a669a91d0ba16 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -215,6 +215,7 @@ public class IndicesService extends AbstractLifecycleComponent private final OldShardsStats oldShardsStats = new OldShardsStats(); private final MapperRegistry mapperRegistry; private final NamedWriteableRegistry namedWriteableRegistry; + private final Map snapshotCommitSuppliers; private final IndexingMemoryController indexingMemoryController; private final TimeValue cleanInterval; final IndicesRequestCache indicesRequestCache; // pkg-private for testing @@ -253,7 +254,8 @@ public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvi Collection>> engineFactoryProviders, Map directoryFactories, ValuesSourceRegistry valuesSourceRegistry, Map recoveryStateFactories, - List indexFoldersDeletionListeners) { + List indexFoldersDeletionListeners, + Map snapshotCommitSuppliers) { this.settings = settings; this.threadPool = threadPool; this.pluginsService = pluginsService; @@ -301,6 +303,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon this.directoryFactories = directoryFactories; this.recoveryStateFactories = recoveryStateFactories; this.indexFoldersDeletionListeners = new CompositeIndexFoldersDeletionListener(indexFoldersDeletionListeners); + this.snapshotCommitSuppliers = snapshotCommitSuppliers; // doClose() is called when shutting down a node, yet there might still be ongoing requests // that we need to wait for before closing some resources such as the caches. In order to // avoid closing these resources while ongoing requests are still being processed, we use a @@ -679,7 +682,8 @@ private synchronized IndexService createIndexService(IndexService.IndexCreationC namedWriteableRegistry, this::isIdFieldDataEnabled, valuesSourceRegistry, - indexFoldersDeletionListeners + indexFoldersDeletionListeners, + snapshotCommitSuppliers ); } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index a5033673e5e87..37d94ea8a24c5 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -498,6 +498,13 @@ protected Node(final Environment initialEnvironment, .flatMap(List::stream) .collect(Collectors.toList()); + final Map snapshotCommitSuppliers = + pluginsService.filterPlugins(IndexStorePlugin.class) + .stream() + .map(IndexStorePlugin::getSnapshotCommitSuppliers) + .flatMap(m -> m.entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + final Map> systemIndexDescriptorMap = pluginsService .filterPlugins(SystemIndexPlugin.class) .stream() @@ -519,7 +526,8 @@ protected Node(final Environment initialEnvironment, clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry, threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptService, clusterService, client, metaStateService, engineFactoryProviders, indexStoreFactories, - searchModule.getValuesSourceRegistry(), recoveryStateFactories, indexFoldersDeletionListeners); + searchModule.getValuesSourceRegistry(), recoveryStateFactories, indexFoldersDeletionListeners, + snapshotCommitSuppliers); final AliasValidator aliasValidator = new AliasValidator(); diff --git a/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java b/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java index 37341accede0a..a801812603f4e 100644 --- a/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java @@ -25,6 +25,8 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.indices.recovery.RecoveryState; @@ -121,4 +123,26 @@ interface IndexFoldersDeletionListener { default List getIndexFoldersDeletionListeners() { return Collections.emptyList(); } + + /** + * An interface that allows plugins to override the {@link org.apache.lucene.index.IndexCommit} of which a snapshot is taken. By default + * we snapshot the latest such commit. + */ + @FunctionalInterface + interface SnapshotCommitSupplier { + Engine.IndexCommitRef acquireIndexCommitForSnapshot(Engine engine) throws EngineException; + } + + /** + * The {@link SnapshotCommitSupplier} mappings for this plugin. When an index is created the store type setting + * {@link org.elasticsearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will determine whether the snapshot commit supplier + * should be overridden and, if so, which override to use. + * + * @return a collection of snapshot commit suppliers, keyed by the value of + * {@link org.elasticsearch.index.IndexModule#INDEX_STORE_TYPE_SETTING}. + */ + default Map getSnapshotCommitSuppliers() { + return Collections.emptyMap(); + } + } diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index d18dd73cdd47a..095e37005e2d1 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -180,7 +180,8 @@ public void tearDown() throws Exception { private IndexService newIndexService(IndexModule module) throws IOException { return module.newIndexService(CREATE_INDEX, nodeEnvironment, xContentRegistry(), deleter, circuitBreakerService, bigArrays, threadPool, scriptService, clusterService, null, indicesQueryCache, mapperRegistry, - new IndicesFieldDataCache(settings, listener), writableRegistry(), () -> false, null, indexDeletionListener); + new IndicesFieldDataCache(settings, listener), writableRegistry(), () -> false, null, indexDeletionListener, + emptyMap()); } public void testWrapperIsBound() throws IOException { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 19d5fa1e373c7..b4ab0ae3164eb 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -105,6 +105,7 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; @@ -2906,7 +2907,8 @@ public void testRecoverFromForeignTranslog() throws IOException { () -> UNASSIGNED_SEQ_NO, () -> RetentionLeases.EMPTY, primaryTerm::get, - tombstoneDocSupplier()); + tombstoneDocSupplier(), + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); engine = createEngine(store, primaryTranslogDir); // and recover again! @@ -5972,7 +5974,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.getQueryCachingPolicy(), translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier(), config.getSnapshotCommitSupplier()); try (InternalEngine engine = createEngine(configWithWarmer)) { assertThat(warmedUpReaders, empty()); assertThat(expectThrows(Throwable.class, () -> engine.acquireSearcher("test")).getMessage(), diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 091a7ca1f16a8..165af70e50da7 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -76,6 +76,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.CommitStats; @@ -4071,7 +4072,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier(), IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); return new InternalEngine(configWithWarmer); }); Thread recoveryThread = new Thread(() -> expectThrows(AlreadyClosedException.class, () -> recoverShardFromStore(shard))); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index a8d228baf0a22..d4aa479622579 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Engine; @@ -149,7 +150,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { () -> SequenceNumbers.NO_OPS_PERFORMED, () -> RetentionLeases.EMPTY, () -> primaryTerm, - EngineTestCase.tombstoneDocSupplier()); + EngineTestCase.tombstoneDocSupplier(), + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); engine = new InternalEngine(config); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); diff --git a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java index c9416bf7ad62b..4abe0aa670817 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndexingMemoryControllerTests.java @@ -382,7 +382,7 @@ EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.Ref config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), internalRefreshListener, config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier(), config.getSnapshotCommitSupplier()); } ThreadPoolStats.Stats getRefreshThreadPoolStats() { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index e6de55ca6d763..fd9b804183c3b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1497,8 +1497,8 @@ protected NamedWriteableRegistry writeableRegistry() { emptyMap(), null, emptyMap(), - List.of() - ); + List.of(), + emptyMap()); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService(settings, clusterService, repositoriesService, transportService, indicesService); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 653acc7439792..f1ba2866b0334 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -81,6 +81,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.VersionType; @@ -247,7 +248,7 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), globalCheckpointSupplier, config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), tombstoneDocSupplier()); + config.getPrimaryTermSupplier(), tombstoneDocSupplier(), config.getSnapshotCommitSupplier()); } public EngineConfig copy(EngineConfig config, Analyzer analyzer) { @@ -257,7 +258,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier(), config.getSnapshotCommitSupplier()); } public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { @@ -267,7 +268,7 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier(), config.getSnapshotCommitSupplier()); } @Override @@ -720,7 +721,8 @@ public EngineConfig config( globalCheckpointSupplier, retentionLeasesSupplier, primaryTerm, - tombstoneDocSupplier()); + tombstoneDocSupplier(), + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); } protected EngineConfig config(EngineConfig config, Store store, Path translogPath, @@ -735,7 +737,7 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat translogConfig, config.getFlushMergesAfter(), config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), - config.getPrimaryTermSupplier(), tombstoneDocSupplier); + config.getPrimaryTermSupplier(), tombstoneDocSupplier, config.getSnapshotCommitSupplier()); } protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index d23c58d93e12a..9148fc1bcc5a3 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -47,6 +47,7 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.VersionType; @@ -401,7 +402,8 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe Arrays.asList(listeners), globalCheckpointSyncer, retentionLeaseSyncer, - breakerService); + breakerService, + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true; } finally { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 7b7b89e240063..6353e4dd3ef9f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; @@ -270,7 +271,8 @@ public void onFailedEngine(String reason, Exception e) { globalCheckpoint::longValue, () -> RetentionLeases.EMPTY, () -> primaryTerm.get(), - EngineTestCase.tombstoneDocSupplier()); + EngineTestCase.tombstoneDocSupplier(), + IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER); } private static Store createStore( diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java index 919ef5a3ba721..ef700379ba77c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsConstants.java @@ -27,4 +27,3 @@ public static boolean isSearchableSnapshotStore(Settings indexSettings) { public static final String SNAPSHOT_BLOB_CACHE_INDEX = ".snapshot-blob-cache"; } - diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index b49f6d1761afa..9f10edf968cff 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -535,6 +535,13 @@ public List getIndexFoldersDeletionListeners() { return Collections.unmodifiableList(listeners); } + @Override + public Map getSnapshotCommitSuppliers() { + final Map suppliers = new HashMap<>(); + filterPlugins(IndexStorePlugin.class).forEach(p -> suppliers.putAll(p.getSnapshotCommitSuppliers())); + return suppliers; + } + private List filterPlugins(Class type) { return plugins.stream().filter(x -> type.isAssignableFrom(x.getClass())).map(p -> ((T)p)) .collect(Collectors.toList()); diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index 2e293d75448fe..1b868daa1dc6e 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -834,8 +834,7 @@ public void testSnapshotOfSearchableSnapshotIncludesNoDataButCanBeRestored() thr mountSnapshot(repositoryName, snapshotOne.getName(), indexName, restoredIndexName, Settings.EMPTY); ensureGreen(restoredIndexName); - if (randomBoolean() && false) { - // NB skipped as it doesn't work today + if (randomBoolean()) { logger.info("--> closing index before snapshot"); assertAcked(client().admin().indices().prepareClose(restoredIndexName)); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 0c2b5a6ced7a7..4da8e11a84d06 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.searchablesnapshots; -import org.apache.lucene.index.IndexCommit; import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -31,10 +30,11 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.store.SearchableSnapshotDirectory; +import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.recovery.SearchableSnapshotRecoveryState; @@ -282,22 +282,21 @@ public Optional getEngineFactory(IndexSettings indexSettings) { if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexSettings.getSettings()) && indexSettings.getSettings().getAsBoolean("index.frozen", false) == false) { return Optional.of( - engineConfig -> new ReadOnlyEngine(engineConfig, null, new TranslogStats(), false, Function.identity(), false) { - - // present an empty IndexCommit to the snapshot mechanism so that we copy no shard data to the repository - private final IndexCommit emptyIndexCommit = emptyIndexCommit(engineConfig.getStore().directory()); - - @Override - public IndexCommitRef acquireIndexCommitForSnapshot() throws EngineException { - store.incRef(); - return new IndexCommitRef(emptyIndexCommit, store::decRef); - } - } + engineConfig -> new ReadOnlyEngine(engineConfig, null, new TranslogStats(), false, Function.identity(), false) ); } return Optional.empty(); } + @Override + public Map getSnapshotCommitSuppliers() { + return Map.of(SNAPSHOT_DIRECTORY_FACTORY_KEY, e -> { + final Store store = e.config().getStore(); + store.incRef(); + return new Engine.IndexCommitRef(emptyIndexCommit(store.directory()), store::decRef); + }); + } + @Override public List> getActions() { return List.of( diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java index 2d3728f16ad50..c35d45e49dd65 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsRestTestCase.java @@ -8,6 +8,9 @@ import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; @@ -29,6 +32,7 @@ import java.util.Map; import java.util.function.Function; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -205,15 +209,14 @@ public void testClearCache() throws Exception { public void testSnapshotOfSearchableSnapshot() throws Exception { runSearchableSnapshotsTest((restoredIndexName, numDocs) -> { - if (randomBoolean() && false) { - // NB skipped as it doesn't work today + final boolean frozen = randomBoolean(); + if (frozen) { logger.info("--> freezing index [{}]", restoredIndexName); final Request freezeRequest = new Request(HttpPost.METHOD_NAME, restoredIndexName + "/_freeze"); assertOK(client().performRequest(freezeRequest)); } - if (randomBoolean() && false) { - // NB skipped as it doesn't work today + if (randomBoolean()) { logger.info("--> closing index [{}]", restoredIndexName); final Request closeRequest = new Request(HttpPost.METHOD_NAME, restoredIndexName + "/_close"); assertOK(client().performRequest(closeRequest)); @@ -226,7 +229,16 @@ public void testSnapshotOfSearchableSnapshot() throws Exception { // Remove the snapshots, if a previous test failed to delete them. This is // useful for third party tests that runs the test against a real external service. deleteSnapshot(snapshot2Name, true); - createSnapshot(REPOSITORY_NAME, snapshot2Name, true); + + final Request snapshotRequest = new Request(HttpPut.METHOD_NAME, "_snapshot/" + REPOSITORY_NAME + '/' + snapshot2Name); + snapshotRequest.addParameter("wait_for_completion", "true"); + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + builder.field("indices", restoredIndexName); + builder.endObject(); + snapshotRequest.setEntity(new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON)); + } + assertOK(client().performRequest(snapshotRequest)); final List>> snapshotShardsStats = extractValue( responseAsMap( @@ -250,7 +262,7 @@ public void testSnapshotOfSearchableSnapshot() throws Exception { deleteSnapshot(snapshot2Name, false); - assertSearchResults(restoredIndexName, numDocs, randomFrom(Boolean.TRUE, Boolean.FALSE, null)); + assertSearchResults(restoredIndexName, numDocs, frozen ? Boolean.FALSE : randomFrom(Boolean.TRUE, Boolean.FALSE, null)); }); }