diff --git a/x-pack/plugin/searchable-snapshots/build.gradle b/x-pack/plugin/searchable-snapshots/build.gradle index 4504535e4a121..17066a5a996aa 100644 --- a/x-pack/plugin/searchable-snapshots/build.gradle +++ b/x-pack/plugin/searchable-snapshots/build.gradle @@ -14,6 +14,7 @@ dependencies { compileOnly project(path: xpackModule('core')) implementation project(path: 'preallocate') internalClusterTestImplementation(testArtifact(project(xpackModule('core')))) + internalClusterTestImplementation(project(path: ':modules:reindex')) } addQaCheckDependencies() diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java index d5652c8ef1a42..53e05525530d2 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.searchablesnapshots.cache.blob; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -26,13 +27,16 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import 
org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexingStats; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.snapshots.SearchableSnapshotsSettings; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.InternalTestCluster; @@ -59,6 +63,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_BLOB_CACHE_INDEX; import static org.elasticsearch.xpack.searchablesnapshots.cache.shared.SharedBytes.pageAligned; +import static org.elasticsearch.xpack.searchablesnapshots.store.SearchableSnapshotDirectory.unwrapDirectory; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -93,7 +98,10 @@ public static void tearDownCacheSettings() { @Override protected Collection> nodePlugins() { - return CollectionUtils.appendToCopy(super.nodePlugins(), WaitForSnapshotBlobCacheShardsActivePlugin.class); + final List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(WaitForSnapshotBlobCacheShardsActivePlugin.class); + plugins.add(ReindexPlugin.class); + return plugins; } @Override @@ -159,7 +167,7 @@ public void testBlobStoreCache() throws Exception { storage1, blobCacheMaxLength.getStringRep() ); - final String restoredIndex = randomBoolean() ? 
indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final String restoredIndex = "restored-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); mountSnapshot( repositoryName, snapshot.getName(), @@ -174,17 +182,9 @@ public void testBlobStoreCache() throws Exception { ); ensureGreen(restoredIndex); - // wait for all async cache fills to complete - assertBusy(() -> { - for (final SearchableSnapshotShardStats shardStats : client().execute( - SearchableSnapshotsStatsAction.INSTANCE, - new SearchableSnapshotsStatsRequest() - ).actionGet().getStats()) { - for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) { - assertThat(Strings.toString(indexInputStats), indexInputStats.getCurrentIndexCacheFills(), equalTo(0L)); - } - } - }); + assertRecoveryStats(restoredIndex, false); + assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); + waitForBlobCacheFillsToComplete(); for (final SearchableSnapshotShardStats shardStats : client().execute( SearchableSnapshotsStatsAction.INSTANCE, @@ -209,11 +209,14 @@ public void testBlobStoreCache() throws Exception { equalTo("data_content,data_hot") ); + refreshSystemIndex(); + final long numberOfCachedBlobs = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) .get() .getHits() .getTotalHits().value; + IndexingStats indexingStats = systemClient().admin() .indices() .prepareStats(SNAPSHOT_BLOB_CACHE_INDEX) @@ -227,19 +230,24 @@ public void testBlobStoreCache() throws Exception { logger.info("--> verifying number of documents in index [{}]", restoredIndex); assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs); - assertAcked(client().admin().indices().prepareDelete(restoredIndex)); - assertBusy(() -> { - refreshSystemIndex(); - assertThat( - systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get().getHits().getTotalHits().value, - 
greaterThan(0L) - ); - }); + for (IndicesService indicesService : internalCluster().getDataNodeInstances(IndicesService.class)) { + for (IndexService indexService : indicesService) { + if (indexService.index().getName().equals(restoredIndex)) { + for (IndexShard indexShard : indexService) { + try { + unwrapDirectory(indexShard.store().directory()).clearStats(); + } catch (AlreadyClosedException ignore) { + // ok to ignore these + } + } + } + } + } final Storage storage2 = randomFrom(Storage.values()); logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage2); - final String restoredAgainIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final String restoredAgainIndex = "restored-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); mountSnapshot( repositoryName, snapshot.getName(), @@ -254,6 +262,10 @@ public void testBlobStoreCache() throws Exception { ); ensureGreen(restoredAgainIndex); + assertRecoveryStats(restoredAgainIndex, false); + assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); + waitForBlobCacheFillsToComplete(); + logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex); checkNoBlobStoreAccess(); @@ -289,6 +301,10 @@ public Settings onNodeStopped(String nodeName) throws Exception { }); ensureGreen(restoredAgainIndex); + assertRecoveryStats(restoredAgainIndex, false); + assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); + waitForBlobCacheFillsToComplete(); + logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex); checkNoBlobStoreAccess(); @@ -311,8 +327,18 @@ public Settings onNodeStopped(String nodeName) throws Exception { logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex); 
assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);
 
-        // TODO also test when the index is frozen
-        // TODO also test when prewarming is enabled
+        logger.info("--> deleting indices, maintenance service should clean up snapshot blob cache index");
+        assertAcked(client().admin().indices().prepareDelete("restored-*"));
+        assertBusy(() -> {
+            refreshSystemIndex();
+            assertHitCount(
+                systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX)
+                    .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
+                    .setSize(0)
+                    .get(),
+                0L
+            );
+        });
     }
 
     private void checkNoBlobStoreAccess() {
diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java
new file mode 100644
index 0000000000000..17a5245d4e266
--- /dev/null
+++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java
@@ -0,0 +1,293 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.searchablesnapshots.cache.blob;
+
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.OriginSettingClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.reindex.ReindexPlugin;
+import org.elasticsearch.repositories.fs.FsRepository;
+import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage;
+import org.elasticsearch.xpack.searchablesnapshots.BaseFrozenSearchableSnapshotsIntegTestCase;
+import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_BLOB_CACHE_INDEX;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
+/**
+ * Integration test verifying that entries of the snapshot blob cache system index are removed once the mounted searchable
+ * snapshot indices they were created for have been deleted.
+ */
+public class SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests extends BaseFrozenSearchableSnapshotsIntegTestCase {
+
+    /**
+     * Adds the {@link ReindexPlugin} to the plugins installed on the test nodes; the blob cache maintenance service cleans up
+     * the system index with a delete-by-query request.
+     */
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        final List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
+        plugins.add(ReindexPlugin.class);
+        return plugins;
+    }
+
+    /**
+     * Test that snapshot blob cache entries are deleted from the system index after the corresponding searchable snapshot index is deleted
+     */
+    public void testMaintenance() throws Exception {
+        final String repositoryName = "repository";
+        createRepository(repositoryName, FsRepository.TYPE);
+
+        final int nbIndices = randomIntBetween(3, 10);
+
+        logger.info("--> generating [{}] indices with cached entries in system index...", nbIndices);
+        final Map<String, Long> mountedIndices = new HashMap<>();
+        final Map<String, Settings> mountedIndicesSettings = new HashMap<>();
+
+        int i = 0;
+        long previousNumberOfCachedEntries = 0;
+        while (mountedIndices.size() < nbIndices) {
+            final String indexName = "index-" + i;
+            createIndex(indexName);
+
+            final List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
+            for (int n = 100; n > 0; n--) {
+                indexRequestBuilders.add(
+                    client().prepareIndex(indexName)
+                        .setSource(
+                            XContentFactory.smileBuilder()
+                                .startObject()
+                                .field("text", randomRealisticUnicodeOfCodepointLength(10))
+                                .endObject()
+                        )
+                );
+            }
+            indexRandom(true, indexRequestBuilders);
+
+            createSnapshot(repositoryName, "snapshot-" + i, List.of(indexName));
+            assertAcked(client().admin().indices().prepareDelete(indexName));
+
+            final String mountedIndex = "mounted-index-" + i;
+            mountSnapshot(repositoryName, "snapshot-" + i, "index-" + i, mountedIndex, Settings.EMPTY, randomFrom(Storage.values()));
+
+            ensureGreen(mountedIndex);
+            assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
+            assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME);
+            waitForBlobCacheFillsToComplete();
+
+            refreshSystemIndex(false);
+
+            final long numberOfEntriesInCache = numberOfEntriesInCache();
+            if (numberOfEntriesInCache > previousNumberOfCachedEntries) {
+                final long nbEntries = numberOfEntriesInCache - previousNumberOfCachedEntries;
+                logger.info("--> mounted index [{}] has [{}] entries in cache", mountedIndex, nbEntries);
+                mountedIndices.put(mountedIndex, nbEntries);
+                mountedIndicesSettings.put(mountedIndex, getIndexSettings(mountedIndex));
+
+            } else {
+                logger.info("--> mounted index [{}] did not generate any entry in cache, skipping", mountedIndex);
+                assertAcked(client().admin().indices().prepareDelete(mountedIndex));
+            }
+
+            previousNumberOfCachedEntries = numberOfEntriesInCache;
+            i += 1;
+        }
+
+        ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX);
+        refreshSystemIndex(true);
+
+        final long numberOfEntriesInCache = numberOfEntriesInCache();
+        logger.info("--> found [{}] entries in snapshot blob cache", numberOfEntriesInCache);
+        assertThat(numberOfEntriesInCache, equalTo(mountedIndices.values().stream().mapToLong(l -> l).sum()));
+
+        final List<String> indicesToDelete = randomSubsetOf(randomIntBetween(1, mountedIndices.size()), mountedIndices.keySet());
+        assertAcked(client().admin().indices().prepareDelete(indicesToDelete.toArray(String[]::new)));
+
+        final long expectedDeletedEntriesInCache = mountedIndices.entrySet()
+            .stream()
+            .filter(e -> indicesToDelete.contains(e.getKey()))
+            .mapToLong(Map.Entry::getValue)
+            .sum();
+        logger.info("--> deleting indices [{}] with [{}] entries in snapshot blob cache", indicesToDelete, expectedDeletedEntriesInCache);
+
+        assertBusy(() -> {
+            refreshSystemIndex(true);
+            assertThat(numberOfEntriesInCache(), equalTo(numberOfEntriesInCache - expectedDeletedEntriesInCache));
+
+            for (String mountedIndex : mountedIndices.keySet()) {
+                final Settings indexSettings = mountedIndicesSettings.get(mountedIndex);
+                assertHitCount(
+                    systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX)
+                        .setQuery(
+                            BlobStoreCacheMaintenanceService.buildDeleteByQuery(
+                                INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettings),
+                                SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings),
+                                SNAPSHOT_INDEX_ID_SETTING.get(indexSettings)
+                            )
+                        )
+                        .setSize(0)
+                        .get(),
+                    indicesToDelete.contains(mountedIndex) ? 0L : mountedIndices.get(mountedIndex)
+                );
+            }
+        });
+
+        final Set<String> remainingIndices = mountedIndices.keySet()
+            .stream()
+            .filter(Predicate.not(indicesToDelete::contains))
+            .collect(Collectors.toSet());
+
+        if (remainingIndices.isEmpty() == false) {
+            final List<String> moreIndicesToDelete = randomSubsetOf(randomIntBetween(1, remainingIndices.size()), remainingIndices);
+
+            final String randomMountedIndex = randomFrom(moreIndicesToDelete);
+            final Settings randomIndexSettings = getIndexSettings(randomMountedIndex);
+            final String snapshotId = SNAPSHOT_SNAPSHOT_ID_SETTING.get(randomIndexSettings);
+            final String snapshotName = SNAPSHOT_SNAPSHOT_NAME_SETTING.get(randomIndexSettings);
+            final String snapshotIndexName = SNAPSHOT_INDEX_NAME_SETTING.get(randomIndexSettings);
+
+            final String remainingMountedIndex = "mounted-remaining-index";
+            mountSnapshot(
+                repositoryName,
+                snapshotName,
+                snapshotIndexName,
+                remainingMountedIndex,
+                Settings.EMPTY,
+                randomFrom(Storage.values())
+            );
+
+            ensureGreen(remainingMountedIndex);
+            mountedIndicesSettings.put(remainingMountedIndex, getIndexSettings(remainingMountedIndex));
+
+            assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
+            assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME);
+            waitForBlobCacheFillsToComplete();
+
+            logger.info(
+                "--> deleting more mounted indices [{}] with snapshot [{}/{}] of index [{}] is still mounted as index [{}]",
+                moreIndicesToDelete,
+                snapshotName,
+                snapshotId,
+                snapshotIndexName,
+                remainingMountedIndex
+            );
+            assertAcked(client().admin().indices().prepareDelete(moreIndicesToDelete.toArray(String[]::new)));
+
+            assertBusy(() -> {
+                refreshSystemIndex(true);
+
+                for (String mountedIndex : mountedIndices.keySet()) {
+                    final Settings indexSettings = mountedIndicesSettings.get(mountedIndex);
+
+                    final long remainingEntriesInCache = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX)
+                        .setQuery(
+                            BlobStoreCacheMaintenanceService.buildDeleteByQuery(
+                                INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettings),
+                                SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings),
+                                SNAPSHOT_INDEX_ID_SETTING.get(indexSettings)
+                            )
+                        )
+                        .setSize(0)
+                        .get()
+                        .getHits()
+                        .getTotalHits().value;
+
+                    if (indicesToDelete.contains(mountedIndex)) {
+                        assertThat(remainingEntriesInCache, equalTo(0L));
+                    } else if (snapshotId.equals(SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings))) {
+                        assertThat(remainingEntriesInCache, greaterThanOrEqualTo(mountedIndices.get(randomMountedIndex)));
+                    } else if (moreIndicesToDelete.contains(mountedIndex)) {
+                        assertThat(remainingEntriesInCache, equalTo(0L));
+                    } else {
+                        assertThat(remainingEntriesInCache, equalTo(mountedIndices.get(mountedIndex)));
+                    }
+                }
+            });
+        }
+
+        logger.info("--> deleting indices, maintenance service should clean up snapshot blob cache index");
+        assertAcked(client().admin().indices().prepareDelete("mounted-*"));
+        assertBusy(() -> {
+            refreshSystemIndex(true);
+            assertHitCount(systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get(), 0L);
+        });
+    }
+
+    /**
+     * @return a {@link Client} that can be used to query the blob store cache system index
+     */
+    private Client systemClient() {
+        return new OriginSettingClient(client(), ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN);
+    }
+
+    /**
+     * @return the total hit count of a search on the snapshot blob cache system index, executed with lenient indices options
+     */
+    private long numberOfEntriesInCache() {
+        return systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX)
+            .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
+            .setTrackTotalHits(true)
+            .setSize(0)
+            .get()
+            .getHits()
+            .getTotalHits().value;
+    }
+
+    /**
+     * Refreshes the snapshot blob cache system index and asserts that no shard failed to refresh.
+     *
+     * @param failIfNotExist when {@code true} the default refresh indices options are used and at least one shard must have been
+     *                       refreshed; when {@code false} lenient indices options are used and zero refreshed shards is accepted
+     */
+    private void refreshSystemIndex(boolean failIfNotExist) {
+        try {
+            final RefreshResponse refreshResponse = systemClient().admin()
+                .indices()
+                .prepareRefresh(SNAPSHOT_BLOB_CACHE_INDEX)
+                .setIndicesOptions(failIfNotExist ? RefreshRequest.DEFAULT_INDICES_OPTIONS : IndicesOptions.LENIENT_EXPAND_OPEN)
+                .get();
+            assertThat(refreshResponse.getSuccessfulShards(), failIfNotExist ? greaterThan(0) : greaterThanOrEqualTo(0));
+            assertThat(refreshResponse.getFailedShards(), equalTo(0));
+        } catch (IndexNotFoundException indexNotFoundException) {
+            throw new AssertionError("unexpected", indexNotFoundException);
+        }
+    }
+
+    /**
+     * @return the settings of the index {@code indexName} as found in the get-settings response
+     */
+    private Settings getIndexSettings(String indexName) {
+        return client().admin().indices().prepareGetSettings(indexName).get().getIndexToSettings().get(indexName);
+    }
+}
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java
index d6370e6b9946d..427c4879bcb76 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java
@@ -100,6 +100,7 @@
 import org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotAllocationDecider;
 import org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotEnableAllocationDecider;
 import org.elasticsearch.xpack.searchablesnapshots.allocation.decider.SearchableSnapshotRepositoryExistsAllocationDecider;
