From 3aa7e469f9bc8e24a3bf2c27f094dc7a47bdbdd0 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 14 Sep 2021 11:11:01 +0200 Subject: [PATCH 1/8] Add maintenance service to clean up unused docs in snapshot blob cache --- .../plugin/searchable-snapshots/build.gradle | 1 + ...ableSnapshotsBlobStoreCacheIntegTests.java | 79 ++++-- .../SearchableSnapshots.java | 2 + .../BlobStoreCacheMaintenanceService.java | 229 ++++++++++++++++++ .../store/SearchableSnapshotDirectory.java | 5 + 5 files changed, 290 insertions(+), 26 deletions(-) create mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java diff --git a/x-pack/plugin/searchable-snapshots/build.gradle b/x-pack/plugin/searchable-snapshots/build.gradle index 4504535e4a121..17066a5a996aa 100644 --- a/x-pack/plugin/searchable-snapshots/build.gradle +++ b/x-pack/plugin/searchable-snapshots/build.gradle @@ -14,6 +14,7 @@ dependencies { compileOnly project(path: xpackModule('core')) implementation project(path: 'preallocate') internalClusterTestImplementation(testArtifact(project(xpackModule('core')))) + internalClusterTestImplementation(project(path: ':modules:reindex')) } addQaCheckDependencies() diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java index 074a6f759385d..88e8c6f8a08dc 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.searchablesnapshots.cache.blob; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -26,11 +27,14 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexingStats; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.snapshots.SearchableSnapshotsSettings; @@ -46,6 +50,7 @@ import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsRequest; import org.elasticsearch.xpack.searchablesnapshots.cache.full.CacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.shared.FrozenCacheService; +import org.elasticsearch.xpack.searchablesnapshots.store.SearchableSnapshotDirectory; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -59,6 +64,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_BLOB_CACHE_INDEX; import static org.elasticsearch.xpack.searchablesnapshots.cache.shared.SharedBytes.pageAligned; +import static org.elasticsearch.xpack.searchablesnapshots.store.SearchableSnapshotDirectory.unwrapDirectory; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -93,7 +99,10 @@ public static void tearDownCacheSettings() { @Override protected Collection> nodePlugins() { - return CollectionUtils.appendToCopy(super.nodePlugins(), WaitForSnapshotBlobCacheShardsActivePlugin.class); + final List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(WaitForSnapshotBlobCacheShardsActivePlugin.class); + plugins.add(ReindexPlugin.class); + return plugins; } @Override @@ -158,7 +167,7 @@ public void testBlobStoreCache() throws Exception { storage1, blobCacheMaxLength.getStringRep() ); - final String restoredIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final String restoredIndex = "restored-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); mountSnapshot( repositoryName, snapshot.getName(), @@ -173,17 +182,9 @@ public void testBlobStoreCache() throws Exception { ); ensureGreen(restoredIndex); - // wait for all async cache fills to complete - assertBusy(() -> { - for (final SearchableSnapshotShardStats shardStats : client().execute( - SearchableSnapshotsStatsAction.INSTANCE, - new SearchableSnapshotsStatsRequest() - ).actionGet().getStats()) { - for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { - assertThat(Strings.toString(indexInputStats), indexInputStats.getCurrentIndexCacheFills(), equalTo(0L)); - } - } - }); + assertRecoveryStats(restoredIndex, false); + assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); + waitForBlobCacheFillsToComplete(); for (final SearchableSnapshotShardStats shardStats : client().execute( SearchableSnapshotsStatsAction.INSTANCE, @@ -197,7 +198,6 @@ public void testBlobStoreCache() throws Exception { logger.info("--> verifying cached documents in system index [{}]", SNAPSHOT_BLOB_CACHE_INDEX); if (numberOfDocs > 0) { ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX); - refreshSystemIndex(); logger.info("--> verifying system index [{}] data tiers preference", SNAPSHOT_BLOB_CACHE_INDEX); assertThat( @@ -210,11 +210,14 @@ public void testBlobStoreCache() throws Exception { ); } + refreshSystemIndex(); + final long numberOfCachedBlobs = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) .get() .getHits() .getTotalHits().value; + IndexingStats indexingStats = systemClient().admin() .indices() .prepareStats(SNAPSHOT_BLOB_CACHE_INDEX) @@ -228,19 +231,25 @@ public void testBlobStoreCache() throws Exception { logger.info("--> verifying number of documents in index [{}]", restoredIndex); assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); - assertAcked(client().admin().indices().prepareDelete(restoredIndex)); - assertBusy(() -> { - refreshSystemIndex(); - assertThat( - systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get().getHits().getTotalHits().value, - greaterThan(0L) - ); - }); + for (IndicesService indicesService : internalCluster().getDataNodeInstances(IndicesService.class)) { + for (IndexService indexService : indicesService) { + if (indexService.index().getName().equals(restoredIndex)) { + for (IndexShard indexShard : indexService) { + try { + final SearchableSnapshotDirectory directory = unwrapDirectory(indexShard.store().directory()); + directory.clearStats(); + } catch (AlreadyClosedException ace) { + // ok to ignore these + } + } + } + } + } final Storage storage2 = randomFrom(Storage.values()); logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage2); - final String restoredAgainIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final String restoredAgainIndex = "restored-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); mountSnapshot( repositoryName, snapshot.getName(), @@ -255,6 +264,10 @@ public void testBlobStoreCache() throws Exception { ); ensureGreen(restoredAgainIndex); + assertRecoveryStats(restoredAgainIndex, false); + assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); + waitForBlobCacheFillsToComplete(); + logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex); checkNoBlobStoreAccess(); @@ -292,6 +305,10 @@ public Settings onNodeStopped(String nodeName) throws Exception { }); ensureGreen(restoredAgainIndex); + assertRecoveryStats(restoredAgainIndex, false); + assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); + waitForBlobCacheFillsToComplete(); + logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex); checkNoBlobStoreAccess(); @@ -314,8 +331,18 @@ public Settings onNodeStopped(String nodeName) throws Exception { logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex); assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); - // TODO also test when the index is frozen - // TODO also test when prewarming is enabled + logger.info("--> deleting indices, maintenance service should clean up snapshot blob cache index"); + assertAcked(client().admin().indices().prepareDelete("restored-*")); + assertBusy(() -> { + refreshSystemIndex(); + assertHitCount( + systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .setSize(0) + .get(), + 0L + ); + }); } private void checkNoBlobStoreAccess() { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 3dc13a5ca4f66..56e6940c26b29 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -100,6 +100,7 @@ import org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotAllocationDecider; import org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotEnableAllocationDecider; import org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotRepositoryExistsAllocationDecider; +import org.elasticsearch.xpack.searchablesnapshots.cache.blob.BlobStoreCacheMaintenanceService; import org.elasticsearch.xpack.searchablesnapshots.cache.blob.BlobStoreCacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.full.CacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.full.PersistentCache; @@ -347,6 +348,7 @@ public Collection createComponents( threadPool::absoluteTimeInMillis ); this.blobStoreCacheService.set(blobStoreCacheService); + clusterService.addListener(new BlobStoreCacheMaintenanceService(clusterService, threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX)); components.add(blobStoreCacheService); } else { PersistentCache.cleanUp(settings, nodeEnvironment); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java new file mode 100644 index 0000000000000..bf63bbcb03edb --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -0,0 +1,229 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.searchablesnapshots.cache.blob; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryAction; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SearchableSnapshotsSettings; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.function.Supplier; + +import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; +import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; + +/** + * A service that delete documents in the snapshot blob cache index when they are not required anymore. + * + * This service runs on the data node that contains the snapshot blob cache primary shard. It listens to cluster state updates to find + * searchable snapshot indices that are deleted and checks if the index snapshot is still used by other searchable snapshot indices. If the + * index snapshot is not used anymore then i triggers the deletion of corresponding cached blobs in the snapshot blob cache index using a + * delete-by-query. + */ +public class BlobStoreCacheMaintenanceService implements ClusterStateListener { + + private static final Logger logger = LogManager.getLogger(BlobStoreCacheMaintenanceService.class); + + private final ClusterService clusterService; + private final Client clientWithOrigin; + private final String systemIndexName; + private final ThreadPool threadPool; + + public BlobStoreCacheMaintenanceService(ClusterService clusterService, ThreadPool threadPool, Client client, String systemIndexName) { + this.clientWithOrigin = new OriginSettingClient(Objects.requireNonNull(client), SEARCHABLE_SNAPSHOTS_ORIGIN); + this.systemIndexName = Objects.requireNonNull(systemIndexName); + this.clusterService = Objects.requireNonNull(clusterService); + this.threadPool = Objects.requireNonNull(threadPool); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + final ClusterState state = event.state(); + if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) { + return; // state not fully recovered + } + if (event.indicesDeleted() == null || event.indicesDeleted().isEmpty()) { + return; // no indices deleted in this cluster state update + } + final ShardRouting primary = systemIndexPrimaryShard(state); + if (primary == null || Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId()) == false) { + return; // system index primary shard does not exist or is not assigned to this data node + } + + final Set tasks = new HashSet<>(); + + for (Index deletedIndex : event.indicesDeleted()) { + final IndexMetadata indexMetadata = event.previousState().metadata().index(deletedIndex); + if (indexMetadata != null) { + final Settings indexSetting = indexMetadata.getSettings(); + if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSetting)) { + assert state.metadata().hasIndex(deletedIndex) == false; + + final SnapshotId snapshotId = new SnapshotId( + SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSetting), + SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSetting) + ); + final IndexId indexId = new IndexId( + SNAPSHOT_INDEX_NAME_SETTING.get(indexSetting), + SNAPSHOT_INDEX_ID_SETTING.get(indexSetting) + ); + + // we should do nothing if the current cluster state contains another + // searchable snapshot index that uses the same index snapshot + if (hasSearchableSnapshotWith(state, snapshotId, indexId) == false) { + tasks.add(new MaintenanceTask(snapshotId, indexId, indexMetadata.getNumberOfShards(), clusterService::state)); + } + } + } + } + tasks.forEach(maintenanceTask -> threadPool.generic().execute(maintenanceTask)); + } + + @Nullable + private ShardRouting systemIndexPrimaryShard(final ClusterState state) { + final IndexMetadata indexMetadata = state.metadata().index(systemIndexName); + if (indexMetadata != null) { + final IndexRoutingTable indexRoutingTable = state.routingTable().index(indexMetadata.getIndex()); + if (indexRoutingTable != null) { + return indexRoutingTable.shard(0).primaryShard(); + } + } + return null; + } + + private static boolean hasSearchableSnapshotWith(final ClusterState state, final SnapshotId snapshotId, final IndexId indexId) { + for (IndexMetadata indexMetadata : state.metadata()) { + final Settings indexSettings = indexMetadata.getSettings(); + if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSettings)) { + final SnapshotId otherSnapshotId = new SnapshotId( + SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings), + SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings) + ); + if (Objects.equals(snapshotId, otherSnapshotId)) { + final IndexId otherIndexId = new IndexId( + SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings), + SNAPSHOT_INDEX_ID_SETTING.get(indexSettings) + ); + if (Objects.equals(indexId, otherIndexId)) { + return true; + } + } + } + } + return false; + } + + private class MaintenanceTask extends AbstractRunnable { + + private final SnapshotId snapshotId; + private final IndexId indexId; + private final int numberOfShards; + private final Supplier state; + + MaintenanceTask(SnapshotId snapshotId, IndexId indexId, int numberOfShards, Supplier clusterStateSupplier) { + this.snapshotId = Objects.requireNonNull(snapshotId); + this.indexId = Objects.requireNonNull(indexId); + this.numberOfShards = numberOfShards; + this.state = clusterStateSupplier; + } + + @Override + protected void doRun() { + if (hasSearchableSnapshotWith(state.get(), snapshotId, indexId)) { + logger.debug( + "snapshot blob cache maintenance task skipped, another index is using [snapshot:{}, index:{}]]", + snapshotId, + indexId + ); + return; + } + + final Set paths = new HashSet<>(numberOfShards); + for (int shard = 0; shard < numberOfShards; shard++) { + paths.add(String.join("/", snapshotId.getUUID(), indexId.getId(), String.valueOf(shard))); + } + + final DeleteByQueryRequest request = new DeleteByQueryRequest(systemIndexName); + request.setQuery(QueryBuilders.termsQuery("blob.path", paths)); + clientWithOrigin.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() { + @Override + public void onResponse(BulkByScrollResponse response) { + logger.debug( + "snapshot blob cache maintenance task deleted [{}] documents for [snapshot:{}, index:{}]]", + response.getDeleted(), + snapshotId, + indexId + ); + } + + @Override + public void onFailure(Exception e) { + logger.warn( + () -> new ParameterizedMessage( + "exception when executing snapshot blob cache maintenance task for [snapshot:{}, index:{}]]", + snapshotId, + indexId + ), + e + ); + } + }); + } + + @Override + public void onFailure(Exception e) { + logger.warn( + () -> new ParameterizedMessage("snapshot blob cache maintenance task [snapshot:{}, index:{}]] failed", snapshotId, indexId), + e + ); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MaintenanceTask that = (MaintenanceTask) o; + return numberOfShards == that.numberOfShards + && Objects.equals(snapshotId, that.snapshotId) + && Objects.equals(indexId, that.indexId); + } + + @Override + public int hashCode() { + return Objects.hash(snapshotId, indexId, numberOfShards); + } + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java index d3f2d0ff8da3c..f5b0cd13852e8 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java @@ -283,6 +283,11 @@ IndexInputStats getStats(String fileName) { return stats.get(getNonNullFileExt(fileName)); } + // only used in tests + public void clearStats() { + stats.clear(); + } + private BlobStoreIndexShardSnapshot.FileInfo fileInfo(final String name) throws FileNotFoundException { return files().stream() .filter(fileInfo -> fileInfo.physicalName().equals(name)) From 1189dd3d8f98b0e118643e73b9e73b654a88c812 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 21 Sep 2021 20:47:52 +0200 Subject: [PATCH 2/8] feedback --- ...tsBlobStoreCacheMaintenanceIntegTests.java | 185 ++++++++++++++++++ .../SearchableSnapshots.java | 2 +- .../BlobStoreCacheMaintenanceService.java | 170 +++++++--------- 3 files changed, 258 insertions(+), 99 deletions(-) create mode 100644 x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java new file mode 100644 index 0000000000000..0e79082f06c5b --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java @@ -0,0 +1,185 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.searchablesnapshots.cache.blob; + +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage; +import org.elasticsearch.xpack.searchablesnapshots.BaseFrozenSearchableSnapshotsIntegTestCase; +import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_BLOB_CACHE_INDEX; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +public class SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests extends BaseFrozenSearchableSnapshotsIntegTestCase { + + @Override + protected Collection> nodePlugins() { + final List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(ReindexPlugin.class); + return plugins; + } + + /** + * Test that snapshot blob cache entries are deleted from the system index after the corresponding searchable snapshot index is deleted + */ + public void testMaintenance() throws Exception { + final String repositoryName = "repository"; + createRepository(repositoryName, FsRepository.TYPE); + + final int nbIndices = randomIntBetween(1, 3); + + logger.info("--> generating [{}] indices with cached entries in system index...", nbIndices); + final Map mountedIndices = new HashMap<>(); + + int i = 0; + long previousNumberOfCachedEntries = 0; + while (mountedIndices.size() < nbIndices) { + final String indexName = "index-" + i; + createIndex(indexName); + + final List indexRequestBuilders = new ArrayList<>(); + for (int n = 100; n > 0; n--) { + indexRequestBuilders.add( + client().prepareIndex(indexName) + .setSource( + XContentFactory.smileBuilder() + .startObject() + .field("text", randomRealisticUnicodeOfCodepointLength(10)) + .endObject() + ) + ); + } + indexRandom(true, indexRequestBuilders); + + createSnapshot(repositoryName, "snapshot-" + i, List.of(indexName)); + assertAcked(client().admin().indices().prepareDelete(indexName)); + + final String mountedIndex = "mounted-index-" + i; + mountSnapshot(repositoryName, "snapshot-" + i, "index-" + i, mountedIndex, Settings.EMPTY, randomFrom(Storage.values())); + + ensureGreen(mountedIndex); + assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); + assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME); + waitForBlobCacheFillsToComplete(); + + refreshSystemIndex(false); + + final long numberOfEntriesInCache = numberOfEntriesInCache(); + if (numberOfEntriesInCache > previousNumberOfCachedEntries) { + final long nbEntries = numberOfEntriesInCache - previousNumberOfCachedEntries; + logger.info("--> mounted index [{}] has [{}] entries in cache", mountedIndex, nbEntries); + mountedIndices.put(mountedIndex, nbEntries); + + } else { + logger.info("--> mounted index [{}] did not generate any entry in cache, skipping", mountedIndex); + assertAcked(client().admin().indices().prepareDelete(mountedIndex)); + } + + previousNumberOfCachedEntries = numberOfEntriesInCache; + i += 1; + } + + ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX); + refreshSystemIndex(true); + + final GetSettingsResponse mountedIndicesSettings = client().admin().indices().prepareGetSettings("mounted-*").get(); + + final long numberOfEntriesInCache = numberOfEntriesInCache(); + logger.info("--> found [{}] entries in snapshot blob cache", numberOfEntriesInCache); + assertThat(numberOfEntriesInCache, equalTo(mountedIndices.values().stream().mapToLong(l -> l).sum())); + + final List indicesToDelete = randomSubsetOf(randomIntBetween(1, mountedIndices.size()), mountedIndices.keySet()); + assertAcked(client().admin().indices().prepareDelete(indicesToDelete.toArray(String[]::new))); + + final long expectedDeletedEntriesInCache = mountedIndices.entrySet() + .stream() + .filter(e -> indicesToDelete.contains(e.getKey())) + .mapToLong(Map.Entry::getValue) + .sum(); + logger.info("--> expect [{}] deletions in snapshot blob cache", expectedDeletedEntriesInCache); + + assertBusy(() -> { + refreshSystemIndex(true); + assertThat(numberOfEntriesInCache(), equalTo(numberOfEntriesInCache - expectedDeletedEntriesInCache)); + + for (String mountedIndex : mountedIndices.keySet()) { + final Settings indexSettings = mountedIndicesSettings.getIndexToSettings().get(mountedIndex); + assertHitCount( + systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) + .setQuery( + BlobStoreCacheMaintenanceService.buildDeleteByQuery( + INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettings), + SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings), + SNAPSHOT_INDEX_ID_SETTING.get(indexSettings) + ) + ) + .setSize(0) + .get(), + indicesToDelete.contains(mountedIndex) ? 0L : mountedIndices.get(mountedIndex) + ); + } + }); + } + + /** + * @return a {@link Client} that can be used to query the blob store cache system index + */ + private Client systemClient() { + return new OriginSettingClient(client(), ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN); + } + + private long numberOfEntriesInCache() { + return systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .setTrackTotalHits(true) + .setSize(0) + .get() + .getHits() + .getTotalHits().value; + } + + private void refreshSystemIndex(boolean failIfNotExist) { + try { + final RefreshResponse refreshResponse = systemClient().admin() + .indices() + .prepareRefresh(SNAPSHOT_BLOB_CACHE_INDEX) + .setIndicesOptions(failIfNotExist ? RefreshRequest.DEFAULT_INDICES_OPTIONS : IndicesOptions.LENIENT_EXPAND_OPEN) + .get(); + assertThat(refreshResponse.getSuccessfulShards(), failIfNotExist ? greaterThan(0) : greaterThanOrEqualTo(0)); + assertThat(refreshResponse.getFailedShards(), equalTo(0)); + } catch (IndexNotFoundException indexNotFoundException) { + throw new AssertionError("unexpected", indexNotFoundException); + } + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index a9e829403b6e4..427c4879bcb76 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -348,7 +348,7 @@ public Collection createComponents( threadPool::absoluteTimeInMillis ); this.blobStoreCacheService.set(blobStoreCacheService); - clusterService.addListener(new BlobStoreCacheMaintenanceService(clusterService, threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX)); + clusterService.addListener(new BlobStoreCacheMaintenanceService(threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX)); components.add(blobStoreCacheService); } else { PersistentCache.cleanUp(settings, nodeEnvironment); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index bf63bbcb03edb..6da51d827a2b5 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -19,11 +19,11 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.Index; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; @@ -36,7 +36,6 @@ import java.util.HashSet; import java.util.Objects; import java.util.Set; -import java.util.function.Supplier; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN; @@ -57,15 +56,13 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(BlobStoreCacheMaintenanceService.class); - private final ClusterService clusterService; private final Client clientWithOrigin; private final String systemIndexName; private final ThreadPool threadPool; - public BlobStoreCacheMaintenanceService(ClusterService clusterService, ThreadPool threadPool, Client client, String systemIndexName) { + public BlobStoreCacheMaintenanceService(ThreadPool threadPool, Client client, String systemIndexName) { this.clientWithOrigin = new OriginSettingClient(Objects.requireNonNull(client), SEARCHABLE_SNAPSHOTS_ORIGIN); this.systemIndexName = Objects.requireNonNull(systemIndexName); - this.clusterService = Objects.requireNonNull(clusterService); this.threadPool = Objects.requireNonNull(threadPool); } @@ -75,41 +72,13 @@ public void clusterChanged(ClusterChangedEvent event) { if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) { return; // state not fully recovered } - if (event.indicesDeleted() == null || event.indicesDeleted().isEmpty()) { - return; // no indices deleted in this cluster state update - } final ShardRouting primary = systemIndexPrimaryShard(state); if (primary == null || Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId()) == false) { return; // system index primary shard does not exist or is not assigned to this data node } - - final Set tasks = new HashSet<>(); - - for (Index deletedIndex : event.indicesDeleted()) { - final IndexMetadata indexMetadata = event.previousState().metadata().index(deletedIndex); - if (indexMetadata != null) { - final Settings indexSetting = indexMetadata.getSettings(); - if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSetting)) { - assert state.metadata().hasIndex(deletedIndex) == false; - - final SnapshotId snapshotId = new SnapshotId( - SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSetting), - SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSetting) - ); - final IndexId indexId = new IndexId( - SNAPSHOT_INDEX_NAME_SETTING.get(indexSetting), - SNAPSHOT_INDEX_ID_SETTING.get(indexSetting) - ); - - // we should do nothing if the current cluster state contains another - // searchable snapshot index that uses the same index snapshot - if (hasSearchableSnapshotWith(state, snapshotId, indexId) == false) { - tasks.add(new MaintenanceTask(snapshotId, indexId, indexMetadata.getNumberOfShards(), clusterService::state)); - } - } - } + if (event.indicesDeleted().isEmpty() == false) { + threadPool.generic().execute(new MaintenanceTask(event)); } - tasks.forEach(maintenanceTask -> threadPool.generic().execute(maintenanceTask)); } @Nullable @@ -146,84 +115,89 @@ private static boolean hasSearchableSnapshotWith(final ClusterState state, final return false; } + static QueryBuilder buildDeleteByQuery(int numberOfShards, String snapshotUuid, String indexUuid) { + final Set paths = new HashSet<>(numberOfShards); + for (int shard = 0; shard < numberOfShards; shard++) { + paths.add(String.join("/", snapshotUuid, indexUuid, String.valueOf(shard))); + } + return QueryBuilders.termsQuery("blob.path", paths); + } + private class MaintenanceTask extends AbstractRunnable { - private final SnapshotId snapshotId; - private final IndexId indexId; - private final int numberOfShards; - private final Supplier state; + private final ClusterChangedEvent event; - MaintenanceTask(SnapshotId snapshotId, IndexId indexId, int numberOfShards, Supplier clusterStateSupplier) { - this.snapshotId = Objects.requireNonNull(snapshotId); - this.indexId = Objects.requireNonNull(indexId); - this.numberOfShards = numberOfShards; - this.state = clusterStateSupplier; + MaintenanceTask(ClusterChangedEvent event) { + assert event.indicesDeleted().isEmpty() == false; + this.event = Objects.requireNonNull(event); } @Override protected void doRun() { - if (hasSearchableSnapshotWith(state.get(), snapshotId, indexId)) { - logger.debug( - "snapshot blob cache maintenance task skipped, another index is using [snapshot:{}, index:{}]]", - snapshotId, - indexId - ); - return; - } - - final Set paths = new HashSet<>(numberOfShards); - for (int shard = 0; shard < numberOfShards; shard++) { - paths.add(String.join("/", snapshotId.getUUID(), indexId.getId(), String.valueOf(shard))); - } - - final DeleteByQueryRequest request = new DeleteByQueryRequest(systemIndexName); - request.setQuery(QueryBuilders.termsQuery("blob.path", paths)); - clientWithOrigin.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() { - @Override - public void onResponse(BulkByScrollResponse response) { - logger.debug( - "snapshot blob cache maintenance task deleted [{}] documents for [snapshot:{}, index:{}]]", - response.getDeleted(), - snapshotId, - indexId - ); - } - - @Override - public void onFailure(Exception e) { - logger.warn( - () -> new ParameterizedMessage( - "exception when executing snapshot blob cache maintenance task for [snapshot:{}, index:{}]]", - snapshotId, - indexId - ), - e - ); + final ClusterState state = event.state(); + for (Index deletedIndex : event.indicesDeleted()) { + final IndexMetadata indexMetadata = event.previousState().metadata().index(deletedIndex); + if (indexMetadata != null) { + final Settings indexSetting = indexMetadata.getSettings(); + if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSetting)) { + assert state.metadata().hasIndex(deletedIndex) == false; + + final SnapshotId snapshotId = new SnapshotId( + SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSetting), + SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSetting) + ); + final IndexId indexId = new IndexId( + SNAPSHOT_INDEX_NAME_SETTING.get(indexSetting), + SNAPSHOT_INDEX_ID_SETTING.get(indexSetting) + ); + + // we should do nothing if the current cluster state contains another + // searchable snapshot index that uses the same index snapshot + if (hasSearchableSnapshotWith(state, snapshotId, indexId)) { + logger.debug( + "snapshot [{}] of index {} is in use, skipping maintenance of snapshot blob cache entries", + snapshotId, + indexId + ); + continue; + } + + final DeleteByQueryRequest request = new DeleteByQueryRequest(systemIndexName); + request.setQuery(buildDeleteByQuery(indexMetadata.getNumberOfShards(), snapshotId.getUUID(), indexId.getId())); + clientWithOrigin.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() { + @Override + public void onResponse(BulkByScrollResponse response) { + logger.debug( + "snapshot blob cache maintenance task deleted [{}] documents for snapshot [{}] of index {}", + response.getDeleted(), + snapshotId, + indexId + ); + } + + @Override + public void onFailure(Exception e) { + logger.debug( + () -> new ParameterizedMessage( + "exception when executing snapshot blob cache maintenance task for snapshot [{}] of index {}", + snapshotId, + indexId + ), + e + ); + } + }); + } } - }); + } } @Override public void onFailure(Exception e) { logger.warn( - () -> new ParameterizedMessage("snapshot blob cache maintenance task [snapshot:{}, index:{}]] failed", snapshotId, indexId), + () -> new ParameterizedMessage("snapshot blob cache maintenance task failed for cluster state update [{}]", event.source()), e ); } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - MaintenanceTask that = (MaintenanceTask) o; - return numberOfShards == that.numberOfShards - && Objects.equals(snapshotId, that.snapshotId) - && Objects.equals(indexId, that.indexId); - } - - @Override - public int hashCode() { - return Objects.hash(snapshotId, indexId, numberOfShards); - } } } From 95d4003e823753730a409014d86648ce98409e27 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 22 Sep 2021 10:53:26 +0200 Subject: [PATCH 3/8] improve test --- ...tsBlobStoreCacheMaintenanceIntegTests.java | 97 +++++++++++++++++-- 1 file changed, 91 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java index 0e79082f06c5b..e443334b6b849 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; -import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; @@ -28,15 +27,19 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_BLOB_CACHE_INDEX; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -57,10 +60,11 @@ public void testMaintenance() throws Exception { final String repositoryName = "repository"; createRepository(repositoryName, FsRepository.TYPE); - final int nbIndices = randomIntBetween(1, 3); + final int nbIndices = randomIntBetween(3, 10); logger.info("--> generating [{}] indices with cached entries in system index...", nbIndices); final Map mountedIndices = new HashMap<>(); + final Map mountedIndicesSettings = new HashMap<>(); int i = 0; long previousNumberOfCachedEntries = 0; @@ -100,6 +104,7 @@ public void testMaintenance() throws Exception { final long nbEntries = numberOfEntriesInCache - previousNumberOfCachedEntries; logger.info("--> mounted index [{}] has [{}] entries in cache", mountedIndex, nbEntries); mountedIndices.put(mountedIndex, nbEntries); + mountedIndicesSettings.put(mountedIndex, getIndexSettings(mountedIndex)); } else { logger.info("--> mounted index [{}] did not generate any entry in cache, skipping", mountedIndex); @@ -113,8 +118,6 @@ public void testMaintenance() throws Exception { ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX); refreshSystemIndex(true); - final GetSettingsResponse mountedIndicesSettings = client().admin().indices().prepareGetSettings("mounted-*").get(); - final long numberOfEntriesInCache = numberOfEntriesInCache(); logger.info("--> found [{}] entries in snapshot blob cache", numberOfEntriesInCache); assertThat(numberOfEntriesInCache, equalTo(mountedIndices.values().stream().mapToLong(l -> l).sum())); @@ -127,14 +130,14 @@ public void testMaintenance() throws Exception { .filter(e -> indicesToDelete.contains(e.getKey())) .mapToLong(Map.Entry::getValue) .sum(); - logger.info("--> expect [{}] deletions in snapshot blob cache", expectedDeletedEntriesInCache); + logger.info("--> deleting indices [{}] with [{}] entries in snapshot blob cache", indicesToDelete, expectedDeletedEntriesInCache); assertBusy(() -> { refreshSystemIndex(true); assertThat(numberOfEntriesInCache(), equalTo(numberOfEntriesInCache - expectedDeletedEntriesInCache)); for (String mountedIndex : mountedIndices.keySet()) { - final Settings indexSettings = mountedIndicesSettings.getIndexToSettings().get(mountedIndex); + final Settings indexSettings = mountedIndicesSettings.get(mountedIndex); assertHitCount( systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) .setQuery( @@ -150,6 +153,84 @@ public void testMaintenance() throws Exception { ); } }); + + final Set remainingIndices = new HashSet<>(mountedIndices.keySet()); + indicesToDelete.forEach(remainingIndices::remove); + + if (remainingIndices.isEmpty() == false) { + final List moreIndicesToDelete = randomSubsetOf(randomIntBetween(1, remainingIndices.size()), remainingIndices); + + final String randomMountedIndex = randomFrom(moreIndicesToDelete); + final Settings randomIndexSettings = getIndexSettings(randomMountedIndex); + final String snapshotId = SNAPSHOT_SNAPSHOT_ID_SETTING.get(randomIndexSettings); + final String snapshotName = SNAPSHOT_SNAPSHOT_NAME_SETTING.get(randomIndexSettings); + final String snapshotIndexName = SNAPSHOT_INDEX_NAME_SETTING.get(randomIndexSettings); + + final String remainingMountedIndex = "mounted-remaining-index"; + mountSnapshot( + repositoryName, + snapshotName, + snapshotIndexName, + remainingMountedIndex, + Settings.EMPTY, + randomFrom(Storage.values()) + ); + + ensureGreen(remainingMountedIndex); + mountedIndicesSettings.put(remainingMountedIndex, getIndexSettings(remainingMountedIndex)); + + assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); + assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME); + waitForBlobCacheFillsToComplete(); + + logger.info( + "--> deleting more mounted indices [{}] with snapshot [{}/{}] of index [{}] is still mounted as index [{}]", + moreIndicesToDelete, + snapshotId, + snapshotIndexName, + snapshotIndexName, + remainingMountedIndex + ); + assertAcked(client().admin().indices().prepareDelete(moreIndicesToDelete.toArray(String[]::new))); + + assertBusy(() -> { + refreshSystemIndex(true); + + for (String mountedIndex : mountedIndices.keySet()) { + final Settings indexSettings = mountedIndicesSettings.get(mountedIndex); + + final long remainingEntriesInCache = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) + .setQuery( + BlobStoreCacheMaintenanceService.buildDeleteByQuery( + INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettings), + SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings), + SNAPSHOT_INDEX_ID_SETTING.get(indexSettings) + ) + ) + .setSize(0) + .get() + .getHits() + .getTotalHits().value; + + if (indicesToDelete.contains(mountedIndex)) { + assertThat(remainingEntriesInCache, equalTo(0L)); + } else if (snapshotId.equals(SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings))) { + assertThat(remainingEntriesInCache, greaterThanOrEqualTo(mountedIndices.get(randomMountedIndex))); + } else if (moreIndicesToDelete.contains(mountedIndex)) { + assertThat(remainingEntriesInCache, equalTo(0L)); + } else { + assertThat(remainingEntriesInCache, equalTo(mountedIndices.get(mountedIndex))); + } + } + }); + } + + logger.info("--> deleting indices, maintenance service should clean up snapshot blob cache index"); + assertAcked(client().admin().indices().prepareDelete("mounted-*")); + assertBusy(() -> { + refreshSystemIndex(true); + assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), 0L); + }); } /** @@ -182,4 +263,8 @@ private void refreshSystemIndex(boolean failIfNotExist) { throw new AssertionError("unexpected", indexNotFoundException); } } + + private Settings getIndexSettings(String indexName) { + return client().admin().indices().prepareGetSettings(indexName).get().getIndexToSettings().get(indexName); + } } From 618499f3b1fb0f7a986753fdd14ea4928cd6a93c Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 23 Sep 2021 13:46:17 +0200 Subject: [PATCH 4/8] feedback --- ...ableSnapshotsBlobStoreCacheIntegTests.java | 2 +- ...tsBlobStoreCacheMaintenanceIntegTests.java | 2 +- .../BlobStoreCacheMaintenanceService.java | 43 +++++++++++++++++-- 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java index ce21a388edd24..8081a8ac1d325 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -31,12 +31,12 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.reindex.ReindexPlugin; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.snapshots.SearchableSnapshotsSettings; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.InternalTestCluster; diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java index e443334b6b849..d12130e5a6d68 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java @@ -16,8 +16,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.reindex.ReindexPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index 6da51d827a2b5..58815bea7d0b1 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -34,7 +35,9 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.HashSet; +import java.util.LinkedList; import java.util.Objects; +import java.util.Queue; import java.util.Set; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -134,9 +137,12 @@ private class MaintenanceTask extends AbstractRunnable { @Override protected void doRun() { + final Queue>> queue = new LinkedList<>(); final ClusterState state = event.state(); + for (Index deletedIndex : event.indicesDeleted()) { final IndexMetadata indexMetadata = event.previousState().metadata().index(deletedIndex); + assert indexMetadata != null : "no previous metadata found for " + deletedIndex; if (indexMetadata != null) { final Settings indexSetting = indexMetadata.getSettings(); if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSetting)) { @@ -164,12 +170,15 @@ protected void doRun() { final DeleteByQueryRequest request = new DeleteByQueryRequest(systemIndexName); request.setQuery(buildDeleteByQuery(indexMetadata.getNumberOfShards(), snapshotId.getUUID(), indexId.getId())); - clientWithOrigin.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() { + request.setRefresh(queue.isEmpty()); + + queue.add(Tuple.tuple(request, new ActionListener<>() { @Override public void onResponse(BulkByScrollResponse response) { logger.debug( - "snapshot blob cache maintenance task deleted [{}] documents for snapshot [{}] of index {}", + "blob cache maintenance task deleted [{}] entries after deletion of {} (snapshot:{}, index:{})", response.getDeleted(), + deletedIndex, snapshotId, indexId ); @@ -179,17 +188,43 @@ public void onResponse(BulkByScrollResponse response) { public void onFailure(Exception e) { logger.debug( () -> new ParameterizedMessage( - "exception when executing snapshot blob cache maintenance task for snapshot [{}] of index {}", + "exception when executing blob cache maintenance task after deletion of {} (snapshot:{}, index:{})", + deletedIndex, snapshotId, indexId ), e ); } - }); + })); } } } + + if (queue.isEmpty() == false) { + executeNextCleanUp(queue); + } + } + + void executeNextCleanUp(final Queue>> queue) { + assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC); + final Tuple> next = queue.poll(); + if (next != null) { + cleanUp(next.v1(), next.v2(), queue); + } + } + + void cleanUp( + final DeleteByQueryRequest request, + final ActionListener listener, + final Queue>> queue + ) { + assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC); + clientWithOrigin.execute(DeleteByQueryAction.INSTANCE, request, ActionListener.runAfter(listener, () -> { + if (queue.isEmpty() == false) { + threadPool.generic().execute(() -> executeNextCleanUp(queue)); + } + })); } @Override From dc218de95e4b297de78aee6053b327fca2c9f472 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 23 Sep 2021 14:36:32 +0200 Subject: [PATCH 5/8] remove var + ignore --- .../blob/SearchableSnapshotsBlobStoreCacheIntegTests.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java index 8081a8ac1d325..53e05525530d2 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -50,7 +50,6 @@ import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsRequest; import org.elasticsearch.xpack.searchablesnapshots.cache.full.CacheService; import org.elasticsearch.xpack.searchablesnapshots.cache.shared.FrozenCacheService; -import org.elasticsearch.xpack.searchablesnapshots.store.SearchableSnapshotDirectory; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -237,9 +236,8 @@ public void testBlobStoreCache() throws Exception { if (indexService.index().getName().equals(restoredIndex)) { for (IndexShard indexShard : indexService) { try { - final SearchableSnapshotDirectory directory = unwrapDirectory(indexShard.store().directory()); - directory.clearStats(); - } catch (AlreadyClosedException ace) { + unwrapDirectory(indexShard.store().directory()).clearStats(); + } catch (AlreadyClosedException ignore) { // ok to ignore these } } From e0343a5a67a7e5101608e61c0955c540d05922ec Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 23 Sep 2021 14:37:56 +0200 Subject: [PATCH 6/8] streams --- ...hableSnapshotsBlobStoreCacheMaintenanceIntegTests.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java index d12130e5a6d68..a3c60276c6ae6 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java @@ -27,10 +27,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -154,8 +155,9 @@ public void testMaintenance() throws Exception { } }); - final Set remainingIndices = new HashSet<>(mountedIndices.keySet()); - indicesToDelete.forEach(remainingIndices::remove); + final Set remainingIndices = mountedIndices.keySet().stream() + .filter(Predicate.not(indicesToDelete::contains)) + .collect(Collectors.toSet()); if (remainingIndices.isEmpty() == false) { final List moreIndicesToDelete = randomSubsetOf(randomIntBetween(1, remainingIndices.size()), remainingIndices); From d8164199dde5c00be5670b1b390ce511f0fdf666 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 23 Sep 2021 14:45:05 +0200 Subject: [PATCH 7/8] more streams --- .../cache/blob/BlobStoreCacheMaintenanceService.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index 58815bea7d0b1..d19a6e42ef5ce 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -34,11 +34,12 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.threadpool.ThreadPool; -import java.util.HashSet; import java.util.LinkedList; import java.util.Objects; import java.util.Queue; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN; @@ -119,10 +120,10 @@ private static boolean hasSearchableSnapshotWith(final ClusterState state, final } static QueryBuilder buildDeleteByQuery(int numberOfShards, String snapshotUuid, String indexUuid) { - final Set paths = new HashSet<>(numberOfShards); - for (int shard = 0; shard < numberOfShards; shard++) { - paths.add(String.join("/", snapshotUuid, indexUuid, String.valueOf(shard))); - } + final Set paths = IntStream.range(0, numberOfShards) + .mapToObj(shard -> String.join("/", snapshotUuid, indexUuid, String.valueOf(shard))) + .collect(Collectors.toSet()); + assert paths.isEmpty() == false; return QueryBuilders.termsQuery("blob.path", paths); } From b94ecd9ee25b7cc0cc7d4e5ab15ea2dae07cf9eb Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 23 Sep 2021 14:53:11 +0200 Subject: [PATCH 8/8] spotless --- ...SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java index a3c60276c6ae6..17a5245d4e266 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java @@ -155,7 +155,8 @@ public void testMaintenance() throws Exception { } }); - final Set remainingIndices = mountedIndices.keySet().stream() + final Set remainingIndices = mountedIndices.keySet() + .stream() .filter(Predicate.not(indicesToDelete::contains)) .collect(Collectors.toSet());