From 760a0595c6230961ebd8357a10c6cd88bb3eb9e5 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 1 Mar 2021 11:28:43 +0100 Subject: [PATCH 1/2] Adjust the length of blob cache docs for Lucene metadata files (#69431) Today searchable snapshots IndexInput implementations use the blob store cache to cache the first 4096 bytes of every Lucene file. After some experiments we think that we could adjust the length of the cached data depending on the Lucene file that is read, caching up to 64KB for Lucene metadata files (i.e. files that are fully read when a Directory is opened) and only 1KB for other files. The files that are cached up to 64KB have the following extensions: "cfe", // compound file's entry table "dvm", // doc values metadata file "fdm", // stored fields metadata file "fnm", // field names metadata file "kdm", // Lucene 8.6 point format metadata file "nvm", // norms metadata file "tmd", // Lucene 8.6 terms metadata file "tvm", // terms vectors metadata file "vem" // Lucene 9.0 indexed vectors metadata The 64KB limit can be configured on a per-index basis through a new index setting. This change is extracted from #69283 and does not address the caching of CFS files. Backport of #69431 --- ...ableSnapshotsBlobStoreCacheIntegTests.java | 305 +++++++++-------- .../BaseSearchableSnapshotsIntegTestCase.java | 14 +- .../cache/BlobStoreCacheService.java | 132 +++++++- .../blobstore/cache/CachedBlob.java | 23 ++ .../BaseSearchableSnapshotIndexInput.java | 14 +- .../store/SearchableSnapshotDirectory.java | 31 +- .../cache/CachedBlobContainerIndexInput.java | 52 ++- .../index/store/cache/FrozenIndexInput.java | 58 ++-- .../direct/DirectBlobContainerIndexInput.java | 14 +- .../SearchableSnapshots.java | 35 +- .../searchablesnapshots/cache/ByteRange.java | 6 +- ...SearchableSnapshotDirectoryStatsTests.java | 25 +- .../SearchableSnapshotDirectoryTests.java | 9 +- .../CachedBlobContainerIndexInputTests.java | 8 +- .../store/cache/FrozenIndexInputTests.java | 5 +- .../index/store/cache/TestUtils.java | 14 +- .../DirectBlobContainerIndexInputTests.java | 18 +- .../AbstractSearchableSnapshotsTestCase.java | 44 +++ x-pack/qa/rolling-upgrade/build.gradle | 7 +- .../upgrades/AbstractUpgradeTestCase.java | 5 + .../SearchableSnapshotsRollingUpgradeIT.java | 312 ++++++++++++++++++ .../UpgradeClusterClientYamlTestSuiteIT.java | 10 + 22 files changed, 876 insertions(+), 265 deletions(-) create mode 100644 x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SearchableSnapshotsRollingUpgradeIT.java diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java index a2102d3de228d..ea28874161aa8 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/blobstore/cache/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import 
org.elasticsearch.cluster.metadata.IndexMetadata; @@ -28,23 +29,31 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage; import org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotShardStats; import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction; import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsRequest; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; +import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService; +import org.junit.AfterClass; +import org.junit.BeforeClass; import java.io.IOException; import java.nio.file.Files; @@ -69,6 +78,35 @@ public class SearchableSnapshotsBlobStoreCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase { + private static Settings cacheSettings = null; + private static ByteSizeValue blobCacheMaxLength = null; + + @BeforeClass + public static void setUpCacheSettings() { + blobCacheMaxLength = new ByteSizeValue(randomLongBetween(64L, 128L), ByteSizeUnit.KB); + + final Settings.Builder builder = Settings.builder(); + // Cold (full copy) cache should be unlimited to not cause evictions + builder.put(CacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES)); + // Align ranges to match the blob cache max length + builder.put(CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.getKey(), blobCacheMaxLength); + builder.put(CacheService.SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING.getKey(), blobCacheMaxLength); + + // Frozen (shared cache) cache should be large enough to not cause direct reads + builder.put(SnapshotsService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), ByteSizeValue.ofMb(128)); + // Align ranges to match the blob cache max length + builder.put(SnapshotsService.SNAPSHOT_CACHE_REGION_SIZE_SETTING.getKey(), blobCacheMaxLength); + builder.put(SnapshotsService.SHARED_CACHE_RANGE_SIZE_SETTING.getKey(), blobCacheMaxLength); + builder.put(FrozenCacheService.FROZEN_CACHE_RECOVERY_RANGE_SIZE_SETTING.getKey(), blobCacheMaxLength); + cacheSettings = builder.build(); + } + + @AfterClass + public static void tearDownCacheSettings() { + blobCacheMaxLength = null; + cacheSettings = null; + } + @Override protected 
Collection> nodePlugins() { return CollectionUtils.appendToCopy(super.nodePlugins(), WaitForSnapshotBlobCacheShardsActivePlugin.class); @@ -81,30 +119,25 @@ protected int numberOfReplicas() { @Override protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put( - CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.getKey(), - randomLongBetween(new ByteSizeValue(4, ByteSizeUnit.KB).getBytes(), new ByteSizeValue(20, ByteSizeUnit.KB).getBytes()) + "b" - ) - .put(CacheService.SNAPSHOT_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES)) - .build(); + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(cacheSettings).build(); } public void testBlobStoreCache() throws Exception { final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); createIndex(indexName); - final List indexRequestBuilders = new ArrayList<>(); - for (int i = scaledRandomIntBetween(0, 10_000); i >= 0; i--) { - indexRequestBuilders.add( - client().prepareIndex(indexName, SINGLE_MAPPING_NAME).setSource("text", randomUnicodeOfLength(10), "num", i) - ); - } - indexRandom(true, false, true, indexRequestBuilders); - final long numberOfDocs = indexRequestBuilders.size(); final NumShards numberOfShards = getNumShards(indexName); + final int numberOfDocs = scaledRandomIntBetween(0, 20_000); + if (numberOfDocs > 0) { + final List indexRequestBuilders = new ArrayList<>(); + for (int i = numberOfDocs; i > 0; i--) { + XContentBuilder builder = XContentFactory.smileBuilder(); + builder.startObject().field("text", randomRealisticUnicodeOfCodepointLengthBetween(5, 50)).field("num", i).endObject(); + indexRequestBuilders.add(client().prepareIndex(indexName, SINGLE_MAPPING_NAME).setSource(builder)); + } + indexRandom(true, true, true, indexRequestBuilders); + } if (randomBoolean()) { logger.info("--> force-merging index before snapshotting"); final ForceMergeResponse forceMergeResponse = client().admin() @@ -133,15 +166,25 @@ public void testBlobStoreCache() throws Exception { () -> systemClient().admin().indices().prepareGetIndex().addIndices(SNAPSHOT_BLOB_CACHE_INDEX).get() ); - logger.info("--> mount snapshot [{}] as an index for the first time", snapshot); - final String restoredIndex = mountSnapshot( + Storage storage = randomFrom(Storage.values()); + logger.info( + "--> mount snapshot [{}] as an index for the first time [storage={}, max length={}]", + snapshot, + storage, + blobCacheMaxLength.getStringRep() + ); + final String restoredIndex = randomBoolean() ? 
indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + mountSnapshot( repositoryName, snapshot.getName(), indexName, + restoredIndex, Settings.builder() .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) .put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false) - .build() + .put(SearchableSnapshots.SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH, blobCacheMaxLength) + .build(), + storage ); ensureGreen(restoredIndex); @@ -167,60 +210,62 @@ public void testBlobStoreCache() throws Exception { } logger.info("--> verifying cached documents in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); - - logger.info("--> verifying system index [{}] data tiers preference", SNAPSHOT_BLOB_CACHE_INDEX); - assertThat( - systemClient().admin() - .indices() - .prepareGetSettings(SNAPSHOT_BLOB_CACHE_INDEX) - .get() - .getSetting(SNAPSHOT_BLOB_CACHE_INDEX, DataTierAllocationDecider.INDEX_ROUTING_PREFER), - equalTo(DATA_TIERS_CACHE_INDEX_PREFERENCE) - ); + if (numberOfDocs > 0) { + assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + + logger.info("--> verifying system index [{}] data tiers preference", SNAPSHOT_BLOB_CACHE_INDEX); + assertThat( + systemClient().admin() + .indices() + .prepareGetSettings(SNAPSHOT_BLOB_CACHE_INDEX) + .get() + .getSetting(SNAPSHOT_BLOB_CACHE_INDEX, DataTierAllocationDecider.INDEX_ROUTING_PREFER), + equalTo(DATA_TIERS_CACHE_INDEX_PREFERENCE) + ); + } - final long numberOfCachedBlobs = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).get().getHits().getTotalHits().value; - final long numberOfCacheWrites = systemClient().admin() + final long numberOfCachedBlobs = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .get() + .getHits() + .getTotalHits().value; + IndexingStats indexingStats = systemClient().admin() .indices() .prepareStats(SNAPSHOT_BLOB_CACHE_INDEX) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) .clear() .setIndexing(true) .get() - .getTotal().indexing.getTotal().getIndexCount(); + .getTotal() + .getIndexing(); + final long numberOfCacheWrites = indexingStats != null ? indexingStats.getTotal().getIndexCount() : 0L; - logger.info("--> verifying documents in index [{}]", restoredIndex); + logger.info("--> verifying number of documents in index [{}]", restoredIndex); assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); - assertHitCount( - client().prepareSearch(restoredIndex) - .setQuery(QueryBuilders.rangeQuery("num").lte(numberOfDocs)) - .setSize(0) - .setTrackTotalHits(true) - .get(), - numberOfDocs - ); - assertHitCount( - client().prepareSearch(restoredIndex) - .setQuery(QueryBuilders.rangeQuery("num").gt(numberOfDocs + 1)) - .setSize(0) - .setTrackTotalHits(true) - .get(), - 0L - ); - assertAcked(client().admin().indices().prepareDelete(restoredIndex)); - logger.info("--> mount snapshot [{}] as an index for the second time", snapshot); - final String restoredAgainIndex = mountSnapshot( + storage = randomFrom(Storage.values()); + logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage); + final String restoredAgainIndex = randomBoolean() ? 
indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + mountSnapshot( repositoryName, snapshot.getName(), indexName, + restoredAgainIndex, Settings.builder() .put(SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.getKey(), true) .put(SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.getKey(), false) - .build() + .put(SearchableSnapshots.SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH, blobCacheMaxLength) + .build(), + storage ); ensureGreen(restoredAgainIndex); + logger.info("--> verifying cached documents (after second mount) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); + if (numberOfDocs > 0) { + assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + } + logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex); for (final SearchableSnapshotShardStats shardStats : client().execute( SearchableSnapshotsStatsAction.INSTANCE, @@ -229,44 +274,33 @@ public void testBlobStoreCache() throws Exception { for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { // we read the header of each file contained within the .cfs file, which could be anywhere final boolean mayReadMoreThanHeader = indexInputStats.getFileExt().equals("cfs"); - if (indexInputStats.getTotalSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2 - || mayReadMoreThanHeader == false) { + if (mayReadMoreThanHeader == false) { assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); } } } - logger.info("--> verifying documents in index [{}]", restoredAgainIndex); + logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex); assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); - assertHitCount( - client().prepareSearch(restoredAgainIndex) - .setQuery(QueryBuilders.rangeQuery("num").lte(numberOfDocs)) - .setSize(0) - .setTrackTotalHits(true) - .get(), - numberOfDocs - ); - assertHitCount( - client().prepareSearch(restoredAgainIndex) - .setQuery(QueryBuilders.rangeQuery("num").gt(numberOfDocs + 1)) - .setSize(0) - .setTrackTotalHits(true) - .get(), - 0L - ); - - logger.info("--> verifying cached documents (again) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); logger.info("--> verifying that no extra cached blobs were indexed [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - refreshSystemIndex(); - assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); - assertThat( - systemClient().admin().indices().prepareStats(SNAPSHOT_BLOB_CACHE_INDEX).clear().setIndexing(true).get().getTotal().indexing - .getTotal() - .getIndexCount(), - equalTo(numberOfCacheWrites) + if (numberOfDocs > 0) { + refreshSystemIndex(); + } + assertHitCount( + systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN).setSize(0).get(), + numberOfCachedBlobs ); + indexingStats = systemClient().admin() + .indices() + .prepareStats(SNAPSHOT_BLOB_CACHE_INDEX) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .clear() + .setIndexing(true) + .get() + .getTotal() + .getIndexing(); + assertThat(indexingStats != null ? 
indexingStats.getTotal().getIndexCount() : 0L, equalTo(numberOfCacheWrites)); logger.info("--> restarting cluster"); internalCluster().fullRestart(new InternalTestCluster.RestartCallback() { @@ -280,36 +314,43 @@ public Settings onNodeStopped(String nodeName) throws Exception { }); ensureGreen(restoredAgainIndex); - logger.info("--> verifying documents in index [{}]", restoredAgainIndex); - assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); - assertHitCount( - client().prepareSearch(restoredAgainIndex) - .setQuery(QueryBuilders.rangeQuery("num").lte(numberOfDocs)) - .setSize(0) - .setTrackTotalHits(true) - .get(), - numberOfDocs - ); - assertHitCount( - client().prepareSearch(restoredAgainIndex) - .setQuery(QueryBuilders.rangeQuery("num").gt(numberOfDocs + 1)) - .setSize(0) - .setTrackTotalHits(true) - .get(), - 0L - ); - logger.info("--> verifying cached documents (after restart) in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); - assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + if (numberOfDocs > 0) { + assertCachedBlobsInSystemIndex(repositoryName, blobsInSnapshot); + } + + logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex); + for (final SearchableSnapshotShardStats shardStats : client().execute( + SearchableSnapshotsStatsAction.INSTANCE, + new SearchableSnapshotsStatsRequest() + ).actionGet().getStats()) { + for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { + // we read the header of each file contained within the .cfs file, which could be anywhere + final boolean mayReadMoreThanHeader = indexInputStats.getFileExt().equals("cfs"); + if (mayReadMoreThanHeader == false) { + assertThat(Strings.toString(indexInputStats), indexInputStats.getBlobStoreBytesRequested().getCount(), equalTo(0L)); + } + } + } logger.info("--> verifying that no cached blobs were indexed in system index [{}] after restart", SNAPSHOT_BLOB_CACHE_INDEX); - assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), numberOfCachedBlobs); - assertThat( - systemClient().admin().indices().prepareStats(SNAPSHOT_BLOB_CACHE_INDEX).clear().setIndexing(true).get().getTotal().indexing - .getTotal() - .getIndexCount(), - equalTo(0L) + assertHitCount( + systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN).setSize(0).get(), + numberOfCachedBlobs ); + indexingStats = systemClient().admin() + .indices() + .prepareStats(SNAPSHOT_BLOB_CACHE_INDEX) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .clear() + .setIndexing(true) + .get() + .getTotal() + .getIndexing(); + assertThat(indexingStats != null ? 
indexingStats.getTotal().getIndexCount() : 0L, equalTo(0L)); + + logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex); + assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); // TODO also test when the index is frozen // TODO also test when prewarming is enabled @@ -356,6 +397,7 @@ private Map blobsInSnapshot(Path repository private void assertCachedBlobsInSystemIndex(final String repositoryName, final Map blobsInSnapshot) throws Exception { + final BlobStoreCacheService blobCacheService = internalCluster().getDataNodeInstance(BlobStoreCacheService.class); assertBusy(() -> { refreshSystemIndex(); @@ -366,37 +408,18 @@ private void assertCachedBlobsInSystemIndex(final String repositoryName, final M continue; } - final String path = String.join("/", repositoryName, blob.getKey(), fileInfo.physicalName()); - if (fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2) { - // file has been fully cached - final GetResponse getResponse = systemClient().prepareGet( - SNAPSHOT_BLOB_CACHE_INDEX, - SINGLE_MAPPING_NAME, - path + "/@0" - ).get(); - assertThat("not cached: [" + path + "/@0] for blob [" + fileInfo + "]", getResponse.isExists(), is(true)); - final CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap()); - assertThat(cachedBlob.from(), equalTo(0L)); - assertThat(cachedBlob.to(), equalTo(fileInfo.length())); - assertThat((long) cachedBlob.length(), equalTo(fileInfo.length())); - numberOfCachedBlobs += 1; - - } else { - // first region of file has been cached - GetResponse getResponse = systemClient().prepareGet(SNAPSHOT_BLOB_CACHE_INDEX, SINGLE_MAPPING_NAME, path + "/@0") - .get(); - assertThat( - "not cached: [" + path + "/@0] for first region of blob [" + fileInfo + "]", - getResponse.isExists(), - is(true) - ); - - CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap()); - assertThat(cachedBlob.from(), equalTo(0L)); - assertThat(cachedBlob.to(), equalTo((long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE)); - assertThat(cachedBlob.length(), equalTo(BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE)); - numberOfCachedBlobs += 1; - } + final String fileName = fileInfo.physicalName(); + final long length = fileInfo.length(); + final ByteRange expectedByteRange = blobCacheService.computeBlobCacheByteRange(fileName, length, blobCacheMaxLength); + final String path = String.join("/", repositoryName, blob.getKey(), fileName, "@" + expectedByteRange.start()); + + final GetResponse getResponse = systemClient().prepareGet(SNAPSHOT_BLOB_CACHE_INDEX, SINGLE_MAPPING_NAME, path).get(); + assertThat("Expected cached blob [" + path + "] for blob [" + fileInfo + "]", getResponse.isExists(), is(true)); + final CachedBlob cachedBlob = CachedBlob.fromSource(getResponse.getSourceAsMap()); + assertThat(cachedBlob.from(), equalTo(expectedByteRange.start())); + assertThat(cachedBlob.to(), equalTo(expectedByteRange.end())); + assertThat((long) cachedBlob.length(), equalTo(expectedByteRange.length())); + numberOfCachedBlobs += 1; } } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java index d874b81ee2aeb..3c9dcdddc85b3 100644 --- 
a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java @@ -27,6 +27,7 @@ import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; +import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService; @@ -130,6 +131,17 @@ protected void mountSnapshot( String indexName, String restoredIndexName, Settings restoredIndexSettings + ) throws Exception { + mountSnapshot(repositoryName, snapshotName, indexName, restoredIndexName, restoredIndexSettings, Storage.FULL_COPY); + } + + protected void mountSnapshot( + String repositoryName, + String snapshotName, + String indexName, + String restoredIndexName, + Settings restoredIndexSettings, + final Storage storage ) throws Exception { final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest( restoredIndexName, @@ -142,7 +154,7 @@ protected void mountSnapshot( .build(), Strings.EMPTY_ARRAY, true, - MountSearchableSnapshotRequest.Storage.FULL_COPY + storage ); final RestoreSnapshotResponse restoreResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest).get(); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java index 91b8dabe3aa89..d68e0f707cc48 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/BlobStoreCacheService.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.IndexFileNames; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; @@ -23,15 +24,24 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.cache.Cache; +import org.elasticsearch.common.cache.CacheBuilder; import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import java.time.Instant; +import java.util.Set; +import java.util.concurrent.ExecutionException; import 
java.util.concurrent.TimeUnit; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -41,14 +51,24 @@ public class BlobStoreCacheService { private static final Logger logger = LogManager.getLogger(BlobStoreCacheService.class); - public static final int DEFAULT_CACHED_BLOB_SIZE = ByteSizeUnit.KB.toIntBytes(4); + /** + * Before 8.0.0 blobs were cached using a 4KB or 8KB maximum length. + */ + private static final Version OLD_CACHED_BLOB_SIZE_VERSION = Version.V_7_13_0; // TODO adjust after backport + public static final int DEFAULT_CACHED_BLOB_SIZE = ByteSizeUnit.KB.toIntBytes(1); + private static final Cache LOG_EXCEEDING_FILES_CACHE = CacheBuilder.builder() + .setExpireAfterAccess(TimeValue.timeValueMinutes(60L)) + .build(); + + private final ClusterService clusterService; private final ThreadPool threadPool; private final Client client; private final String index; - public BlobStoreCacheService(ThreadPool threadPool, Client client, String index) { + public BlobStoreCacheService(ClusterService clusterService, ThreadPool threadPool, Client client, String index) { this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN); + this.clusterService = clusterService; this.threadPool = threadPool; this.index = index; } @@ -155,4 +175,112 @@ public void onFailure(Exception e) { listener.onFailure(e); } } + + private static final Set METADATA_FILES_EXTENSIONS; + private static final Set OTHER_FILES_EXTENSIONS; + static { + // List of Lucene file extensions that are considered "metadata" and should therefore be fully cached in the blob store cache. + // Those files are usually fully read by Lucene when it opens a Directory. + METADATA_FILES_EXTENSIONS = org.elasticsearch.common.collect.Set.of( + "cfe", // compound file's entry table + "dvm", // doc values metadata file + "fdm", // stored fields metadata file + "fnm", // field names metadata file + "kdm", // Lucene 8.6 point format metadata file + "nvm", // norms metadata file + "tmd", // Lucene 8.6 terms metadata file + "tvm", // terms vectors metadata file + "vem" // Lucene 9.0 indexed vectors metadata + ); + + // List of extensions for which Lucene usually only reads the first 1024 bytes and checks a header checksum when opening a Directory. + OTHER_FILES_EXTENSIONS = org.elasticsearch.common.collect.Set.of( + "cfs", + "dii", + "dim", + "doc", + "dvd", + "fdt", + "fdx", + "kdd", + "kdi", + "liv", + "nvd", + "pay", + "pos", + "tim", + "tip", + "tvd", + "tvx", + "vec" + ); + assert Sets.intersection(METADATA_FILES_EXTENSIONS, OTHER_FILES_EXTENSIONS).isEmpty(); + } + + /** + * Computes the {@link ByteRange} corresponding to the header of a Lucene file. This range can vary depending on the type of the file + * which is indicated by the file's extension. The returned byte range can never be larger than the file's length but it can be smaller. + * + * For files that are declared as metadata files in {@link #METADATA_FILES_EXTENSIONS}, the header can be as large as the specified + * maximum metadata length parameter {@code maxMetadataLength}. Non-metadata files have a fixed-length header of at most 1KB. 
+ * + * @param fileName the name of the file + * @param fileLength the length of the file + * @param maxMetadataLength the maximum accepted length for metadata files + * + * @return the header {@link ByteRange} + */ + public ByteRange computeBlobCacheByteRange(String fileName, long fileLength, ByteSizeValue maxMetadataLength) { + final String fileExtension = IndexFileNames.getExtension(fileName); + assert fileExtension == null || METADATA_FILES_EXTENSIONS.contains(fileExtension) || OTHER_FILES_EXTENSIONS.contains(fileExtension) + : "unknown Lucene file extension [" + fileExtension + "] - should it be considered a metadata file?"; + + if (useLegacyCachedBlobSizes()) { + if (fileLength <= ByteSizeUnit.KB.toBytes(8L)) { + return ByteRange.of(0L, fileLength); + } else { + return ByteRange.of(0L, ByteSizeUnit.KB.toBytes(4L)); + } + } + + if (METADATA_FILES_EXTENSIONS.contains(fileExtension)) { + final long maxAllowedLengthInBytes = maxMetadataLength.getBytes(); + if (fileLength > maxAllowedLengthInBytes) { + logExceedingFile(fileExtension, fileLength, maxMetadataLength); + } + return ByteRange.of(0L, Math.min(fileLength, maxAllowedLengthInBytes)); + } + return ByteRange.of(0L, Math.min(fileLength, DEFAULT_CACHED_BLOB_SIZE)); + } + + protected boolean useLegacyCachedBlobSizes() { + final Version minNodeVersion = clusterService.state().nodes().getMinNodeVersion(); + return minNodeVersion.before(OLD_CACHED_BLOB_SIZE_VERSION); + } + + private static void logExceedingFile(String extension, long length, ByteSizeValue maxAllowedLength) { + if (logger.isWarnEnabled()) { + try { + // Use of a cache to prevent too many log traces per hour + LOG_EXCEEDING_FILES_CACHE.computeIfAbsent(extension, key -> { + logger.warn( + "file with extension [{}] is larger ([{}]) than the max. 
length allowed [{}] to cache metadata files in blob cache", + extension, + length, + maxAllowedLength + ); + return key; + }); + } catch (ExecutionException e) { + logger.warn( + () -> new ParameterizedMessage( + "Failed to log information about exceeding file type [{}] with length [{}]", + extension, + length + ), + e + ); + } + } + } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java index 699fddbac023b..323e60a9e91a6 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/blobstore/cache/CachedBlob.java @@ -187,4 +187,27 @@ public static CachedBlob fromSource(final Map source) { to.longValue() ); } + + @Override + public String toString() { + return "CachedBlob [" + + "creationTime=" + + creationTime + + ", version=" + + version + + ", repository='" + + repository + + '\'' + + ", name='" + + name + + '\'' + + ", path='" + + path + + '\'' + + ", from=" + + from + + ", to=" + + to + + ']'; + } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java index d426dd1daf32b..2564aa2a5e4da 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java @@ -15,6 +15,7 @@ import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import java.io.IOException; import java.io.InputStream; @@ -29,6 +30,7 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput { protected final Logger logger; + protected final SearchableSnapshotDirectory directory; protected final BlobContainer blobContainer; protected final FileInfo fileInfo; protected final IOContext context; @@ -36,6 +38,9 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu protected final long offset; protected final long length; + /** Range of bytes that should be cached in the blob cache for the current index input **/ + protected final ByteRange blobCacheByteRange; + // the following are only mutable so they can be adjusted after cloning/slicing protected volatile boolean isClone; private AtomicBoolean closed; @@ -43,20 +48,23 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu public BaseSearchableSnapshotIndexInput( Logger logger, String resourceDesc, - BlobContainer blobContainer, + SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, IndexInputStats stats, long offset, - long length + long length, + ByteRange blobCacheByteRange ) { super(resourceDesc, context); this.logger = Objects.requireNonNull(logger); - this.blobContainer = Objects.requireNonNull(blobContainer); + this.directory = Objects.requireNonNull(directory); + this.blobContainer = Objects.requireNonNull(directory.blobContainer()); this.fileInfo = Objects.requireNonNull(fileInfo); this.context = 
Objects.requireNonNull(context); assert fileInfo.metadata().hashEqualsContents() == false : "this method should only be used with blobs that are NOT stored in metadata's hash field " + "(fileInfo: " + fileInfo + ')'; + this.blobCacheByteRange = Objects.requireNonNull(blobCacheByteRange); this.stats = Objects.requireNonNull(stats); this.offset = offset; this.length = length; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index 1a10890b2ab35..d6fac81fe4166 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.LazyInitializable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -59,6 +60,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.FrozenCacheService.FrozenCacheFile; @@ -86,6 +88,7 @@ import static org.apache.lucene.store.BufferedIndexInput.bufferSize; import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING; @@ -136,6 +139,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory { private final AtomicBoolean closed; private final boolean partial; private final FrozenCacheService frozenCacheService; + private final ByteSizeValue blobStoreCacheMaxLength; // volatile fields are updated once under `this` lock, all together, iff loaded is not true. 
private volatile BlobStoreIndexShardSnapshot snapshot; @@ -179,6 +183,7 @@ public SearchableSnapshotDirectory( this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings)); this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes(); this.blobStoreCachePath = String.join("/", snapshotId.getUUID(), indexId.getId(), String.valueOf(shardId.id())); + this.blobStoreCacheMaxLength = SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH_SETTING.get(indexSettings); this.threadPool = threadPool; this.loaded = false; this.frozenCacheService = frozenCacheService; @@ -419,14 +424,7 @@ public IndexInput openInput(final String name, final IOContext context) throws I ); } } else { - return new DirectBlobContainerIndexInput( - blobContainer(), - fileInfo, - context, - inputStats, - getUncachedChunkSize(), - bufferSize(context) - ); + return new DirectBlobContainerIndexInput(this, fileInfo, context, inputStats, getUncachedChunkSize(), bufferSize(context)); } } @@ -679,10 +677,19 @@ public static SearchableSnapshotDirectory unwrapDirectory(Directory dir) { return null; } - public CachedBlob getCachedBlob(String name, long offset, int length) { - final CachedBlob cachedBlob = blobStoreCacheService.get(repository, name, blobStoreCachePath, offset); - assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= offset; - assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || offset + length <= cachedBlob.to(); + public ByteRange getBlobCacheByteRange(String fileName, long fileLength) { + return blobStoreCacheService.computeBlobCacheByteRange(fileName, fileLength, blobStoreCacheMaxLength); + } + + public CachedBlob getCachedBlob(String name, ByteRange range) { + final CachedBlob cachedBlob = blobStoreCacheService.get(repository, name, blobStoreCachePath, range.start()); + if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) { + return cachedBlob; + } else if (cachedBlob.from() != range.start() || cachedBlob.to() != range.end()) { + // expected range in cache might differ from the returned cached blob; this can happen if the range to put in cache is changed + // between versions or through the index setting. 
In this case we assume it is a cache miss to force the blob to be cached again + return CachedBlob.CACHE_MISS; + } return cachedBlob; } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java index 511c0926bdb6f..a71405ee53158 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java @@ -56,7 +56,6 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn private static final Logger logger = LogManager.getLogger(CachedBlobContainerIndexInput.class); private static final int COPY_BUFFER_SIZE = ByteSizeUnit.KB.toIntBytes(8); - private final SearchableSnapshotDirectory directory; private final CacheFileReference cacheFileReference; private final int defaultRangeSize; private final int recoveryRangeSize; @@ -84,7 +83,8 @@ public CachedBlobContainerIndexInput( fileInfo.length(), new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()), rangeSize, - recoveryRangeSize + recoveryRangeSize, + directory.getBlobCacheByteRange(fileInfo.physicalName(), fileInfo.length()) ); assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); @@ -100,10 +100,10 @@ private CachedBlobContainerIndexInput( long length, CacheFileReference cacheFileReference, int rangeSize, - int recoveryRangeSize + int recoveryRangeSize, + ByteRange blobCacheByteRange ) { - super(logger, resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length); - this.directory = directory; + super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, blobCacheByteRange); this.cacheFileReference = cacheFileReference; this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; @@ -157,30 +157,18 @@ protected void doReadInternal(ByteBuffer b) throws IOException { } // Requested data is not on disk, so try the cache index next. - final ByteRange indexCacheMiss; // null if not a miss - // We try to use the cache index if: - // - the file is small enough to be fully cached - final boolean canBeFullyCached = fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2; - // - we're reading the first N bytes of the file - final boolean isStartOfFile = (position + length <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); - - if (canBeFullyCached || isStartOfFile) { - final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), 0L, length); + if (blobCacheByteRange.contains(position, position + length)) { + final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), blobCacheByteRange); + assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= position; + assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || length <= cachedBlob.length(); if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) { // We would have liked to find a cached entry but we did not find anything: the cache on the disk will be requested // so we compute the region of the file we would like to have the next time. 
The region is expressed as a tuple of // {start, end} where positions are relative to the whole file. - - if (canBeFullyCached) { - // if the index input is smaller than twice the size of the blob cache, it will be fully indexed - indexCacheMiss = ByteRange.of(0L, fileInfo.length()); - } else { - // the index input is too large to fully cache, so just cache the initial range - indexCacheMiss = ByteRange.of(0L, BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); - } + indexCacheMiss = blobCacheByteRange; // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put. // TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case. @@ -192,15 +180,15 @@ protected void doReadInternal(ByteBuffer b) throws IOException { position ); stats.addIndexCacheBytesRead(cachedBlob.length()); - - final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(toIntBytes(position), length).iterator(); - BytesRef bytesRef; - while ((bytesRef = cachedBytesIterator.next()) != null) { - b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length); - } - assert b.position() == length : "copied " + b.position() + " but expected " + length; - try { + final int sliceOffset = toIntBytes(position - cachedBlob.from()); + final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(sliceOffset, length).iterator(); + BytesRef bytesRef; + while ((bytesRef = cachedBytesIterator.next()) != null) { + b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length); + } + assert b.position() == length : "copied " + b.position() + " but expected " + length; + final ByteRange cachedRange = ByteRange.of(cachedBlob.from(), cachedBlob.to()); cacheFile.populateAndRead( cachedRange, @@ -278,7 +266,6 @@ protected void doReadInternal(ByteBuffer b) throws IOException { // - use a BigArrays for allocation // - use an intermediate copy buffer to read the file in sensibly-sized chunks // - release the buffer once the indexing operation is complete - assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss; final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength); Channels.readFromFileChannelWithEofException(channel, indexCacheMiss.start(), byteBuffer); @@ -600,7 +587,8 @@ public IndexInput slice(String sliceDescription, long offset, long length) { length, cacheFileReference, defaultRangeSize, - recoveryRangeSize + recoveryRangeSize, + ByteRange.EMPTY // TODO implement blob cache for slices when it makes sense (like CFs) ); slice.isClone = true; return slice; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java index 772320e52ba66..a1b6be776cac3 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/FrozenIndexInput.java @@ -48,7 +48,6 @@ public class FrozenIndexInput extends BaseSearchableSnapshotIndexInput { private static final Logger logger = LogManager.getLogger(FrozenIndexInput.class); private static final int COPY_BUFFER_SIZE = ByteSizeUnit.KB.toIntBytes(8); - private final SearchableSnapshotDirectory directory; private final FrozenCacheFile frozenCacheFile; private final int defaultRangeSize; private final int recoveryRangeSize; @@ -76,7 +75,8 @@ public FrozenIndexInput( fileInfo.length(), 
directory.getFrozenCacheFile(fileInfo.physicalName(), fileInfo.length()), rangeSize, - recoveryRangeSize + recoveryRangeSize, + directory.getBlobCacheByteRange(fileInfo.physicalName(), fileInfo.length()) ); assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth stats.incrementOpenCount(); @@ -92,10 +92,10 @@ private FrozenIndexInput( long length, FrozenCacheFile frozenCacheFile, int rangeSize, - int recoveryRangeSize + int recoveryRangeSize, + ByteRange blobCacheByteRange ) { - super(logger, resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length); - this.directory = directory; + super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, blobCacheByteRange); this.frozenCacheFile = frozenCacheFile; this.lastReadPosition = this.offset; this.lastSeekPosition = this.offset; @@ -166,30 +166,18 @@ protected void doReadInternal(ByteBuffer b) throws IOException { } // Requested data is not on disk, so try the cache index next. - final ByteRange indexCacheMiss; // null if not a miss - // We try to use the cache index if: - // - the file is small enough to be fully cached - final boolean canBeFullyCached = fileInfo.length() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE * 2; - // - we're reading the first N bytes of the file - final boolean isStartOfFile = (position + length <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); - - if (canBeFullyCached || isStartOfFile) { - final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), 0L, length); + if (blobCacheByteRange.contains(position, position + length)) { + final CachedBlob cachedBlob = directory.getCachedBlob(fileInfo.physicalName(), blobCacheByteRange); + assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || cachedBlob.from() <= position; + assert cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY || length <= cachedBlob.length(); if (cachedBlob == CachedBlob.CACHE_MISS || cachedBlob == CachedBlob.CACHE_NOT_READY) { // We would have liked to find a cached entry but we did not find anything: the cache on the disk will be requested // so we compute the region of the file we would like to have the next time. The region is expressed as a tuple of // {start, end} where positions are relative to the whole file. - - if (canBeFullyCached) { - // if the index input is smaller than twice the size of the blob cache, it will be fully indexed - indexCacheMiss = ByteRange.of(0L, fileInfo.length()); - } else { - // the index input is too large to fully cache, so just cache the initial range - indexCacheMiss = ByteRange.of(0L, (long) BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE); - } + indexCacheMiss = blobCacheByteRange; // We must fill in a cache miss even if CACHE_NOT_READY since the cache index is only created on the first put. // TODO TBD use a different trigger for creating the cache index and avoid a put in the CACHE_NOT_READY case. 
@@ -203,17 +191,17 @@ protected void doReadInternal(ByteBuffer b) throws IOException { stats.addIndexCacheBytesRead(cachedBlob.length()); preventAsyncBufferChanges.run(); - - final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(toIntBytes(position), length).iterator(); - int copiedBytes = 0; - BytesRef bytesRef; - while ((bytesRef = cachedBytesIterator.next()) != null) { - b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length); - copiedBytes += bytesRef.length; - } - assert copiedBytes == length : "copied " + copiedBytes + " but expected " + length; - try { + final int sliceOffset = toIntBytes(position - cachedBlob.from()); + final BytesRefIterator cachedBytesIterator = cachedBlob.bytes().slice(sliceOffset, length).iterator(); + int copiedBytes = 0; + BytesRef bytesRef; + while ((bytesRef = cachedBytesIterator.next()) != null) { + b.put(bytesRef.bytes, bytesRef.offset, bytesRef.length); + copiedBytes += bytesRef.length; + } + assert copiedBytes == length : "copied " + copiedBytes + " but expected " + length; + final ByteRange cachedRange = ByteRange.of(cachedBlob.from(), cachedBlob.to()); frozenCacheFile.populateAndRead( cachedRange, @@ -316,12 +304,11 @@ protected void doReadInternal(ByteBuffer b) throws IOException { if (indexCacheMiss != null) { final Releasable onCacheFillComplete = stats.addIndexCacheFill(); final int indexCacheMissLength = toIntBytes(indexCacheMiss.length()); + // We assume that we only cache small portions of blobs so that we do not need to: // - use a BigArrays for allocation // - use an intermediate copy buffer to read the file in sensibly-sized chunks // - release the buffer once the indexing operation is complete - assert indexCacheMissLength <= COPY_BUFFER_SIZE : indexCacheMiss; - final ByteBuffer byteBuffer = ByteBuffer.allocate(indexCacheMissLength); final StepListener readListener = frozenCacheFile.readIfAvailableOrPending( @@ -642,7 +629,8 @@ public IndexInput slice(String sliceDescription, long offset, long length) { length, frozenCacheFile, defaultRangeSize, - recoveryRangeSize + recoveryRangeSize, + ByteRange.EMPTY // TODO implement blob cache for slices when it makes sense (like CFs) ); slice.isClone = true; return slice; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java index b083aa7304e7a..d732c6f036bf0 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java @@ -18,6 +18,8 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput; import org.elasticsearch.index.store.IndexInputStats; +import org.elasticsearch.index.store.SearchableSnapshotDirectory; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import java.io.Closeable; import java.io.EOFException; @@ -67,7 +69,7 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn private static final int COPY_BUFFER_SIZE = 8192; public DirectBlobContainerIndexInput( - BlobContainer blobContainer, + SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, IndexInputStats stats, @@ -76,7 +78,7 @@ public 
DirectBlobContainerIndexInput( ) { this( "DirectBlobContainerIndexInput(" + fileInfo.physicalName() + ")", - blobContainer, + directory, fileInfo, context, stats, @@ -91,7 +93,7 @@ public DirectBlobContainerIndexInput( private DirectBlobContainerIndexInput( String resourceDesc, - BlobContainer blobContainer, + SearchableSnapshotDirectory directory, FileInfo fileInfo, IOContext context, IndexInputStats stats, @@ -101,7 +103,7 @@ private DirectBlobContainerIndexInput( long sequentialReadSize, int bufferSize ) { - super(logger, resourceDesc, blobContainer, fileInfo, context, stats, offset, length); + super(logger, resourceDesc, directory, fileInfo, context, stats, offset, length, ByteRange.EMPTY); // TODO should use blob cache this.position = position; assert sequentialReadSize >= 0; this.sequentialReadSize = sequentialReadSize; @@ -271,7 +273,7 @@ protected void seekInternal(long pos) throws IOException { public DirectBlobContainerIndexInput clone() { final DirectBlobContainerIndexInput clone = new DirectBlobContainerIndexInput( "clone(" + this + ")", - blobContainer, + directory, fileInfo, context, stats, @@ -292,7 +294,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) throw if ((offset >= 0L) && (length >= 0L) && (offset + length <= length())) { final DirectBlobContainerIndexInput slice = new DirectBlobContainerIndexInput( getFullSliceDescription(sliceDescription), - blobContainer, + directory, fileInfo, context, stats, diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 6844421d67155..cbdd3b94e1201 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.searchablesnapshots; +import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; @@ -95,6 +96,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.function.Function; @@ -190,6 +192,31 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Eng Setting.Property.PrivateIndex, Setting.Property.NotCopyableOnResize ); + public static final String SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH = "index.store.snapshot.blob_cache.metadata_files.max_length"; + public static final Setting SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH_SETTING = new Setting<>( + new Setting.SimpleKey(SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH), + s -> new ByteSizeValue(64L, ByteSizeUnit.KB).getStringRep(), + s -> Setting.parseByteSize( + s, + new ByteSizeValue(1L, ByteSizeUnit.KB), + new ByteSizeValue(Long.MAX_VALUE), + SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH + ), + value -> { + if (value.getBytes() % BufferedIndexInput.BUFFER_SIZE != 0L) { + final String message = String.format( + Locale.ROOT, + "failed to parse value [%s] for setting [%s], must be a multiple of [%s] bytes", + value.getStringRep(), + SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH, + BufferedIndexInput.BUFFER_SIZE + ); + throw new 
/** * Prefer to allocate to the cold tier, then the frozen tier, then the warm tier, then the hot tier @@ -270,6 +297,7 @@ public List<Setting<?>> getSettings() { SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING, SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING, SNAPSHOT_PARTIAL_SETTING, + SNAPSHOT_BLOB_CACHE_METADATA_FILES_MAX_LENGTH_SETTING, CacheService.SNAPSHOT_CACHE_SIZE_SETTING, CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING, CacheService.SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING, @@ -311,7 +339,12 @@ public Collection<Object> createComponents( final FrozenCacheService frozenCacheService = new FrozenCacheService(environment, threadPool); this.frozenCacheService.set(frozenCacheService); components.add(cacheService); - final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX); + final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService( + clusterService, + threadPool, + client, + SNAPSHOT_BLOB_CACHE_INDEX + ); this.blobStoreCacheService.set(blobStoreCacheService); components.add(blobStoreCacheService); } else { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java index 96e1725b3a0c6..165634f5e0978 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/ByteRange.java @@ -65,6 +65,10 @@ public boolean isSubRangeOf(ByteRange range) { return start >= range.start() && end <= range.end(); } + public boolean contains(long start, long end) { + return start() <= start && end <= end(); + } + @Override public int hashCode() { return 31 * Long.hashCode(start) + Long.hashCode(end); @@ -84,7 +88,7 @@ public boolean equals(Object obj) { @Override public String toString() { - return "ByteRange{" + start + "}{" + end + "}"; + return "ByteRange [" + start + "-" + end + ']'; } @Override diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java index 88f8e69ac6854..b399c3c7c8a87 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java @@ -79,7 +79,7 @@ public void testOpenCount() throws Exception { IndexInputStats inputStats = directory.getStats(fileName); assertThat(inputStats, (i == 0L) ?
nullValue() : notNullValue()); - final IndexInput input = directory.openInput(fileName, newIOContext(random())); + final IndexInput input = directory.openInput(fileName, randomIOContext()); inputStats = directory.getStats(fileName); assertThat(inputStats.getOpened().longValue(), equalTo(i + 1L)); input.close(); @@ -94,7 +94,7 @@ public void testCloseCount() throws Exception { executeTestCase((fileName, fileContent, directory) -> { try { for (long i = 0L; i < randomLongBetween(1L, 20L); i++) { - final IndexInput input = directory.openInput(fileName, newIOContext(random())); + final IndexInput input = directory.openInput(fileName, randomIOContext()); IndexInputStats inputStats = directory.getStats(fileName); assertThat(inputStats, notNullValue()); @@ -115,7 +115,7 @@ public void testCachedBytesReadsAndWrites() throws Exception { final ByteSizeValue cacheSize = new ByteSizeValue(10, ByteSizeUnit.MB); executeTestCaseWithCache(cacheSize, rangeSize, (fileName, fileContent, directory) -> { - try (IndexInput input = directory.openInput(fileName, newIOContext(random()))) { + try (IndexInput input = directory.openInput(fileName, randomIOContext())) { final long length = input.length(); final IndexInputStats inputStats = directory.getStats(fileName); @@ -167,7 +167,7 @@ public void testCachedBytesReadsAndWrites() throws Exception { public void testCachedBytesReadsAndWritesNoCache() throws Exception { final ByteSizeValue uncachedChunkSize = new ByteSizeValue(randomIntBetween(512, MAX_FILE_LENGTH), ByteSizeUnit.BYTES); executeTestCaseWithoutCache(uncachedChunkSize, (fileName, fileContent, directory) -> { - try (IndexInput input = directory.openInput(fileName, newIOContext(random()))) { + try (IndexInput input = directory.openInput(fileName, randomIOContext())) { final long length = input.length(); final IndexInputStats inputStats = directory.getStats(fileName); @@ -193,7 +193,7 @@ public void testDirectBytesReadsWithCache() throws Exception { executeTestCaseWithCache(ByteSizeValue.ZERO, randomCacheRangeSize(), (fileName, fileContent, directory) -> { assertThat(directory.getStats(fileName), nullValue()); - final IOContext ioContext = newIOContext(random()); + final IOContext ioContext = randomIOContext(); try { IndexInput input = directory.openInput(fileName, ioContext); if (randomBoolean()) { @@ -246,7 +246,7 @@ public void testDirectBytesReadsWithoutCache() throws Exception { executeTestCaseWithoutCache(uncachedChunkSize, (fileName, fileContent, directory) -> { assertThat(directory.getStats(fileName), nullValue()); - final IOContext ioContext = newIOContext(random()); + final IOContext ioContext = randomIOContext(); try (IndexInput original = directory.openInput(fileName, ioContext)) { final IndexInput input = original.clone(); // always clone to only execute direct reads final IndexInputStats inputStats = directory.getStats(fileName); @@ -281,7 +281,7 @@ public void testOptimizedBytesReads() throws Exception { // use a large uncached chunk size that allows to read the file in a single operation final ByteSizeValue uncachedChunkSize = new ByteSizeValue(1, ByteSizeUnit.GB); executeTestCaseWithoutCache(uncachedChunkSize, (fileName, fileContent, directory) -> { - final IOContext context = newIOContext(random()); + final IOContext context = randomIOContext(); try (IndexInput input = directory.openInput(fileName, context)) { final IndexInputStats inputStats = directory.getStats(fileName); assertThat(inputStats, notNullValue()); @@ -316,7 +316,7 @@ public void testOptimizedBytesReads() throws Exception { 
public void testReadBytesContiguously() throws Exception { executeTestCaseWithDefaultCache((fileName, fileContent, cacheDirectory) -> { - final IOContext ioContext = newIOContext(random()); + final IOContext ioContext = randomIOContext(); try (IndexInput input = cacheDirectory.openInput(fileName, ioContext)) { final IndexInputStats inputStats = cacheDirectory.getStats(fileName); @@ -367,7 +367,7 @@ public void testReadBytesContiguously() throws Exception { public void testReadBytesNonContiguously() throws Exception { executeTestCaseWithDefaultCache((fileName, fileContent, cacheDirectory) -> { - final IOContext ioContext = newIOContext(random()); + final IOContext ioContext = randomIOContext(); try (IndexInput input = cacheDirectory.openInput(fileName, ioContext)) { final IndexInputStats inputStats = cacheDirectory.getStats(fileName); @@ -418,7 +418,7 @@ public void testReadBytesNonContiguously() throws Exception { public void testForwardSeeks() throws Exception { executeTestCaseWithDefaultCache((fileName, fileContent, cacheDirectory) -> { - final IOContext ioContext = newIOContext(random()); + final IOContext ioContext = randomIOContext(); try (IndexInput indexInput = cacheDirectory.openInput(fileName, ioContext)) { IndexInput input = indexInput; if (randomBoolean()) { @@ -476,7 +476,7 @@ public void testForwardSeeks() throws Exception { public void testBackwardSeeks() throws Exception { executeTestCaseWithDefaultCache((fileName, fileContent, cacheDirectory) -> { - final IOContext ioContext = newIOContext(random()); + final IOContext ioContext = randomIOContext(); try (IndexInput indexInput = cacheDirectory.openInput(fileName, ioContext)) { IndexInput input = indexInput; if (randomBoolean()) { @@ -606,8 +606,7 @@ private void executeTestCase( ) throws Exception { final byte[] fileContent = randomByteArrayOfLength(randomIntBetween(10, MAX_FILE_LENGTH)); - final String fileExtension = randomAlphaOfLength(3); - final String fileName = randomAlphaOfLength(10) + '.' 
+ fileExtension; + final String fileName = randomAlphaOfLength(10) + randomFileExtension(); final SnapshotId snapshotId = new SnapshotId("_name", "_uuid"); final IndexId indexId = new IndexId("_name", "_uuid"); final ShardId shardId = new ShardId("_name", "_uuid", 0); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index 6d59c20a992d9..7466320023b5b 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -632,8 +632,8 @@ protected void assertSnapshotOrGenericThread() { private void testIndexInputs(final CheckedBiConsumer consumer) throws Exception { testDirectories((directory, snapshotDirectory) -> { - for (String fileName : randomSubsetOf(asList(snapshotDirectory.listAll()))) { - final IOContext context = newIOContext(random()); + for (String fileName : randomSubsetOf(Arrays.asList(snapshotDirectory.listAll()))) { + final IOContext context = randomIOContext(); try (IndexInput indexInput = directory.openInput(fileName, context)) { final List closeables = new ArrayList<>(); try { @@ -660,8 +660,7 @@ public void testClearCache() throws Exception { final Path shardSnapshotDir = createTempDir(); for (int i = 0; i < nbRandomFiles; i++) { - final String fileName = "file_" + randomAlphaOfLength(10); - + final String fileName = randomAlphaOfLength(5) + randomFileExtension(); final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 100_000)); final byte[] input = bytes.v2(); final String checksum = bytes.v1(); @@ -727,7 +726,7 @@ public void testClearCache() throws Exception { final BlobStoreIndexShardSnapshot.FileInfo fileInfo = randomFrom(randomFiles); final int fileLength = toIntBytes(fileInfo.length()); - try (IndexInput input = directory.openInput(fileInfo.physicalName(), newIOContext(random()))) { + try (IndexInput input = directory.openInput(fileInfo.physicalName(), randomIOContext())) { assertThat(input.length(), equalTo((long) fileLength)); final int start = between(0, fileLength - 1); final int end = between(start + 1, fileLength); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java index 55112813cc21e..762d9bedb0133 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java @@ -63,7 +63,7 @@ public void testRandomReads() throws Exception { ShardId shardId = new ShardId("_name", "_uuid", 0); for (int i = 0; i < 5; i++) { - final String fileName = randomAlphaOfLength(10); + final String fileName = randomAlphaOfLength(5) + randomFileExtension(); final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 100_000)); final byte[] input = bytes.v2(); @@ -129,7 +129,7 @@ public void testRandomReads() throws Exception { assertThat("Snapshot should be loaded", directory.snapshot(), notNullValue()); assertThat("BlobContainer should be loaded", directory.blobContainer(), notNullValue()); 
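// Not part of the patch, but worth noting: the switch to randomFileExtension() above is
// deliberate, because the length of the cached blob now depends on the file name. A rough
// sketch of the decision these tests exercise, where isMetadataFile() and metadataMaxLength
// are hypothetical stand-ins for the logic and index setting introduced by this change:
//
//     ByteRange cachedRange = isMetadataFile(fileName)   // e.g. ".fnm", ".dvm", ".tmd"
//         ? ByteRange.of(0L, metadataMaxLength)          // up to 64kb by default, per index setting
//         : ByteRange.of(0L, 1024L);                     // a small 1kb prefix for other files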
- try (IndexInput indexInput = directory.openInput(fileName, newIOContext(random()))) { + try (IndexInput indexInput = directory.openInput(fileName, randomIOContext())) { assertThat(indexInput, instanceOf(CachedBlobContainerIndexInput.class)); assertEquals(input.length, indexInput.length()); assertEquals(0, indexInput.getFilePointer()); @@ -178,7 +178,7 @@ public void testThrowsEOFException() throws Exception { IndexId indexId = new IndexId("_name", "_uuid"); ShardId shardId = new ShardId("_name", "_uuid", 0); - final String fileName = randomAlphaOfLength(10); + final String fileName = randomAlphaOfLength(5) + randomFileExtension(); final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 1000)); final byte[] input = bytes.v2(); @@ -231,7 +231,7 @@ public void testThrowsEOFException() throws Exception { assertThat("Snapshot should be loaded", searchableSnapshotDirectory.snapshot(), notNullValue()); assertThat("BlobContainer should be loaded", searchableSnapshotDirectory.blobContainer(), notNullValue()); - try (IndexInput indexInput = searchableSnapshotDirectory.openInput(fileName, newIOContext(random()))) { + try (IndexInput indexInput = searchableSnapshotDirectory.openInput(fileName, randomIOContext())) { assertThat(indexInput, instanceOf(CachedBlobContainerIndexInput.class)); final byte[] buffer = new byte[input.length + 1]; final IOException exception = expectThrows(IOException.class, () -> indexInput.readBytes(buffer, 0, buffer.length)); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java index ef4cadb062595..8342799b1940b 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/FrozenIndexInputTests.java @@ -34,7 +34,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; -import java.util.Locale; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.mock; @@ -44,7 +43,7 @@ public class FrozenIndexInputTests extends AbstractSearchableSnapshotsTestCase { private static final ShardId SHARD_ID = new ShardId(new Index("_index_name", "_index_id"), 0); public void testRandomReads() throws IOException { - final String fileName = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT); + final String fileName = randomAlphaOfLength(5) + randomFileExtension(); final Tuple bytes = randomChecksumBytes(randomIntBetween(1, 100_000)); final byte[] fileData = bytes.v2(); @@ -104,7 +103,7 @@ public void testRandomReads() throws IOException { directory.loadSnapshot(createRecoveryState(true), ActionListener.wrap(() -> {})); // TODO does not test using the recovery range size - final IndexInput indexInput = directory.openInput(fileName, newIOContext(random())); + final IndexInput indexInput = directory.openInput(fileName, randomIOContext()); assertThat(indexInput, instanceOf(FrozenIndexInput.class)); assertEquals(fileData.length, indexInput.length()); assertEquals(0, indexInput.getFilePointer()); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index 5fd47897eb9fc..7ed84c9b2c428 100644 --- 
a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -314,7 +314,12 @@ private static Client mockClient() { public static class NoopBlobStoreCacheService extends BlobStoreCacheService { public NoopBlobStoreCacheService() { - super(null, mockClient(), null); + super(null, null, mockClient(), null); + } + + @Override + protected boolean useLegacyCachedBlobSizes() { + return false; } @Override @@ -340,7 +345,7 @@ public static class SimpleBlobStoreCacheService extends BlobStoreCacheService { private final ConcurrentHashMap blobs = new ConcurrentHashMap<>(); public SimpleBlobStoreCacheService() { - super(null, mockClient(), null); + super(null, null, mockClient(), null); } private static Client mockClient() { @@ -349,6 +354,11 @@ private static Client mockClient() { return client; } + @Override + protected boolean useLegacyCachedBlobSizes() { + return false; + } + @Override protected void getAsync(String repository, String name, String path, long offset, ActionListener listener) { CachedBlob blob = blobs.get(CachedBlob.generateId(repository, name, path, offset)); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java index e165d44bcb70d..7820ddf499ee8 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java @@ -8,6 +8,7 @@ import org.apache.lucene.store.BufferedIndexInput; import org.apache.lucene.util.Version; +import org.elasticsearch.blobstore.cache.CachedBlob; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.store.ESIndexInputTestCase; @@ -15,7 +16,9 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import org.elasticsearch.index.store.IndexInputStats; +import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.index.store.StoreFileMetadata; +import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange; import java.io.ByteArrayInputStream; import java.io.EOFException; @@ -25,6 +28,8 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomChecksumBytes; +import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomFileExtension; +import static org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase.randomIOContext; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; @@ -33,6 +38,7 @@ import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.startsWith; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; @@ 
-59,9 +65,10 @@ private DirectBlobContainerIndexInput createIndexInput( String checksum, Runnable onReadBlob ) throws IOException { + final String fileName = randomAlphaOfLength(5) + randomFileExtension(); final FileInfo fileInfo = new FileInfo( randomAlphaOfLength(5), - new StoreFileMetadata("test", input.length, checksum, Version.LATEST), + new StoreFileMetadata(fileName, input.length, checksum, Version.LATEST), partSize == input.length ? randomFrom( new ByteSizeValue(partSize, ByteSizeUnit.BYTES), @@ -116,10 +123,15 @@ public int read(byte[] b, int off, int len) throws IOException { }; } }); + + final SearchableSnapshotDirectory directory = mock(SearchableSnapshotDirectory.class); + when(directory.getCachedBlob(anyString(), any(ByteRange.class))).thenReturn(CachedBlob.CACHE_NOT_READY); + when(directory.blobContainer()).thenReturn(blobContainer); + final DirectBlobContainerIndexInput indexInput = new DirectBlobContainerIndexInput( - blobContainer, + directory, fileInfo, - newIOContext(random()), + randomIOContext(), new IndexInputStats(1, 0L, () -> 0L), minimumReadSize, randomBoolean() ? BufferedIndexInput.BUFFER_SIZE : between(BufferedIndexInput.MIN_BUFFER_SIZE, BufferedIndexInput.BUFFER_SIZE) diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java index 2e64651ccb6db..a76a3618022db 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java @@ -11,6 +11,7 @@ import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexInput; import org.apache.lucene.store.ByteBuffersIndexOutput; +import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.elasticsearch.Version; @@ -333,4 +334,47 @@ public static Tuple randomChecksumBytes(byte[] bytes) throws IOE } return Tuple.tuple(checksum, out.toArrayCopy()); } + + public static String randomFileExtension() { + return randomFrom( + ".cfe", + ".cfs", + ".dii", + ".dim", + ".doc", + ".dvd", + ".dvm", + ".fdt", + ".fdx", + ".fdm", + ".fnm", + ".kdd", + ".kdi", + ".kdm", + ".liv", + ".nvd", + ".nvm", + ".pay", + ".pos", + ".tim", + ".tip", + ".tmd", + ".tvd", + ".tvx", + ".vec", + ".vem" + ); + } + + /** + * @return a random {@link IOContext} that corresponds to a default, read or read_once usage. + * + * It's important that the context returned by this method is not a "merge" one, as {@link org.apache.lucene.store.BufferedIndexInput} + * uses a different buffer size for them.
+ */ + public static IOContext randomIOContext() { + final IOContext ioContext = randomFrom(IOContext.DEFAULT, IOContext.READ, IOContext.READONCE); + assert ioContext.context != IOContext.Context.MERGE; + return ioContext; + } } diff --git a/x-pack/qa/rolling-upgrade/build.gradle b/x-pack/qa/rolling-upgrade/build.gradle index 5d082a64fcb3b..376d0ff7f65a1 100644 --- a/x-pack/qa/rolling-upgrade/build.gradle +++ b/x-pack/qa/rolling-upgrade/build.gradle @@ -33,6 +33,7 @@ tasks.register("copyTestNodeKeyMaterial", Copy) { for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { String baseName = "v${bwcVersion}" + String repositoryPath = "${buildDir}/cluster/shared/repo/${baseName}" testClusters { "${baseName}" { @@ -41,7 +42,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { numberOfNodes = 3 setting 'repositories.url.allowed_urls', 'http://snapshot.test*' - setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}" + setting 'path.repo', repositoryPath setting 'http.content_type.required', 'true' setting 'xpack.license.self_generated.type', 'trial' setting 'xpack.security.enabled', 'true' @@ -95,6 +96,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { dependsOn "copyTestNodeKeyMaterial" systemProperty 'tests.rest.suite', 'old_cluster' systemProperty 'tests.upgrade_from_version', oldVersion + systemProperty 'tests.path.repo', repositoryPath nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") def toBlackList = [] @@ -129,6 +131,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { systemProperty 'tests.rest.suite', 'mixed_cluster' systemProperty 'tests.first_round', 'true' systemProperty 'tests.upgrade_from_version', oldVersion + systemProperty 'tests.path.repo', repositoryPath // We only need to run these tests once so we may as well do it when we're two thirds upgraded def toBlackList = [ 'mixed_cluster/10_basic/Start scroll in mixed cluster on upgraded node that we will continue after upgrade', @@ -201,6 +204,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { systemProperty 'tests.rest.blacklist', toBlackList.join(',') } systemProperty 'tests.upgrade_from_version', oldVersion + systemProperty 'tests.path.repo', repositoryPath } tasks.register("${baseName}#upgradedClusterTest", StandaloneRestIntegTestTask) { @@ -213,6 +217,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") systemProperty 'tests.rest.suite', 'upgraded_cluster' systemProperty 'tests.upgrade_from_version', oldVersion + systemProperty 'tests.path.repo', repositoryPath def toBlackList = [] // Dataframe transforms were not added until 7.2.0 if (Version.fromString(oldVersion).before('7.2.0')) { diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java index e40d9a549df76..75e303c1cb61c 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java @@ -41,6 +41,11 @@ protected boolean preserveReposUponCompletion() { return true; } + @Override + protected 
boolean preserveSnapshotsUponCompletion() { + return true; + } + @Override protected boolean preserveTemplatesUponCompletion() { return true; diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SearchableSnapshotsRollingUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SearchableSnapshotsRollingUpgradeIT.java new file mode 100644 index 0000000000000..bf5dfefc98319 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SearchableSnapshotsRollingUpgradeIT.java @@ -0,0 +1,312 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.upgrades; + +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.elasticsearch.Version; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.searchable_snapshots.MountSnapshotRequest.Storage; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.rest.RestStatus; +import org.hamcrest.Matcher; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class SearchableSnapshotsRollingUpgradeIT extends AbstractUpgradeTestCase { + + public void testMountFullCopyAndRecoversCorrectly() throws Exception { + final Storage storage = Storage.FULL_COPY; + assumeVersion(Version.V_7_10_0, storage); + + executeMountAndRecoversCorrectlyTestCase(storage, 6789L); + } + + public void testMountPartialCopyAndRecoversCorrectly() throws Exception { + final Storage storage = Storage.SHARED_CACHE; + assumeVersion(Version.V_7_12_0, Storage.SHARED_CACHE); + + executeMountAndRecoversCorrectlyTestCase(storage, 5678L); + } + + /** + * Test that a snapshot mounted as a searchable snapshot index in the previous version recovers correctly during rolling upgrade + */ + private void executeMountAndRecoversCorrectlyTestCase(Storage storage, long numberOfDocs) throws Exception { + final String suffix = storage.storageName().toLowerCase(Locale.ROOT); + final String index = "mounted_index_" + suffix; + + if (CLUSTER_TYPE.equals(ClusterType.OLD)) { + final String repository = "repository_" + suffix; + final String snapshot = "snapshot_" + suffix; + + registerRepository(repository, FsRepository.TYPE, true, + Settings.builder().put("location", System.getProperty("tests.path.repo") + '/' + repository).build()); + + final String originalIndex = "logs_" + suffix; + createIndex(originalIndex, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(3, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build()); + indexDocs(originalIndex, numberOfDocs); + createSnapshot(repository, snapshot, originalIndex); + 
deleteIndex(originalIndex); + + logger.info("mounting snapshot [repository={}, snapshot={}, index={}] as index [{}] with storage [{}] on version [{}]", + repository, snapshot, originalIndex, index, storage, UPGRADE_FROM_VERSION); + mountSnapshot(repository, snapshot, originalIndex, index, storage, Settings.EMPTY); + } + + ensureGreen(index); + assertHitCount(index, equalTo(numberOfDocs)); + } + + public void testBlobStoreCacheWithFullCopyInMixedVersions() throws Exception { + final Storage storage = Storage.FULL_COPY; + assumeVersion(Version.V_7_10_0, storage); + + executeBlobCacheCreationTestCase(storage, 9876L); + } + + public void testBlobStoreCacheWithPartialCopyInMixedVersions() throws Exception { + final Storage storage = Storage.SHARED_CACHE; + assumeVersion(Version.V_7_12_0, Storage.SHARED_CACHE); + + executeBlobCacheCreationTestCase(storage, 8765L); + } + + /** + * Test the behavior of the blob store cache in mixed versions cluster. The idea is to mount a new snapshot as an index on a node with + * version X so that this node generates cached blobs documents in the blob cache system index, and then mount the snapshot again on + * a different node with version Y so that this other node is likely to use the previously generated cached blobs documents. + */ + private void executeBlobCacheCreationTestCase(Storage storage, long numberOfDocs) throws Exception { + final String suffix = "blob_cache_" + storage.storageName().toLowerCase(Locale.ROOT); + final String repository = "repository_" + suffix; + + if (CLUSTER_TYPE.equals(ClusterType.OLD)) { + registerRepository(repository, FsRepository.TYPE, true, + Settings.builder().put("location", System.getProperty("tests.path.repo") + '/' + repository).build()); + + } else if (CLUSTER_TYPE.equals(ClusterType.MIXED)) { + final int numberOfNodes = 3; + waitForNodes(numberOfNodes); + + final Map nodesIdsAndVersions = nodesVersions(); + assertThat("Cluster should have 3 nodes", nodesIdsAndVersions.size(), equalTo(numberOfNodes)); + + final Version minVersion = nodesIdsAndVersions.values().stream().min(Version::compareTo).get(); + final Version maxVersion = nodesIdsAndVersions.values().stream().max(Version::compareTo).get(); + + final String nodeIdWithMinVersion = randomFrom(nodesIdsAndVersions.entrySet().stream() + .filter(node -> minVersion.equals(node.getValue())).map(Map.Entry::getKey) + .collect(Collectors.toSet())); + + final String nodeIdWithMaxVersion = randomValueOtherThan(nodeIdWithMinVersion, + () -> randomFrom(nodesIdsAndVersions.entrySet().stream() + .filter(node -> maxVersion.equals(node.getValue())).map(Map.Entry::getKey) + .collect(Collectors.toSet()))); + + // The snapshot is mounted on the node with the min. version in order to force the node to populate the blob store cache index. + // Then the snapshot is mounted again on a different node with a higher version in order to verify that the docs in the cache + // index can be used. + + final String firstIndex = "first_index_" + suffix; + createIndex(firstIndex, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(3, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build()); + indexDocs(firstIndex, numberOfDocs); + + final String firstSnapshot = "first_snapshot_" + suffix; + createSnapshot(repository, firstSnapshot, firstIndex); + deleteIndex(firstIndex); + + String index = "first_mount_" + suffix; + logger.info("mounting snapshot as index [{}] with storage [{}] on node [{}] with min. 
version [{}]", + index, storage, nodeIdWithMinVersion, minVersion); + mountSnapshot(repository, firstSnapshot, firstIndex, index, storage, + Settings.builder() + // we want a specific node version to create docs in the blob cache index + .put("index.routing.allocation.include._id", nodeIdWithMinVersion) + // prevent interferences with blob cache when full_copy is used + .put("index.store.snapshot.cache.prewarm.enabled", false) + .build()); + ensureGreen(index); + assertHitCount(index, equalTo(numberOfDocs)); + deleteIndex(index); + + index = "second_mount_" + suffix; + logger.info("mounting the same snapshot of index [{}] with storage [{}], this time on node [{}] with higher version [{}]", + index, storage, nodeIdWithMaxVersion, maxVersion); + mountSnapshot(repository, firstSnapshot, firstIndex, index, storage, + Settings.builder() + // we want a specific node version to use the cached blobs created by the nodeIdWithMinVersion + .put("index.routing.allocation.include._id", nodeIdWithMaxVersion) + .put("index.routing.allocation.exclude._id", nodeIdWithMinVersion) + // prevent interferences with blob cache when full_copy is used + .put("index.store.snapshot.cache.prewarm.enabled", false) + .build()); + ensureGreen(index); + assertHitCount(index, equalTo(numberOfDocs)); + deleteIndex(index); + + deleteSnapshot(repository, firstSnapshot); + + // Now the same thing but this time the docs in blob cache index are created from the upgraded version and mounted in a second + // time on the node with the minimum version. + + final String secondIndex = "second_index_" + suffix; + createIndex(secondIndex, Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(3, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build()); + indexDocs(secondIndex, numberOfDocs * 2L); + + final String secondSnapshot = "second_snapshot_" + suffix; + createSnapshot(repository, secondSnapshot, secondIndex); + deleteIndex(secondIndex); + + index = "first_mount_" + suffix; + logger.info("mounting snapshot as index [{}] with storage [{}] on node [{}] with max. 
version [{}]", + index, storage, nodeIdWithMaxVersion, maxVersion); + mountSnapshot(repository, secondSnapshot, secondIndex, index, storage, + Settings.builder() + // we want a specific node version to create docs in the blob cache index + .put("index.routing.allocation.include._id", nodeIdWithMaxVersion) + // prevent interferences with blob cache when full_copy is used + .put("index.store.snapshot.cache.prewarm.enabled", false) + .build()); + ensureGreen(index); + assertHitCount(index, equalTo(numberOfDocs * 2L)); + deleteIndex(index); + + index = "second_mount_" + suffix; + logger.info("mounting the same snapshot of index [{}] with storage [{}], this time on node [{}] with lower version [{}]", + index, storage, nodeIdWithMinVersion, minVersion); + mountSnapshot(repository, secondSnapshot, secondIndex, index, storage, + Settings.builder() + // we want a specific node version to use the cached blobs created by the nodeIdWithMinVersion + .put("index.routing.allocation.include._id", nodeIdWithMinVersion) + .put("index.routing.allocation.exclude._id", nodeIdWithMaxVersion) + // prevent interferences with blob cache when full_copy is used + .put("index.store.snapshot.cache.prewarm.enabled", false) + .build()); + ensureGreen(index); + assertHitCount(index, equalTo(numberOfDocs * 2L)); + deleteIndex(index); + + deleteSnapshot(repository, secondSnapshot); + + } else if (CLUSTER_TYPE.equals(ClusterType.UPGRADED)) { + deleteRepository(repository); + } + } + + private static void assumeVersion(Version minSupportedVersion, Storage storageType) { + assumeTrue("Searchable snapshots with storage type [" + storageType + "] is supported since version [" + minSupportedVersion + ']', + UPGRADE_FROM_VERSION.onOrAfter(minSupportedVersion)); + } + + private static void indexDocs(String indexName, long numberOfDocs) throws IOException { + final StringBuilder builder = new StringBuilder(); + for (long i = 0L; i < numberOfDocs; i++) { + builder.append("{\"create\":{\"_index\":\"").append(indexName).append("\"}}\n"); + builder.append("{\"value\":").append(i).append("}\n"); + } + final Request bulk = new Request(HttpPost.METHOD_NAME, "/_bulk"); + bulk.addParameter("refresh", "true"); + bulk.setJsonEntity(builder.toString()); + final Response response = client().performRequest(bulk); + assertThat(response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + assertFalse((Boolean) XContentMapValues.extractValue("errors", responseAsMap(response))); + } + + private static void createSnapshot(String repositoryName, String snapshotName, String indexName) throws IOException { + final Request request = new Request(HttpPut.METHOD_NAME, "/_snapshot/" + repositoryName + '/' + snapshotName); + request.addParameter("wait_for_completion", "true"); + request.setJsonEntity("{ \"indices\" : \"" + indexName + "\", \"include_global_state\": false}"); + final Response response = client().performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + } + + private static void waitForNodes(int numberOfNodes) throws IOException { + final Request request = new Request(HttpGet.METHOD_NAME, "/_cluster/health"); + request.addParameter("wait_for_nodes", String.valueOf(numberOfNodes)); + final Response response = client().performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + } + + @SuppressWarnings("unchecked") + private static Map nodesVersions() throws IOException { + final Response response = 
client().performRequest(new Request(HttpGet.METHOD_NAME, "_nodes/_all")); + assertThat(response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + final Map nodes = (Map) extractValue(responseAsMap(response), "nodes"); + assertNotNull("Nodes info is null", nodes); + final Map nodesVersions = new HashMap<>(nodes.size()); + for (Map.Entry node : nodes.entrySet()) { + nodesVersions.put(node.getKey(), Version.fromString((String) extractValue((Map) node.getValue(), "version"))); + } + return nodesVersions; + } + + private static void deleteSnapshot(String repositoryName, String snapshotName) throws IOException { + final Request request = new Request(HttpDelete.METHOD_NAME, "/_snapshot/" + repositoryName + '/' + snapshotName); + final Response response = client().performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + } + + private static void mountSnapshot( + String repositoryName, + String snapshotName, + String indexName, + String renamedIndex, + Storage storage, + Settings indexSettings + ) throws IOException { + final Request request = new Request(HttpPost.METHOD_NAME, "/_snapshot/" + repositoryName + '/' + snapshotName + "/_mount"); + request.addParameter("storage", storage.storageName()); + request.addParameter("wait_for_completion", "true"); + request.setJsonEntity("{" + + " \"index\": \"" + indexName + "\"," + + " \"renamed_index\": \"" + renamedIndex + "\"," + + " \"index_settings\": " + Strings.toString(indexSettings) + + "}"); + final Response response = client().performRequest(request); + assertThat( + "Failed to mount snapshot [" + snapshotName + "] from repository [" + repositoryName + "]: " + response, + response.getStatusLine().getStatusCode(), + equalTo(RestStatus.OK.getStatus()) + ); + } + + private static void assertHitCount(String indexName, Matcher countMatcher) throws IOException { + final Response response = client().performRequest(new Request(HttpGet.METHOD_NAME, "/" + indexName + "/_count")); + assertThat(response.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + final Map responseAsMap = responseAsMap(response); + final Number responseCount = (Number) extractValue("count", responseAsMap); + assertThat(responseAsMap + "", responseCount, notNullValue()); + assertThat(((Number) extractValue("count", responseAsMap)).longValue(), countMatcher); + assertThat(((Number) extractValue("_shards.failed", responseAsMap)).intValue(), equalTo(0)); + } +} diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java index 085935923fb7b..0333d0b3f6930 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java @@ -89,6 +89,16 @@ protected boolean preserveDataStreamsUponCompletion() { return true; } + @Override + protected boolean preserveReposUponCompletion() { + return true; + } + + @Override + protected boolean preserveSnapshotsUponCompletion() { + return true; + } + public UpgradeClusterClientYamlTestSuiteIT(ClientYamlTestCandidate testCandidate) { super(testCandidate); } From 494359282d4007779cfd41ab32e3a9fb353a7329 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 2 Mar 2021 11:45:17 +0100 Subject: [PATCH 2/2] Reenable 
SearchableSnapshotsRollingUpgradeIT closes #69705 --- x-pack/qa/rolling-upgrade/build.gradle | 15 +-- .../SearchableSnapshotsRollingUpgradeIT.java | 100 ++++++++++-------- 2 files changed, 64 insertions(+), 51 deletions(-) diff --git a/x-pack/qa/rolling-upgrade/build.gradle b/x-pack/qa/rolling-upgrade/build.gradle index 376d0ff7f65a1..9a432b502ed43 100644 --- a/x-pack/qa/rolling-upgrade/build.gradle +++ b/x-pack/qa/rolling-upgrade/build.gradle @@ -33,7 +33,9 @@ tasks.register("copyTestNodeKeyMaterial", Copy) { for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { String baseName = "v${bwcVersion}" - String repositoryPath = "${buildDir}/cluster/shared/repo/${baseName}" + + // SearchableSnapshotsRollingUpgradeIT uses a specific repository to not interfere with other tests + String searchableSnapshotRepository = "${buildDir}/cluster/shared/searchable-snapshots-repo/${baseName}" testClusters { "${baseName}" { @@ -42,8 +44,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { numberOfNodes = 3 setting 'repositories.url.allowed_urls', 'http://snapshot.test*' - setting 'path.repo', repositoryPath - setting 'http.content_type.required', 'true' + setting 'path.repo', "[ \"${buildDir}/cluster/shared/repo/${baseName}\", \"${searchableSnapshotRepository}\" ]" setting 'xpack.license.self_generated.type', 'trial' setting 'xpack.security.enabled', 'true' setting 'xpack.security.transport.ssl.enabled', 'true' @@ -96,7 +97,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { dependsOn "copyTestNodeKeyMaterial" systemProperty 'tests.rest.suite', 'old_cluster' systemProperty 'tests.upgrade_from_version', oldVersion - systemProperty 'tests.path.repo', repositoryPath + systemProperty 'tests.path.searchable.snapshots.repo', searchableSnapshotRepository nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") def toBlackList = [] @@ -131,7 +132,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { systemProperty 'tests.rest.suite', 'mixed_cluster' systemProperty 'tests.first_round', 'true' systemProperty 'tests.upgrade_from_version', oldVersion - systemProperty 'tests.path.repo', repositoryPath + systemProperty 'tests.path.searchable.snapshots.repo', searchableSnapshotRepository // We only need to run these tests once so we may as well do it when we're two thirds upgraded def toBlackList = [ 'mixed_cluster/10_basic/Start scroll in mixed cluster on upgraded node that we will continue after upgrade', @@ -204,7 +205,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { systemProperty 'tests.rest.blacklist', toBlackList.join(',') } systemProperty 'tests.upgrade_from_version', oldVersion - systemProperty 'tests.path.repo', repositoryPath + systemProperty 'tests.path.searchable.snapshots.repo', searchableSnapshotRepository } tasks.register("${baseName}#upgradedClusterTest", StandaloneRestIntegTestTask) { @@ -217,7 +218,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") systemProperty 'tests.rest.suite', 'upgraded_cluster' systemProperty 'tests.upgrade_from_version', oldVersion - systemProperty 'tests.path.repo', repositoryPath + systemProperty 'tests.path.searchable.snapshots.repo', searchableSnapshotRepository def toBlackList = [] // Dataframe 
transforms were not added until 7.2.0 if (Version.fromString(oldVersion).before('7.2.0')) { diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SearchableSnapshotsRollingUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SearchableSnapshotsRollingUpgradeIT.java index bf5dfefc98319..9832c28ddb2ec 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SearchableSnapshotsRollingUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/SearchableSnapshotsRollingUpgradeIT.java @@ -54,18 +54,16 @@ public void testMountPartialCopyAndRecoversCorrectly() throws Exception { */ private void executeMountAndRecoversCorrectlyTestCase(Storage storage, long numberOfDocs) throws Exception { final String suffix = storage.storageName().toLowerCase(Locale.ROOT); + final String repository = "repository_" + suffix; + final String snapshot = "snapshot_" + suffix; final String index = "mounted_index_" + suffix; if (CLUSTER_TYPE.equals(ClusterType.OLD)) { - final String repository = "repository_" + suffix; - final String snapshot = "snapshot_" + suffix; - - registerRepository(repository, FsRepository.TYPE, true, - Settings.builder().put("location", System.getProperty("tests.path.repo") + '/' + repository).build()); + registerRepository(repository, FsRepository.TYPE, true, repositorySettings(repository)); final String originalIndex = "logs_" + suffix; createIndex(originalIndex, Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(3, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .build()); indexDocs(originalIndex, numberOfDocs); @@ -79,6 +77,12 @@ private void executeMountAndRecoversCorrectlyTestCase(Storage storage, long numb ensureGreen(index); assertHitCount(index, equalTo(numberOfDocs)); + + if (CLUSTER_TYPE.equals(ClusterType.UPGRADED)) { + deleteIndex(index); + deleteSnapshot(repository, snapshot); + deleteRepository(repository); + } } public void testBlobStoreCacheWithFullCopyInMixedVersions() throws Exception { @@ -104,9 +108,29 @@ private void executeBlobCacheCreationTestCase(Storage storage, long numberOfDocs final String suffix = "blob_cache_" + storage.storageName().toLowerCase(Locale.ROOT); final String repository = "repository_" + suffix; + final int numberOfSnapshots = 2; + final String[] snapshots = new String[numberOfSnapshots]; + final String[] indices = new String[numberOfSnapshots]; + for (int i = 0; i < numberOfSnapshots; i++) { + snapshots[i] = "snapshot_" + i; + indices[i] = "index_" + i; + } + if (CLUSTER_TYPE.equals(ClusterType.OLD)) { - registerRepository(repository, FsRepository.TYPE, true, - Settings.builder().put("location", System.getProperty("tests.path.repo") + '/' + repository).build()); + registerRepository(repository, FsRepository.TYPE, true, repositorySettings(repository)); + + // snapshots must be created from indices on the lowest version, otherwise we won't be able + // to mount them again in the mixed version cluster (and we'll have IndexFormatTooNewException) + for (int i = 0; i < numberOfSnapshots; i++) { + createIndex(indices[i], Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build()); + indexDocs(indices[i], numberOfDocs * (i + 1L)); + + createSnapshot(repository, snapshots[i], indices[i]); + deleteIndex(indices[i]); + } } else if 
(CLUSTER_TYPE.equals(ClusterType.MIXED)) { final int numberOfNodes = 3; @@ -131,21 +155,10 @@ private void executeBlobCacheCreationTestCase(Storage storage, long numberOfDocs // Then the snapshot is mounted again on a different node with a higher version in order to verify that the docs in the cache // index can be used. - final String firstIndex = "first_index_" + suffix; - createIndex(firstIndex, Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(3, 5)) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .build()); - indexDocs(firstIndex, numberOfDocs); - - final String firstSnapshot = "first_snapshot_" + suffix; - createSnapshot(repository, firstSnapshot, firstIndex); - deleteIndex(firstIndex); - - String index = "first_mount_" + suffix; + String index = "first_mount_" + indices[0]; logger.info("mounting snapshot as index [{}] with storage [{}] on node [{}] with min. version [{}]", index, storage, nodeIdWithMinVersion, minVersion); - mountSnapshot(repository, firstSnapshot, firstIndex, index, storage, + mountSnapshot(repository, snapshots[0], indices[0], index, storage, Settings.builder() // we want a specific node version to create docs in the blob cache index .put("index.routing.allocation.include._id", nodeIdWithMinVersion) @@ -156,10 +169,10 @@ private void executeBlobCacheCreationTestCase(Storage storage, long numberOfDocs assertHitCount(index, equalTo(numberOfDocs)); deleteIndex(index); - index = "second_mount_" + suffix; + index = "second_mount_" + indices[0]; logger.info("mounting the same snapshot of index [{}] with storage [{}], this time on node [{}] with higher version [{}]", index, storage, nodeIdWithMaxVersion, maxVersion); - mountSnapshot(repository, firstSnapshot, firstIndex, index, storage, + mountSnapshot(repository, snapshots[0], indices[0], index, storage, Settings.builder() // we want a specific node version to use the cached blobs created by the nodeIdWithMinVersion .put("index.routing.allocation.include._id", nodeIdWithMaxVersion) @@ -171,26 +184,13 @@ private void executeBlobCacheCreationTestCase(Storage storage, long numberOfDocs assertHitCount(index, equalTo(numberOfDocs)); deleteIndex(index); - deleteSnapshot(repository, firstSnapshot); - // Now the same thing but this time the docs in blob cache index are created from the upgraded version and mounted in a second // time on the node with the minimum version. - final String secondIndex = "second_index_" + suffix; - createIndex(secondIndex, Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(3, 5)) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .build()); - indexDocs(secondIndex, numberOfDocs * 2L); - - final String secondSnapshot = "second_snapshot_" + suffix; - createSnapshot(repository, secondSnapshot, secondIndex); - deleteIndex(secondIndex); - - index = "first_mount_" + suffix; + index = "first_mount_" + indices[1]; logger.info("mounting snapshot as index [{}] with storage [{}] on node [{}] with max. 
version [{}]", index, storage, nodeIdWithMaxVersion, maxVersion); - mountSnapshot(repository, secondSnapshot, secondIndex, index, storage, + mountSnapshot(repository, snapshots[1], indices[1], index, storage, Settings.builder() // we want a specific node version to create docs in the blob cache index .put("index.routing.allocation.include._id", nodeIdWithMaxVersion) @@ -201,10 +201,10 @@ private void executeBlobCacheCreationTestCase(Storage storage, long numberOfDocs assertHitCount(index, equalTo(numberOfDocs * 2L)); deleteIndex(index); - index = "second_mount_" + suffix; + index = "second_mount_" + indices[1]; logger.info("mounting the same snapshot of index [{}] with storage [{}], this time on node [{}] with lower version [{}]", index, storage, nodeIdWithMinVersion, minVersion); - mountSnapshot(repository, secondSnapshot, secondIndex, index, storage, + mountSnapshot(repository, snapshots[1], indices[1], index, storage, Settings.builder() // we want a specific node version to use the cached blobs created by the nodeIdWithMinVersion .put("index.routing.allocation.include._id", nodeIdWithMinVersion) @@ -216,9 +216,10 @@ private void executeBlobCacheCreationTestCase(Storage storage, long numberOfDocs assertHitCount(index, equalTo(numberOfDocs * 2L)); deleteIndex(index); - deleteSnapshot(repository, secondSnapshot); - } else if (CLUSTER_TYPE.equals(ClusterType.UPGRADED)) { + for (String snapshot : snapshots) { + deleteSnapshot(repository, snapshot); + } deleteRepository(repository); } } @@ -285,8 +286,11 @@ private static void mountSnapshot( Settings indexSettings ) throws IOException { final Request request = new Request(HttpPost.METHOD_NAME, "/_snapshot/" + repositoryName + '/' + snapshotName + "/_mount"); - request.addParameter("storage", storage.storageName()); - request.addParameter("wait_for_completion", "true"); + if (UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_12_0)) { + request.addParameter("storage", storage.storageName()); + } else { + assertThat("Parameter 'storage' was introduced in 7.12.0 with " + Storage.SHARED_CACHE, storage, equalTo(Storage.FULL_COPY)); + } request.setJsonEntity("{" + " \"index\": \"" + indexName + "\"," + " \"renamed_index\": \"" + renamedIndex + "\"," + @@ -309,4 +313,12 @@ private static void assertHitCount(String indexName, Matcher countMatcher) assertThat(((Number) extractValue("count", responseAsMap)).longValue(), countMatcher); assertThat(((Number) extractValue("_shards.failed", responseAsMap)).intValue(), equalTo(0)); } + + private static Settings repositorySettings(String repository) { + final String pathRepo = System.getProperty("tests.path.searchable.snapshots.repo"); + assertThat("Searchable snapshots repository path is null", pathRepo, notNullValue()); + return Settings.builder() + .put("location", pathRepo + '/' + repository) + .build(); + } }