+import org.elasticsearch.xpack.searchablesnapshots.cache.blob.BlobStoreCacheMaintenanceService;
 import org.elasticsearch.xpack.searchablesnapshots.cache.blob.BlobStoreCacheService;
 import org.elasticsearch.xpack.searchablesnapshots.cache.full.CacheService;
 import
org.elasticsearch.xpack.searchablesnapshots.cache.full.PersistentCache;
@@ -347,6 +348,7 @@ public Collection<Object> createComponents(
                 threadPool::absoluteTimeInMillis
             );
             this.blobStoreCacheService.set(blobStoreCacheService);
+            clusterService.addListener(new BlobStoreCacheMaintenanceService(threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX));
             components.add(blobStoreCacheService);
         } else {
             PersistentCache.cleanUp(settings, nodeEnvironment);
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java
new file mode 100644
index 0000000000000..d19a6e42ef5ce
--- /dev/null
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java
@@ -0,0 +1,270 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.searchablesnapshots.cache.blob;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.OriginSettingClient;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.DeleteByQueryAction;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.snapshots.SearchableSnapshotsSettings;
+import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
+import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
+import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
+
+/**
+ * A service that deletes documents in the snapshot blob cache index when they are not required anymore.
+ *
+ * This service runs on the data node that contains the snapshot blob cache primary shard. It listens to cluster state updates to find
+ * searchable snapshot indices that are deleted and checks if the index snapshot is still used by other searchable snapshot indices. If the
+ * index snapshot is not used anymore then it triggers the deletion of corresponding cached blobs in the snapshot blob cache index using a
+ * delete-by-query.
+ */
+public class BlobStoreCacheMaintenanceService implements ClusterStateListener {
+
+    private static final Logger logger = LogManager.getLogger(BlobStoreCacheMaintenanceService.class);
+
+    private final Client clientWithOrigin;
+    private final String systemIndexName;
+    private final ThreadPool threadPool;
+
+    /**
+     * @param threadPool      thread pool whose generic executor runs the maintenance tasks
+     * @param client          client that is wrapped to execute the delete-by-query requests with the searchable snapshots origin
+     * @param systemIndexName name of the system index the cached blob entries are deleted from
+     */
+    public BlobStoreCacheMaintenanceService(ThreadPool threadPool, Client client, String systemIndexName) {
+        this.clientWithOrigin = new OriginSettingClient(Objects.requireNonNull(client), SEARCHABLE_SNAPSHOTS_ORIGIN);
+        this.systemIndexName = Objects.requireNonNull(systemIndexName);
+        this.threadPool = Objects.requireNonNull(threadPool);
+    }
+
+    /**
+     * Schedules a {@link MaintenanceTask} on the generic thread pool when the event contains deleted indices, the cluster state is
+     * recovered and the primary shard of the system index is assigned to the local node.
+     */
+    @Override
+    public void clusterChanged(ClusterChangedEvent event) {
+        final ClusterState state = event.state();
+        if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
+            return; // state not fully recovered
+        }
+        final ShardRouting primary = systemIndexPrimaryShard(state);
+        if (primary == null || Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId()) == false) {
+            return; // system index primary shard does not exist or is not assigned to this data node
+        }
+        if (event.indicesDeleted().isEmpty() == false) {
+            threadPool.generic().execute(new MaintenanceTask(event));
+        }
+    }
+
+    /**
+     * @return the primary of shard 0 of the system index, or {@code null} if the index has no metadata or no routing table
+     */
+    @Nullable
+    private ShardRouting systemIndexPrimaryShard(final ClusterState state) {
+        final IndexMetadata indexMetadata = state.metadata().index(systemIndexName);
+        if (indexMetadata != null) {
+            final IndexRoutingTable indexRoutingTable = state.routingTable().index(indexMetadata.getIndex());
+            if (indexRoutingTable != null) {
+                return indexRoutingTable.shard(0).primaryShard();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return {@code true} if the cluster state contains a searchable snapshot index that uses the given snapshot and snapshot index
+     */
+    private static boolean hasSearchableSnapshotWith(final ClusterState state, final SnapshotId snapshotId, final IndexId indexId) {
+        for (IndexMetadata indexMetadata : state.metadata()) {
+            final Settings indexSettings = indexMetadata.getSettings();
+            if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSettings)) {
+                final SnapshotId otherSnapshotId = new SnapshotId(
+                    SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings),
+                    SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings)
+                );
+                if (Objects.equals(snapshotId, otherSnapshotId)) {
+                    final IndexId otherIndexId = new IndexId(
+                        SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings),
+                        SNAPSHOT_INDEX_ID_SETTING.get(indexSettings)
+                    );
+                    if (Objects.equals(indexId, otherIndexId)) {
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Builds a terms query on the {@code blob.path} field that matches the paths {@code snapshotUuid/indexUuid/shard} of every
+     * shard in the range {@code [0, numberOfShards)}.
+     */
+    static QueryBuilder buildDeleteByQuery(int numberOfShards, String snapshotUuid, String indexUuid) {
+        final Set<String> paths = IntStream.range(0, numberOfShards)
+            .mapToObj(shard -> String.join("/", snapshotUuid, indexUuid, String.valueOf(shard)))
+            .collect(Collectors.toSet());
+        assert paths.isEmpty() == false;
+        return QueryBuilders.termsQuery("blob.path", paths);
+    }
+
+    /**
+     * Task that builds one delete-by-query request per deleted searchable snapshot index whose snapshot is not used by another
+     * searchable snapshot index of the current cluster state, and executes the requests one after the other.
+     */
+    private class MaintenanceTask extends AbstractRunnable {
+
+        private final ClusterChangedEvent event;
+
+        MaintenanceTask(ClusterChangedEvent event) {
+            assert event.indicesDeleted().isEmpty() == false;
+            this.event = Objects.requireNonNull(event);
+        }
+
+        @Override
+        protected void doRun() {
+            final Queue<Tuple<DeleteByQueryRequest, ActionListener<BulkByScrollResponse>>> queue = new LinkedList<>();
+            final ClusterState state = event.state();
+
+            for (Index deletedIndex : event.indicesDeleted()) {
+                final IndexMetadata indexMetadata = event.previousState().metadata().index(deletedIndex);
+                assert indexMetadata != null : "no previous metadata found for " + deletedIndex;
+                if (indexMetadata != null) {
+                    final Settings indexSetting = indexMetadata.getSettings();
+                    if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSetting)) {
+                        assert state.metadata().hasIndex(deletedIndex) == false;
+
+                        final SnapshotId snapshotId = new SnapshotId(
+                            SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSetting),
+                            SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSetting)
+                        );
+                        final IndexId indexId = new IndexId(
+                            SNAPSHOT_INDEX_NAME_SETTING.get(indexSetting),
+                            SNAPSHOT_INDEX_ID_SETTING.get(indexSetting)
+                        );
+
+                        // we should do nothing if the current cluster state contains another
+                        // searchable snapshot index that uses the same index snapshot
+                        if (hasSearchableSnapshotWith(state, snapshotId, indexId)) {
+                            logger.debug(
+                                "snapshot [{}] of index {} is in use, skipping maintenance of snapshot blob cache entries",
+                                snapshotId,
+                                indexId
+                            );
+                            continue;
+                        }
+
+                        final DeleteByQueryRequest request = new DeleteByQueryRequest(systemIndexName);
+                        request.setQuery(buildDeleteByQuery(indexMetadata.getNumberOfShards(), snapshotId.getUUID(), indexId.getId()));
+                        // NOTE(review): only the first queued request is flagged to refresh; confirm the last one is not intended
+                        request.setRefresh(queue.isEmpty());
+
+                        queue.add(Tuple.tuple(request, new ActionListener<>() {
+                            @Override
+                            public void onResponse(BulkByScrollResponse response) {
+                                logger.debug(
+                                    "blob cache maintenance task deleted [{}] entries after deletion of {} (snapshot:{}, index:{})",
+                                    response.getDeleted(),
+                                    deletedIndex,
+                                    snapshotId,
+                                    indexId
+                                );
+                            }
+
+                            @Override
+                            public void onFailure(Exception e) {
+                                logger.debug(
+                                    () -> new ParameterizedMessage(
+                                        "exception when executing blob cache maintenance task after deletion of {} (snapshot:{}, index:{})",
+                                        deletedIndex,
+                                        snapshotId,
+                                        indexId
+                                    ),
+                                    e
+                                );
+                            }
+                        }));
+                    }
+                }
+            }
+
+            if (queue.isEmpty() == false) {
+                executeNextCleanUp(queue);
+            }
+        }
+
+        /**
+         * Polls the next request from the queue, if any, and executes it.
+         */
+        void executeNextCleanUp(final Queue<Tuple<DeleteByQueryRequest, ActionListener<BulkByScrollResponse>>> queue) {
+            assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
+            final Tuple<DeleteByQueryRequest, ActionListener<BulkByScrollResponse>> next = queue.poll();
+            if (next != null) {
+                cleanUp(next.v1(), next.v2(), queue);
+            }
+        }
+
+        /**
+         * Executes the delete-by-query request; after the listener has been notified, the next queued request (if any) is
+         * scheduled on the generic thread pool.
+         */
+        void cleanUp(
+            final DeleteByQueryRequest request,
+            final ActionListener<BulkByScrollResponse> listener,
+            final Queue<Tuple<DeleteByQueryRequest, ActionListener<BulkByScrollResponse>>> queue
+        ) {
+            assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
+            clientWithOrigin.execute(DeleteByQueryAction.INSTANCE, request, ActionListener.runAfter(listener, () -> {
+                if (queue.isEmpty() == false) {
+                    threadPool.generic().execute(() -> executeNextCleanUp(queue));
+                }
+            }));
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            logger.warn(
+                () -> new ParameterizedMessage("snapshot blob cache maintenance task failed for cluster state update [{}]", event.source()),
+                e
+            );
+        }
+    }
+}
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java
index f6cc9ca40036c..cd403bcb00e44 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java
@@ -282,6 +282,11 @@ IndexInputStats getStats(String fileName) {
         return stats.get(getNonNullFileExt(fileName));
     }
 
+    // only used in tests
+    public void clearStats() {
+        stats.clear();
+    }
+
     private BlobStoreIndexShardSnapshot.FileInfo fileInfo(final String name) throws FileNotFoundException {
         return files().stream()
             .filter(fileInfo -> fileInfo.physicalName().equals(name))