From 2b3d40051276cb07ac96d494383e8471837dc95e Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 28 Sep 2021 21:59:50 +0200 Subject: [PATCH 01/15] Add periodic maintenance task for snapshot blob cache --- ...tsBlobStoreCacheMaintenanceIntegTests.java | 208 +++++--- .../SearchableSnapshots.java | 9 +- .../BlobStoreCacheMaintenanceService.java | 464 ++++++++++++++++-- .../cache/blob/CachedBlob.java | 4 + 4 files changed, 587 insertions(+), 98 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java index 17a5245d4e266..d6ec2521deb25 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java @@ -7,14 +7,18 @@ package org.elasticsearch.xpack.searchablesnapshots.cache.blob; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.reindex.ReindexPlugin; @@ -26,12 +30,16 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -57,71 +65,17 @@ protected Collection> nodePlugins() { /** * Test that snapshot blob cache entries are deleted from the system index after the corresponding searchable snapshot index is deleted */ - public void testMaintenance() throws Exception { + public void testCleanUpAfterIndicesAreDeleted() throws Exception { final String repositoryName = "repository"; createRepository(repositoryName, FsRepository.TYPE); - final int nbIndices = randomIntBetween(3, 10); - - logger.info("--> generating [{}] indices with cached entries in system index...", nbIndices); - final Map mountedIndices = new HashMap<>(); - final Map mountedIndicesSettings = new HashMap<>(); - - int i = 0; - long previousNumberOfCachedEntries = 0; - while (mountedIndices.size() < nbIndices) { - final String indexName = "index-" + i; - createIndex(indexName); - - final List indexRequestBuilders = new ArrayList<>(); - for (int n = 100; n > 0; n--) { - indexRequestBuilders.add( - client().prepareIndex(indexName) - .setSource( - XContentFactory.smileBuilder() - .startObject() - .field("text", randomRealisticUnicodeOfCodepointLength(10)) - .endObject() - ) - ); - } - indexRandom(true, indexRequestBuilders); - - createSnapshot(repositoryName, "snapshot-" + i, List.of(indexName)); - assertAcked(client().admin().indices().prepareDelete(indexName)); - - final String mountedIndex = "mounted-index-" + i; - mountSnapshot(repositoryName, "snapshot-" + i, "index-" + i, mountedIndex, Settings.EMPTY, randomFrom(Storage.values())); - - ensureGreen(mountedIndex); - assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); - assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME); - waitForBlobCacheFillsToComplete(); - - refreshSystemIndex(false); - - final long numberOfEntriesInCache = numberOfEntriesInCache(); - if (numberOfEntriesInCache > previousNumberOfCachedEntries) { - final long nbEntries = numberOfEntriesInCache - previousNumberOfCachedEntries; - logger.info("--> mounted index [{}] has [{}] entries in cache", mountedIndex, nbEntries); - mountedIndices.put(mountedIndex, nbEntries); - mountedIndicesSettings.put(mountedIndex, getIndexSettings(mountedIndex)); - - } else { - logger.info("--> mounted index [{}] did not generate any entry in cache, skipping", mountedIndex); - assertAcked(client().admin().indices().prepareDelete(mountedIndex)); - } - - previousNumberOfCachedEntries = numberOfEntriesInCache; - i += 1; - } - + final Map> mountedIndices = mountRandomIndicesWithCache(repositoryName, 3, 10); ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX); refreshSystemIndex(true); final long numberOfEntriesInCache = numberOfEntriesInCache(); logger.info("--> found [{}] entries in snapshot blob cache", numberOfEntriesInCache); - assertThat(numberOfEntriesInCache, equalTo(mountedIndices.values().stream().mapToLong(l -> l).sum())); + assertThat(numberOfEntriesInCache, equalTo(mountedIndices.values().stream().mapToLong(Tuple::v2).sum())); final List indicesToDelete = randomSubsetOf(randomIntBetween(1, mountedIndices.size()), mountedIndices.keySet()); assertAcked(client().admin().indices().prepareDelete(indicesToDelete.toArray(String[]::new))); @@ -129,7 +83,7 @@ public void testMaintenance() throws Exception { final long expectedDeletedEntriesInCache = mountedIndices.entrySet() .stream() .filter(e -> indicesToDelete.contains(e.getKey())) - .mapToLong(Map.Entry::getValue) + .mapToLong(entry -> entry.getValue().v2()) .sum(); logger.info("--> deleting indices [{}] with [{}] entries in snapshot blob cache", indicesToDelete, expectedDeletedEntriesInCache); @@ -138,7 +92,7 @@ public void testMaintenance() throws Exception { assertThat(numberOfEntriesInCache(), equalTo(numberOfEntriesInCache - expectedDeletedEntriesInCache)); for (String mountedIndex : mountedIndices.keySet()) { - final Settings indexSettings = mountedIndicesSettings.get(mountedIndex); + final Settings indexSettings = mountedIndices.get(mountedIndex).v1(); assertHitCount( systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) .setQuery( @@ -150,7 +104,7 @@ public void testMaintenance() throws Exception { ) .setSize(0) .get(), - indicesToDelete.contains(mountedIndex) ? 0L : mountedIndices.get(mountedIndex) + indicesToDelete.contains(mountedIndex) ? 0L : mountedIndices.get(mountedIndex).v2() ); } }); @@ -178,13 +132,12 @@ public void testMaintenance() throws Exception { Settings.EMPTY, randomFrom(Storage.values()) ); - ensureGreen(remainingMountedIndex); - mountedIndicesSettings.put(remainingMountedIndex, getIndexSettings(remainingMountedIndex)); assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME); waitForBlobCacheFillsToComplete(); + ensureClusterStateConsistency(); logger.info( "--> deleting more mounted indices [{}] with snapshot [{}/{}] of index [{}] is still mounted as index [{}]", @@ -200,7 +153,7 @@ public void testMaintenance() throws Exception { refreshSystemIndex(true); for (String mountedIndex : mountedIndices.keySet()) { - final Settings indexSettings = mountedIndicesSettings.get(mountedIndex); + final Settings indexSettings = mountedIndices.get(mountedIndex).v1(); final long remainingEntriesInCache = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) .setQuery( @@ -218,11 +171,11 @@ public void testMaintenance() throws Exception { if (indicesToDelete.contains(mountedIndex)) { assertThat(remainingEntriesInCache, equalTo(0L)); } else if (snapshotId.equals(SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings))) { - assertThat(remainingEntriesInCache, greaterThanOrEqualTo(mountedIndices.get(randomMountedIndex))); + assertThat(remainingEntriesInCache, greaterThanOrEqualTo(mountedIndices.get(randomMountedIndex).v2())); } else if (moreIndicesToDelete.contains(mountedIndex)) { assertThat(remainingEntriesInCache, equalTo(0L)); } else { - assertThat(remainingEntriesInCache, equalTo(mountedIndices.get(mountedIndex))); + assertThat(remainingEntriesInCache, equalTo(mountedIndices.get(mountedIndex).v2())); } } }); @@ -236,6 +189,76 @@ public void testMaintenance() throws Exception { }); } + public void testPeriodicMaintenance() throws Exception { + ensureStableCluster(internalCluster().getNodeNames().length, TimeValue.timeValueSeconds(60L)); + + createRepository("repo", FsRepository.TYPE); + Map> mountedIndices = mountRandomIndicesWithCache("repo", 1, 3); + ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX); + refreshSystemIndex(true); + assertThat(numberOfEntriesInCache(), equalTo(mountedIndices.values().stream().mapToLong(Tuple::v2).sum())); + + createRepository("other", FsRepository.TYPE); + Map> otherMountedIndices = mountRandomIndicesWithCache("other", 1, 3); + refreshSystemIndex(true); + assertThat( + numberOfEntriesInCache(), + equalTo(Stream.concat(mountedIndices.values().stream(), otherMountedIndices.values().stream()).mapToLong(Tuple::v2).sum()) + ); + + createRepository("backup", FsRepository.TYPE); + createSnapshot("backup", "backup", List.of(SNAPSHOT_BLOB_CACHE_INDEX)); + + final Set indicesToDelete = new HashSet<>(mountedIndices.keySet()); + indicesToDelete.add(randomFrom(otherMountedIndices.keySet())); + + assertAcked(systemClient().admin().indices().prepareDelete(SNAPSHOT_BLOB_CACHE_INDEX)); + assertAcked(client().admin().indices().prepareDelete(indicesToDelete.toArray(String[]::new))); + assertAcked(client().admin().cluster().prepareDeleteRepository("repo")); + ensureClusterStateConsistency(); + + refreshSystemIndex(false); + assertThat(numberOfEntriesInCache(), equalTo(0L)); + + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .put( + BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.getKey(), + TimeValue.timeValueSeconds(1L)) + ) + ); + try { + final RestoreSnapshotResponse restoreResponse = client().admin() + .cluster() + .prepareRestoreSnapshot("backup", "backup") + .setIndices(SNAPSHOT_BLOB_CACHE_INDEX) + .setWaitForCompletion(true) + .get(); + assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(1)); + assertThat(restoreResponse.getRestoreInfo().failedShards(), equalTo(0)); + + final long expectNumberOfRemainingCacheEntries = otherMountedIndices.entrySet() + .stream() + .filter(e -> indicesToDelete.contains(e.getKey()) == false) + .mapToLong(e -> e.getValue().v2()) + .sum(); + + assertBusy(() -> { + refreshSystemIndex(true); + assertThat(numberOfEntriesInCache(), equalTo(expectNumberOfRemainingCacheEntries)); + }, 3000L, TimeUnit.SECONDS); + + } finally { + assertAcked(client().admin().cluster().prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .putNull(BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.getKey()) + ) + ); + } + } + /** * @return a {@link Client} that can be used to query the blob store cache system index */ @@ -270,4 +293,59 @@ private void refreshSystemIndex(boolean failIfNotExist) { private Settings getIndexSettings(String indexName) { return client().admin().indices().prepareGetSettings(indexName).get().getIndexToSettings().get(indexName); } + + private Map> mountRandomIndicesWithCache(String repositoryName, int min, int max) throws Exception { + refreshSystemIndex(false); + long previousNumberOfCachedEntries = numberOfEntriesInCache(); + + final int nbIndices = randomIntBetween(min, max); + logger.info("--> generating [{}] indices with cached entries in system index...", nbIndices); + final Map> mountedIndices = new HashMap<>(); + + int i = 0; + while (mountedIndices.size() < nbIndices) { + final String indexName = "index-" + i; + createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); + + final List indexRequestBuilders = new ArrayList<>(); + for (int n = 1000; n > 0; n--) { + indexRequestBuilders.add( + client().prepareIndex(indexName) + .setSource( + XContentFactory.smileBuilder() + .startObject() + .field("text_a", randomRealisticUnicodeOfCodepointLength(10)) + .field("text_b", randomRealisticUnicodeOfCodepointLength(10)) + .endObject() + ) + ); + } + indexRandom(true, indexRequestBuilders); + + createSnapshot(repositoryName, "snapshot-" + i, List.of(indexName)); + assertAcked(client().admin().indices().prepareDelete(indexName)); + + final String mountedIndex = "mounted-index-" + i + "-in-" + repositoryName; + mountSnapshot(repositoryName, "snapshot-" + i, "index-" + i, mountedIndex, Settings.EMPTY, randomFrom(Storage.values())); + + ensureGreen(mountedIndex); + assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); + assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME); + waitForBlobCacheFillsToComplete(); + + refreshSystemIndex(false); + final long numberOfEntriesInCache = numberOfEntriesInCache(); + if (numberOfEntriesInCache > previousNumberOfCachedEntries) { + final long nbEntries = numberOfEntriesInCache - previousNumberOfCachedEntries; + logger.info("--> mounted index [{}] has [{}] entries in cache", mountedIndex, nbEntries); + mountedIndices.put(mountedIndex, Tuple.tuple(getIndexSettings(mountedIndex), nbEntries)); + } else { + logger.info("--> mounted index [{}] did not generate any entry in cache, skipping", mountedIndex); + assertAcked(client().admin().indices().prepareDelete(mountedIndex)); + } + previousNumberOfCachedEntries = numberOfEntriesInCache; + i += 1; + } + return Collections.unmodifiableMap(mountedIndices); + } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 427c4879bcb76..b51302cf2317a 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -313,7 +313,10 @@ public List> getSettings() { FrozenCacheService.FROZEN_CACHE_RECOVERY_RANGE_SIZE_SETTING, FrozenCacheService.SNAPSHOT_CACHE_MAX_FREQ_SETTING, FrozenCacheService.SNAPSHOT_CACHE_DECAY_INTERVAL_SETTING, - FrozenCacheService.SNAPSHOT_CACHE_MIN_TIME_DELTA_SETTING + FrozenCacheService.SNAPSHOT_CACHE_MIN_TIME_DELTA_SETTING, + BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING, + BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING, + BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING ); } @@ -348,7 +351,9 @@ public Collection createComponents( threadPool::absoluteTimeInMillis ); this.blobStoreCacheService.set(blobStoreCacheService); - clusterService.addListener(new BlobStoreCacheMaintenanceService(threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX)); + clusterService.addListener( + new BlobStoreCacheMaintenanceService(settings, clusterService, threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX) + ); components.add(blobStoreCacheService); } else { PersistentCache.cleanUp(settings, nodeEnvironment); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index d19a6e42ef5ce..ba3124499c4b6 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -10,18 +10,44 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.search.ClosePointInTimeAction; +import org.elasticsearch.action.search.ClosePointInTimeRequest; +import org.elasticsearch.action.search.ClosePointInTimeResponse; +import org.elasticsearch.action.search.OpenPointInTimeAction; +import org.elasticsearch.action.search.OpenPointInTimeRequest; +import org.elasticsearch.action.search.OpenPointInTimeResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.QueryBuilder; @@ -29,24 +55,30 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.PointInTimeBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.snapshots.SearchableSnapshotsSettings; -import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Arrays; +import java.util.HashSet; import java.util.LinkedList; import java.util.Objects; import java.util.Queue; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_BLOB_CACHE_INDEX; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; /** * A service that delete documents in the snapshot blob cache index when they are not required anymore. @@ -60,14 +92,65 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(BlobStoreCacheMaintenanceService.class); + /** + * The interval at which the periodic cleanup of the blob store cache index is scheduled. + */ + public static final Setting SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING = Setting.timeSetting( + "searchable_snapshots.blob_cache.periodic_cleanup.interval", + TimeValue.timeValueHours(1), + TimeValue.ZERO, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + /** + * The keep alive value for the internal point-in-time requests executed during the periodic cleanup. + */ + public static final Setting SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING = Setting.timeSetting( + "searchable_snapshots.blob_cache.periodic_cleanup.pit_keep_alive", + TimeValue.timeValueMinutes(5L), + TimeValue.timeValueSeconds(10), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + /** + * The number of documents that are searched for and bulk-deleted during the periodic cleanup. + */ + public static final Setting SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING = Setting.intSetting( + "searchable_snapshots.blob_cache.periodic_cleanup.batch_size", + 100, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + private final ClusterService clusterService; private final Client clientWithOrigin; private final String systemIndexName; private final ThreadPool threadPool; - public BlobStoreCacheMaintenanceService(ThreadPool threadPool, Client client, String systemIndexName) { + private volatile Scheduler.Cancellable periodicTask; + private volatile TimeValue periodicTaskInterval; + private volatile TimeValue periodicTaskKeepAlive; + private volatile int periodicTaskBatchSize; + private volatile boolean schedulePeriodic; + + public BlobStoreCacheMaintenanceService( + Settings settings, + ClusterService clusterService, + ThreadPool threadPool, + Client client, + String systemIndexName + ) { this.clientWithOrigin = new OriginSettingClient(Objects.requireNonNull(client), SEARCHABLE_SNAPSHOTS_ORIGIN); this.systemIndexName = Objects.requireNonNull(systemIndexName); + this.clusterService = Objects.requireNonNull(clusterService); this.threadPool = Objects.requireNonNull(threadPool); + this.periodicTaskInterval = SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.get(settings); + this.periodicTaskKeepAlive = SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING.get(settings); + this.periodicTaskBatchSize = SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING.get(settings); + ClusterSettings clusterSettings = clusterService.getClusterSettings(); + clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING, this::setPeriodicTaskInterval); + clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING, this::setPeriodicTaskKeepAlive); + clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING, this::setPeriodicTaskBatchSize); } @Override @@ -78,10 +161,56 @@ public void clusterChanged(ClusterChangedEvent event) { } final ShardRouting primary = systemIndexPrimaryShard(state); if (primary == null || Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId()) == false) { - return; // system index primary shard does not exist or is not assigned to this data node + // system index primary shard does not exist or is not assigned to this data node + stopPeriodicTask(); + return; } if (event.indicesDeleted().isEmpty() == false) { - threadPool.generic().execute(new MaintenanceTask(event)); + threadPool.generic().execute(new DeletedIndicesMaintenanceTask(event)); + } + if (periodicTask == null || periodicTask.isCancelled()) { + schedulePeriodic = true; + schedulePeriodicTask(); + } + } + + private synchronized void setPeriodicTaskInterval(TimeValue interval) { + this.periodicTaskInterval = interval; + } + + private void setPeriodicTaskKeepAlive(TimeValue keepAlive) { + this.periodicTaskKeepAlive = keepAlive; + } + + public void setPeriodicTaskBatchSize(int batchSize) { + this.periodicTaskBatchSize = batchSize; + } + + private synchronized void schedulePeriodicTask() { + if (schedulePeriodic) { + try { + final TimeValue delay = periodicTaskInterval; + if (delay.getMillis() > 0L) { + final PeriodicMaintenanceTask task = new PeriodicMaintenanceTask(periodicTaskKeepAlive, periodicTaskBatchSize); + periodicTask = threadPool.schedule(task, delay, ThreadPool.Names.GENERIC); + } else { + periodicTask = null; + } + } catch (EsRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + logger.debug("failed to schedule next periodic maintenance task for blob store cache, node is shutting down", e); + } else { + throw e; + } + } + } + } + + private synchronized void stopPeriodicTask() { + schedulePeriodic = false; + if (periodicTask != null && periodicTask.isCancelled() == false) { + periodicTask.cancel(); + periodicTask = null; } } @@ -97,22 +226,13 @@ private ShardRouting systemIndexPrimaryShard(final ClusterState state) { return null; } - private static boolean hasSearchableSnapshotWith(final ClusterState state, final SnapshotId snapshotId, final IndexId indexId) { + private static boolean hasSearchableSnapshotWith(final ClusterState state, final String snapshotId, final String indexId) { for (IndexMetadata indexMetadata : state.metadata()) { final Settings indexSettings = indexMetadata.getSettings(); if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSettings)) { - final SnapshotId otherSnapshotId = new SnapshotId( - SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings), - SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings) - ); - if (Objects.equals(snapshotId, otherSnapshotId)) { - final IndexId otherIndexId = new IndexId( - SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings), - SNAPSHOT_INDEX_ID_SETTING.get(indexSettings) - ); - if (Objects.equals(indexId, otherIndexId)) { - return true; - } + if (Objects.equals(snapshotId, SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings)) + && Objects.equals(indexId, SNAPSHOT_INDEX_ID_SETTING.get(indexSettings))) { + return true; } } } @@ -127,11 +247,14 @@ static QueryBuilder buildDeleteByQuery(int numberOfShards, String snapshotUuid, return QueryBuilders.termsQuery("blob.path", paths); } - private class MaintenanceTask extends AbstractRunnable { + /** + * A maintenance task that cleans up the blob store cache index after searchable snapshot indices are deleted + */ + private class DeletedIndicesMaintenanceTask extends AbstractRunnable { private final ClusterChangedEvent event; - MaintenanceTask(ClusterChangedEvent event) { + DeletedIndicesMaintenanceTask(ClusterChangedEvent event) { assert event.indicesDeleted().isEmpty() == false; this.event = Objects.requireNonNull(event); } @@ -149,14 +272,8 @@ protected void doRun() { if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSetting)) { assert state.metadata().hasIndex(deletedIndex) == false; - final SnapshotId snapshotId = new SnapshotId( - SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSetting), - SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSetting) - ); - final IndexId indexId = new IndexId( - SNAPSHOT_INDEX_NAME_SETTING.get(indexSetting), - SNAPSHOT_INDEX_ID_SETTING.get(indexSetting) - ); + final String snapshotId = SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSetting); + final String indexId = SNAPSHOT_INDEX_ID_SETTING.get(indexSetting); // we should do nothing if the current cluster state contains another // searchable snapshot index that uses the same index snapshot @@ -170,7 +287,7 @@ protected void doRun() { } final DeleteByQueryRequest request = new DeleteByQueryRequest(systemIndexName); - request.setQuery(buildDeleteByQuery(indexMetadata.getNumberOfShards(), snapshotId.getUUID(), indexId.getId())); + request.setQuery(buildDeleteByQuery(indexMetadata.getNumberOfShards(), snapshotId, indexId)); request.setRefresh(queue.isEmpty()); queue.add(Tuple.tuple(request, new ActionListener<>() { @@ -236,4 +353,289 @@ public void onFailure(Exception e) { ); } } + + private class PeriodicMaintenanceTask implements Runnable, Releasable { + + private final TimeValue keepAlive; + private final int batchSize; + + private final AtomicReference error = new AtomicReference<>(); + private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicLong processed = new AtomicLong(); + private final AtomicLong deletes = new AtomicLong(); + private final AtomicLong total = new AtomicLong(); + + private volatile SearchResponse searchResponse; + private volatile String pointIntTimeId; + private volatile Object[] searchAfter; + + PeriodicMaintenanceTask(TimeValue keepAlive, int batchSize) { + this.keepAlive = keepAlive; + this.batchSize = batchSize; + } + + @Override + public void run() { + final PeriodicMaintenanceTask maintenanceTask = this; + assert assertGenericThread(); + + try { + ensureOpen(); + if (pointIntTimeId == null) { + final OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(SNAPSHOT_BLOB_CACHE_INDEX); + openRequest.keepAlive(keepAlive); + clientWithOrigin.execute(OpenPointInTimeAction.INSTANCE, openRequest, new ActionListener<>() { + @Override + public void onResponse(OpenPointInTimeResponse response) { + logger.trace("periodic maintenance task initialized with point-in-time id [{}]", response.getPointInTimeId()); + maintenanceTask.pointIntTimeId = response.getPointInTimeId(); + executeNext(maintenanceTask); + } + + @Override + public void onFailure(Exception e) { + if (TransportActions.isShardNotAvailableException(e)) { + complete(maintenanceTask, null); + } else { + complete(maintenanceTask, e); + } + } + }); + return; + } + + final String pitId = pointIntTimeId; + assert Strings.hasLength(pitId); + + if (searchResponse == null) { + final SearchSourceBuilder searchSource = new SearchSourceBuilder(); + searchSource.trackScores(false); + searchSource.sort("_shard_doc"); + searchSource.size(batchSize); + final PointInTimeBuilder pointInTime = new PointInTimeBuilder(pitId); + searchSource.pointInTimeBuilder(pointInTime); + pointInTime.setKeepAlive(keepAlive); + final SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(searchSource); + if (searchAfter != null) { + searchSource.searchAfter(searchAfter); + } + clientWithOrigin.execute(SearchAction.INSTANCE, searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + maintenanceTask.total.compareAndSet(0L, response.getHits().getTotalHits().value); + maintenanceTask.searchResponse = response; + maintenanceTask.searchAfter = null; + executeNext(maintenanceTask); + } + + @Override + public void onFailure(Exception e) { + complete(maintenanceTask, e); + } + }); + return; + } + + final SearchHit[] searchHits = searchResponse.getHits().getHits(); + if (searchHits != null && searchHits.length > 0) { + final ClusterState state = clusterService.state(); + final BulkRequest bulkRequest = new BulkRequest(); + + final RepositoriesMetadata repositories = state.metadata() + .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + + final Set> missingSnapshots = new HashSet<>(); + final Set missingRepositories = new HashSet<>(); + + Object[] lastSortValues = null; + for (SearchHit searchHit : searchHits) { + assert searchHit.getId() != null; + assert searchHit.hasSource(); + boolean delete = false; + try { + // See {@link BlobStoreCacheService#generateId} + // doc id = {repository name}/{snapshot id}/{snapshot index id}/{shard id}/{file name}/@{file offset} + final String[] parts = Objects.requireNonNull(searchHit.getId()).split("/"); + assert parts.length == 6 : Arrays.toString(parts) + " vs " + searchHit.getId(); + + final String repositoryName = parts[0]; + if (missingRepositories.contains(repositoryName) || repositories.repository(repositoryName) == null) { + logger.trace( + "deleting blob store cache entry [id:{}, repository:{}, reason: repository does not exist]", + searchHit.getId(), + repositoryName + ); + missingRepositories.add(repositoryName); + delete = true; + continue; + } + + final Tuple snapshot = Tuple.tuple(parts[1], parts[2]); + if (missingSnapshots.contains(snapshot) + || hasSearchableSnapshotWith(state, snapshot.v1(), snapshot.v2()) == false) { + logger.trace( + "deleting blob store cache entry [id:{}, snapshotId:{}, indexId:{}, reason: unused]", + searchHit.getId(), + snapshot.v1(), + snapshot.v2() + ); + missingSnapshots.add(snapshot); + delete = true; + continue; + } + + final CachedBlob cachedBlob = CachedBlob.fromSource(Objects.requireNonNull(searchHit.getSourceAsMap())); + if (Version.CURRENT.isCompatible(cachedBlob.version()) == false) { + logger.trace( + "deleting blob store cache entry [id:{}, version:{}, reason: incompatible version]", + searchHit.getId(), + cachedBlob.version() + ); + delete = true; + continue; + } + } catch (Exception e) { + logger.warn( + () -> new ParameterizedMessage("failed to parse blob store cache entry [id:{}]", searchHit.getId()), + e + ); + delete = true; + } finally { + if (delete) { + final DeleteRequest deleteRequest = new DeleteRequest().index(searchHit.getIndex()); + deleteRequest.id(searchHit.getId()); + deleteRequest.setIfSeqNo(searchHit.getSeqNo()); + deleteRequest.setIfPrimaryTerm(searchHit.getPrimaryTerm()); + bulkRequest.add(deleteRequest); + } + maintenanceTask.processed.incrementAndGet(); + lastSortValues = searchHit.getSortValues(); + } + } + + assert lastSortValues != null; + if (bulkRequest.numberOfActions() == 0) { + this.searchResponse = null; + this.searchAfter = lastSortValues; + executeNext(this); + return; + } + + final Object[] finalSearchAfter = lastSortValues; + clientWithOrigin.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() { + @Override + public void onResponse(BulkResponse response) { + for (BulkItemResponse itemResponse : response.getItems()) { + if (itemResponse.isFailed() == false) { + assert itemResponse.getResponse() instanceof DeleteResponse; + maintenanceTask.deletes.incrementAndGet(); + } + } + maintenanceTask.searchResponse = null; + maintenanceTask.searchAfter = finalSearchAfter; + executeNext(maintenanceTask); + } + + @Override + public void onFailure(Exception e) { + complete(maintenanceTask, e); + } + }); + return; + } + // we're done, complete the task + complete(this, null); + } catch (Exception e) { + complete(this, e); + } + } + + public boolean isClosed() { + return closed.get(); + } + + private void ensureOpen() { + if (isClosed()) { + assert false : "should not use periodic task after close"; + throw new IllegalStateException("Periodic maintenance task is closed"); + } + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + final Exception e = error.get(); + if (e != null) { + logger.warn( + () -> new ParameterizedMessage( + "periodic maintenance task completed with failure ({} deleted documents out of a total of {})", + deletes.get(), + total.get() + ), + e + ); + } else { + logger.info( + () -> new ParameterizedMessage( + "periodic maintenance task completed ({} deleted documents out of a total of {}, {})", + deletes.get(), + total.get(), + processed.get() + ) + ); + } + } + } + + private boolean assertGenericThread() { + final String threadName = Thread.currentThread().getName(); + assert threadName.contains(ThreadPool.Names.GENERIC) : threadName; + return true; + } + } + + private void executeNext(PeriodicMaintenanceTask maintenanceTask) { + threadPool.generic().execute(maintenanceTask); + } + + private void complete(PeriodicMaintenanceTask maintenanceTask, @Nullable Exception failure) { + assert maintenanceTask.isClosed() == false; + final Releasable releasable = () -> { + try { + final Exception previous = maintenanceTask.error.getAndSet(failure); + assert previous == null : "periodic maintenance task already failed: " + previous; + maintenanceTask.close(); + } finally { + schedulePeriodicTask(); + } + }; + boolean waitForRelease = false; + try { + final String pitId = maintenanceTask.pointIntTimeId; + if (Strings.hasLength(pitId)) { + final ClosePointInTimeRequest closeRequest = new ClosePointInTimeRequest(pitId); + clientWithOrigin.execute(ClosePointInTimeAction.INSTANCE, closeRequest, ActionListener.runAfter(new ActionListener<>() { + @Override + public void onResponse(ClosePointInTimeResponse response) { + if (response.isSucceeded()) { + logger.debug("periodic maintenance task successfully closed point-in-time id [{}]", pitId); + } else { + logger.debug("point-in-time id [{}] not found", pitId); + } + } + + @Override + public void onFailure(Exception e) { + logger.warn(() -> new ParameterizedMessage("failed to close point-in-time id [{}]", pitId), e); + } + }, () -> Releasables.close(releasable))); + waitForRelease = true; + } + } finally { + if (waitForRelease == false) { + Releasables.close(releasable); + } + } + } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java index ba0d2b21b659f..5f531b87b5f4a 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java @@ -117,6 +117,10 @@ public BytesReference bytes() { return bytes; } + public Version version() { + return version; + } + @SuppressWarnings("unchecked") public static CachedBlob fromSource(final Map source) { final Long creationTimeEpochMillis = (Long) source.get("creation_time"); From 9710972c6bf6ec37ab25652794aa316ad7cc272b Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 29 Sep 2021 11:21:51 +0200 Subject: [PATCH 02/15] nits --- ...tsBlobStoreCacheMaintenanceIntegTests.java | 30 +++-- .../BlobStoreCacheMaintenanceService.java | 120 +++++++++--------- 2 files changed, 76 insertions(+), 74 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java index d6ec2521deb25..d9d588af76251 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java @@ -220,13 +220,17 @@ public void testPeriodicMaintenance() throws Exception { refreshSystemIndex(false); assertThat(numberOfEntriesInCache(), equalTo(0L)); - assertAcked(client().admin().cluster().prepareUpdateSettings() - .setTransientSettings( - Settings.builder() - .put( - BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.getKey(), - TimeValue.timeValueSeconds(1L)) - ) + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .put( + BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.getKey(), + TimeValue.timeValueSeconds(1L) + ) + ) ); try { final RestoreSnapshotResponse restoreResponse = client().admin() @@ -250,11 +254,13 @@ public void testPeriodicMaintenance() throws Exception { }, 3000L, TimeUnit.SECONDS); } finally { - assertAcked(client().admin().cluster().prepareUpdateSettings() - .setTransientSettings( - Settings.builder() - .putNull(BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.getKey()) - ) + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().putNull(BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.getKey()) + ) ); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index ba3124499c4b6..62e5bf5da51b6 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -35,6 +34,7 @@ import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; @@ -107,13 +107,13 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener { */ public static final Setting SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING = Setting.timeSetting( "searchable_snapshots.blob_cache.periodic_cleanup.pit_keep_alive", - TimeValue.timeValueMinutes(5L), - TimeValue.timeValueSeconds(10), + TimeValue.timeValueMinutes(10L), + TimeValue.timeValueSeconds(30L), Setting.Property.NodeScope, Setting.Property.Dynamic ); /** - * The number of documents that are searched for and bulk-deleted during the periodic cleanup. + * The number of documents that are searched for and bulk-deleted at once during the periodic cleanup. */ public static final Setting SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING = Setting.intSetting( "searchable_snapshots.blob_cache.periodic_cleanup.batch_size", @@ -147,7 +147,7 @@ public BlobStoreCacheMaintenanceService( this.periodicTaskInterval = SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.get(settings); this.periodicTaskKeepAlive = SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING.get(settings); this.periodicTaskBatchSize = SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING.get(settings); - ClusterSettings clusterSettings = clusterService.getClusterSettings(); + final ClusterSettings clusterSettings = clusterService.getClusterSettings(); clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING, this::setPeriodicTaskInterval); clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING, this::setPeriodicTaskKeepAlive); clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING, this::setPeriodicTaskBatchSize); @@ -354,6 +354,17 @@ public void onFailure(Exception e) { } } + /** + * A maintenance task that periodically cleans up unused cache entries from the blob store cache index. + * + * This task first opens a point-in-time context on the blob store cache system index and uses it to search all documents. For each + * document found the task verifies if it belongs to an existing searchable snapshot index. If the doc does not belong to any + * index then it is deleted as part of a bulk request. Once the bulk is executed the next batch of documents is searched for. Once + * all documents from the PIT have been verified the task closes the PIT and completes itself. + * + * The task executes every step (PIT opening, searches, bulk deletes, PIT closing) using the generic thread pool. + * The same task instance is used for all the steps and makes sure that a closed instance is not executed again. + */ private class PeriodicMaintenanceTask implements Runnable, Releasable { private final TimeValue keepAlive; @@ -361,7 +372,6 @@ private class PeriodicMaintenanceTask implements Runnable, Releasable { private final AtomicReference error = new AtomicReference<>(); private final AtomicBoolean closed = new AtomicBoolean(); - private final AtomicLong processed = new AtomicLong(); private final AtomicLong deletes = new AtomicLong(); private final AtomicLong total = new AtomicLong(); @@ -442,75 +452,62 @@ public void onFailure(Exception e) { final ClusterState state = clusterService.state(); final BulkRequest bulkRequest = new BulkRequest(); - final RepositoriesMetadata repositories = state.metadata() - .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); - + final Set> knownSnapshots = new HashSet<>(); final Set> missingSnapshots = new HashSet<>(); - final Set missingRepositories = new HashSet<>(); + final Set knownRepositories = state.metadata() + .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY) + .repositories() + .stream() + .map(RepositoryMetadata::name) + .collect(Collectors.toSet()); Object[] lastSortValues = null; for (SearchHit searchHit : searchHits) { assert searchHit.getId() != null; assert searchHit.hasSource(); - boolean delete = false; - try { - // See {@link BlobStoreCacheService#generateId} - // doc id = {repository name}/{snapshot id}/{snapshot index id}/{shard id}/{file name}/@{file offset} - final String[] parts = Objects.requireNonNull(searchHit.getId()).split("/"); - assert parts.length == 6 : Arrays.toString(parts) + " vs " + searchHit.getId(); - - final String repositoryName = parts[0]; - if (missingRepositories.contains(repositoryName) || repositories.repository(repositoryName) == null) { - logger.trace( - "deleting blob store cache entry [id:{}, repository:{}, reason: repository does not exist]", - searchHit.getId(), - repositoryName - ); - missingRepositories.add(repositoryName); - delete = true; - continue; - } + // See {@link BlobStoreCacheService#generateId} + // doc id = {repository name}/{snapshot id}/{snapshot index id}/{shard id}/{file name}/@{file offset} + final String[] parts = Objects.requireNonNull(searchHit.getId()).split("/"); + assert parts.length == 6 : Arrays.toString(parts) + " vs " + searchHit.getId(); + + boolean delete = false; + lastSortValues = searchHit.getSortValues(); + + final String repositoryName = parts[0]; + if (knownRepositories.contains(repositoryName) == false) { + logger.trace( + "deleting blob store cache entry [id:{}, repository:{}, reason: repository does not exist]", + searchHit.getId(), + repositoryName + ); + delete = true; + } else { final Tuple snapshot = Tuple.tuple(parts[1], parts[2]); - if (missingSnapshots.contains(snapshot) - || hasSearchableSnapshotWith(state, snapshot.v1(), snapshot.v2()) == false) { + boolean isMissing = missingSnapshots.contains(snapshot); + boolean isKnown = knownSnapshots.contains(snapshot); + if (isMissing + || (isKnown == false && hasSearchableSnapshotWith(state, snapshot.v1(), snapshot.v2()) == false)) { logger.trace( "deleting blob store cache entry [id:{}, snapshotId:{}, indexId:{}, reason: unused]", searchHit.getId(), snapshot.v1(), snapshot.v2() ); - missingSnapshots.add(snapshot); - delete = true; - continue; - } - - final CachedBlob cachedBlob = CachedBlob.fromSource(Objects.requireNonNull(searchHit.getSourceAsMap())); - if (Version.CURRENT.isCompatible(cachedBlob.version()) == false) { - logger.trace( - "deleting blob store cache entry [id:{}, version:{}, reason: incompatible version]", - searchHit.getId(), - cachedBlob.version() - ); + if (isMissing == false) { + missingSnapshots.add(snapshot); + } delete = true; - continue; - } - } catch (Exception e) { - logger.warn( - () -> new ParameterizedMessage("failed to parse blob store cache entry [id:{}]", searchHit.getId()), - e - ); - delete = true; - } finally { - if (delete) { - final DeleteRequest deleteRequest = new DeleteRequest().index(searchHit.getIndex()); - deleteRequest.id(searchHit.getId()); - deleteRequest.setIfSeqNo(searchHit.getSeqNo()); - deleteRequest.setIfPrimaryTerm(searchHit.getPrimaryTerm()); - bulkRequest.add(deleteRequest); + } else if (isKnown == false) { + knownSnapshots.add(snapshot); } - maintenanceTask.processed.incrementAndGet(); - lastSortValues = searchHit.getSortValues(); + } + if (delete) { + final DeleteRequest deleteRequest = new DeleteRequest().index(searchHit.getIndex()); + deleteRequest.id(searchHit.getId()); + deleteRequest.setIfSeqNo(searchHit.getSeqNo()); + deleteRequest.setIfPrimaryTerm(searchHit.getPrimaryTerm()); + bulkRequest.add(deleteRequest); } } @@ -578,10 +575,9 @@ public void close() { } else { logger.info( () -> new ParameterizedMessage( - "periodic maintenance task completed ({} deleted documents out of a total of {}, {})", + "periodic maintenance task completed ({} deleted documents out of a total of {})", deletes.get(), - total.get(), - processed.get() + total.get() ) ); } From afb40f72449f8996f5bc1846415651e8592cbdcd Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 29 Sep 2021 11:51:51 +0200 Subject: [PATCH 03/15] tests --- ...tsBlobStoreCacheMaintenanceIntegTests.java | 82 +++++++++++-------- 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java index d9d588af76251..9b6e969c2af3e 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; @@ -189,6 +190,9 @@ public void testCleanUpAfterIndicesAreDeleted() throws Exception { }); } + /** + * Test that obsolete blob cache entries are deleted from the system index by the periodic maintenance task. + */ public void testPeriodicMaintenance() throws Exception { ensureStableCluster(internalCluster().getNodeNames().length, TimeValue.timeValueSeconds(60L)); @@ -206,6 +210,7 @@ public void testPeriodicMaintenance() throws Exception { equalTo(Stream.concat(mountedIndices.values().stream(), otherMountedIndices.values().stream()).mapToLong(Tuple::v2).sum()) ); + // creates a backup of the system index cache to be restored later createRepository("backup", FsRepository.TYPE); createSnapshot("backup", "backup", List.of(SNAPSHOT_BLOB_CACHE_INDEX)); @@ -233,6 +238,7 @@ public void testPeriodicMaintenance() throws Exception { ) ); try { + // restores the .snapshot-blob-cache index with - now obsolete - documents final RestoreSnapshotResponse restoreResponse = client().admin() .cluster() .prepareRestoreSnapshot("backup", "backup") @@ -251,7 +257,7 @@ public void testPeriodicMaintenance() throws Exception { assertBusy(() -> { refreshSystemIndex(true); assertThat(numberOfEntriesInCache(), equalTo(expectNumberOfRemainingCacheEntries)); - }, 3000L, TimeUnit.SECONDS); + }, 30L, TimeUnit.SECONDS); } finally { assertAcked( @@ -313,43 +319,47 @@ private Map> mountRandomIndicesWithCache(String re final String indexName = "index-" + i; createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); - final List indexRequestBuilders = new ArrayList<>(); - for (int n = 1000; n > 0; n--) { - indexRequestBuilders.add( - client().prepareIndex(indexName) - .setSource( - XContentFactory.smileBuilder() - .startObject() - .field("text_a", randomRealisticUnicodeOfCodepointLength(10)) - .field("text_b", randomRealisticUnicodeOfCodepointLength(10)) - .endObject() - ) - ); + while (true) { + final List indexRequestBuilders = new ArrayList<>(); + for (int n = 500; n > 0; n--) { + final XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + for (int j = 0; j < 10; j++) { + builder.field("text_" + j, randomRealisticUnicodeOfCodepointLength(10)); + builder.field("int_" + j, randomInt()); + } + builder.endObject(); + indexRequestBuilders.add(client().prepareIndex(indexName).setSource(builder)); + } + indexRandom(true, indexRequestBuilders); + + final String snapshot = "snapshot-" + i; + createSnapshot(repositoryName, snapshot, List.of(indexName)); + + final String mountedIndex = "mounted-" + indexName + "-in-" + repositoryName; + mountSnapshot(repositoryName, snapshot, indexName, mountedIndex, Settings.EMPTY, randomFrom(Storage.values())); + + ensureGreen(mountedIndex); + assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); + assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME); + waitForBlobCacheFillsToComplete(); + + refreshSystemIndex(false); + final long numberOfEntriesInCache = numberOfEntriesInCache(); + if (numberOfEntriesInCache > previousNumberOfCachedEntries) { + final long nbEntries = numberOfEntriesInCache - previousNumberOfCachedEntries; + logger.info("--> mounted index [{}] has [{}] entries in cache", mountedIndex, nbEntries); + mountedIndices.put(mountedIndex, Tuple.tuple(getIndexSettings(mountedIndex), nbEntries)); + previousNumberOfCachedEntries = numberOfEntriesInCache; + break; + + } else { + logger.info("--> mounted index [{}] did not generate any entry in cache", mountedIndex); + assertAcked(client().admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot).get()); + assertAcked(client().admin().indices().prepareDelete(mountedIndex)); + } } - indexRandom(true, indexRequestBuilders); - - createSnapshot(repositoryName, "snapshot-" + i, List.of(indexName)); assertAcked(client().admin().indices().prepareDelete(indexName)); - - final String mountedIndex = "mounted-index-" + i + "-in-" + repositoryName; - mountSnapshot(repositoryName, "snapshot-" + i, "index-" + i, mountedIndex, Settings.EMPTY, randomFrom(Storage.values())); - - ensureGreen(mountedIndex); - assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); - assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME); - waitForBlobCacheFillsToComplete(); - - refreshSystemIndex(false); - final long numberOfEntriesInCache = numberOfEntriesInCache(); - if (numberOfEntriesInCache > previousNumberOfCachedEntries) { - final long nbEntries = numberOfEntriesInCache - previousNumberOfCachedEntries; - logger.info("--> mounted index [{}] has [{}] entries in cache", mountedIndex, nbEntries); - mountedIndices.put(mountedIndex, Tuple.tuple(getIndexSettings(mountedIndex), nbEntries)); - } else { - logger.info("--> mounted index [{}] did not generate any entry in cache, skipping", mountedIndex); - assertAcked(client().admin().indices().prepareDelete(mountedIndex)); - } - previousNumberOfCachedEntries = numberOfEntriesInCache; i += 1; } return Collections.unmodifiableMap(mountedIndices); From 71ff57a88a56f03cda0d16b294f2e47123c37eed Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 30 Sep 2021 13:33:51 +0200 Subject: [PATCH 04/15] startPeriodicTask --- .../cache/blob/BlobStoreCacheMaintenanceService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index 62e5bf5da51b6..93e51f85c1804 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -170,7 +170,7 @@ public void clusterChanged(ClusterChangedEvent event) { } if (periodicTask == null || periodicTask.isCancelled()) { schedulePeriodic = true; - schedulePeriodicTask(); + startPeriodicTask(); } } @@ -186,7 +186,7 @@ public void setPeriodicTaskBatchSize(int batchSize) { this.periodicTaskBatchSize = batchSize; } - private synchronized void schedulePeriodicTask() { + private synchronized void startPeriodicTask() { if (schedulePeriodic) { try { final TimeValue delay = periodicTaskInterval; @@ -603,7 +603,7 @@ private void complete(PeriodicMaintenanceTask maintenanceTask, @Nullable Excepti assert previous == null : "periodic maintenance task already failed: " + previous; maintenanceTask.close(); } finally { - schedulePeriodicTask(); + startPeriodicTask(); } }; boolean waitForRelease = false; From 6ac5a6e344e6b3dc63fab05d1675b934c50ff132 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 30 Sep 2021 16:46:30 +0200 Subject: [PATCH 05/15] move creation time up --- .../searchablesnapshots/SearchableSnapshots.java | 3 +-- .../cache/blob/BlobStoreCacheService.java | 8 +++----- .../store/SearchableSnapshotDirectory.java | 12 +++++++++++- .../cache/blob/BlobStoreCacheServiceTests.java | 12 ++++++------ .../searchablesnapshots/cache/common/TestUtils.java | 4 ++-- 5 files changed, 23 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index b51302cf2317a..918318a66524b 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -347,8 +347,7 @@ public Collection createComponents( final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService( clusterService, client, - SNAPSHOT_BLOB_CACHE_INDEX, - threadPool::absoluteTimeInMillis + SNAPSHOT_BLOB_CACHE_INDEX ); this.blobStoreCacheService.set(blobStoreCacheService); clusterService.addListener( diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheService.java index 36a8b2468aef9..e4c7ba7d250ab 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheService.java @@ -49,7 +49,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN; @@ -72,17 +71,15 @@ public class BlobStoreCacheService extends AbstractLifecycleComponent { private final ClusterService clusterService; private final Semaphore inFlightCacheFills; - private final Supplier timeSupplier; private final AtomicBoolean closed; private final Client client; private final String index; - public BlobStoreCacheService(ClusterService clusterService, Client client, String index, Supplier timeSupplier) { + public BlobStoreCacheService(ClusterService clusterService, Client client, String index) { this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN); this.inFlightCacheFills = new Semaphore(MAX_IN_FLIGHT_CACHE_FILLS); this.closed = new AtomicBoolean(false); this.clusterService = clusterService; - this.timeSupplier = timeSupplier; this.index = index; } @@ -242,12 +239,13 @@ public final void putAsync( final String name, final ByteRange range, final BytesReference bytes, + final long timeInEpochMillis, final ActionListener listener ) { final String id = generateId(repository, snapshotId, indexId, shardId, name, range); try { final CachedBlob cachedBlob = new CachedBlob( - Instant.ofEpochMilli(timeSupplier.get()), + Instant.ofEpochMilli(timeInEpochMillis), Version.CURRENT, repository, name, diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java index cd403bcb00e44..d1c3c65e78b83 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java @@ -704,7 +704,17 @@ public CachedBlob getCachedBlob(String name, ByteRange range) { } public void putCachedBlob(String name, ByteRange range, BytesReference content, ActionListener listener) { - blobStoreCacheService.putAsync(repository, snapshotId, indexId, shardId, name, range, content, listener); + blobStoreCacheService.putAsync( + repository, + snapshotId, + indexId, + shardId, + name, + range, + content, + threadPool.absoluteTimeInMillis(), + listener + ); } public FrozenCacheFile getFrozenCacheFile(String fileName, long length) { diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheServiceTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheServiceTests.java index 88aad96b1680a..51db7d04337e5 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheServiceTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheServiceTests.java @@ -100,7 +100,7 @@ public void testGetWhenServiceNotStarted() { return null; }).when(mockClient).execute(eq(GetAction.INSTANCE), any(GetRequest.class), any(ActionListener.class)); - BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L); + BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX); blobCacheService.start(); PlainActionFuture future = PlainActionFuture.newFuture(); @@ -132,17 +132,17 @@ public void testPutWhenServiceNotStarted() { return null; }).when(mockClient).execute(eq(IndexAction.INSTANCE), any(IndexRequest.class), any(ActionListener.class)); - BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L); + BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX); blobCacheService.start(); PlainActionFuture future = PlainActionFuture.newFuture(); - blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, future); + blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, 0L, future); assertThat(future.actionGet(), nullValue()); blobCacheService.stop(); future = PlainActionFuture.newFuture(); - blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, future); + blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, 0L, future); IllegalStateException exception = expectThrows(IllegalStateException.class, future::actionGet); assertThat(exception.getMessage(), containsString("Blob cache service is closed")); } @@ -170,7 +170,7 @@ public void testWaitForInFlightCacheFillsToComplete() throws Exception { return null; }).when(mockClient).execute(eq(IndexAction.INSTANCE), any(IndexRequest.class), any(ActionListener.class)); - final BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L); + final BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX); blobCacheService.start(); assertThat(blobCacheService.getInFlightCacheFills(), equalTo(0)); @@ -180,7 +180,7 @@ public void testWaitForInFlightCacheFillsToComplete() throws Exception { final PlainActionFuture future = PlainActionFuture.newFuture(); threadPool.generic() .execute( - () -> blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, future) + () -> blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, 0L, future) ); futures.add(future); } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java index e3f9306a6f145..8196d983768fe 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java @@ -316,7 +316,7 @@ private UnsupportedOperationException unsupportedException() { public static class NoopBlobStoreCacheService extends BlobStoreCacheService { public NoopBlobStoreCacheService() { - super(null, mock(Client.class), SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L); + super(null, mock(Client.class), SNAPSHOT_BLOB_CACHE_INDEX); } @Override @@ -345,7 +345,7 @@ public static class SimpleBlobStoreCacheService extends BlobStoreCacheService { private final ConcurrentHashMap blobs = new ConcurrentHashMap<>(); public SimpleBlobStoreCacheService() { - super(null, mock(Client.class), SNAPSHOT_BLOB_CACHE_INDEX, System::currentTimeMillis); + super(null, mock(Client.class), SNAPSHOT_BLOB_CACHE_INDEX); } @Override From 70b7666719620aaf0e0feac645bf4f9b9f604d57 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 30 Sep 2021 17:56:33 +0200 Subject: [PATCH 06/15] keep existing snapshots + use creation time --- ...tsBlobStoreCacheMaintenanceIntegTests.java | 108 +++++++++++++-- .../SearchableSnapshots.java | 3 +- .../BlobStoreCacheMaintenanceService.java | 126 ++++++++++++------ .../cache/blob/CachedBlob.java | 4 + 4 files changed, 192 insertions(+), 49 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java index 9b6e969c2af3e..dde15068cf713 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.searchablesnapshots.cache.blob; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -15,32 +16,42 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.LuceneFilesExtensions; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage; import org.elasticsearch.xpack.searchablesnapshots.BaseFrozenSearchableSnapshotsIntegTestCase; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; +import org.elasticsearch.xpack.searchablesnapshots.cache.common.ByteRange; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -199,16 +210,29 @@ public void testPeriodicMaintenance() throws Exception { createRepository("repo", FsRepository.TYPE); Map> mountedIndices = mountRandomIndicesWithCache("repo", 1, 3); ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX); + + final long nbEntriesInCacheForMountedIndices = mountedIndices.values().stream().mapToLong(Tuple::v2).sum(); refreshSystemIndex(true); - assertThat(numberOfEntriesInCache(), equalTo(mountedIndices.values().stream().mapToLong(Tuple::v2).sum())); + assertThat(numberOfEntriesInCache(), equalTo(nbEntriesInCacheForMountedIndices)); createRepository("other", FsRepository.TYPE); Map> otherMountedIndices = mountRandomIndicesWithCache("other", 1, 3); + + final long nbEntriesInCacheForOtherIndices = otherMountedIndices.values().stream().mapToLong(Tuple::v2).sum(); refreshSystemIndex(true); - assertThat( - numberOfEntriesInCache(), - equalTo(Stream.concat(mountedIndices.values().stream(), otherMountedIndices.values().stream()).mapToLong(Tuple::v2).sum()) - ); + assertThat(numberOfEntriesInCache(), equalTo(nbEntriesInCacheForMountedIndices + nbEntriesInCacheForOtherIndices)); + + final int oldDocsInCache; + if (randomBoolean()) { + oldDocsInCache = indexRandomDocsInCache(1, 50, Instant.now().minus(Duration.ofDays(7L)).toEpochMilli()); + refreshSystemIndex(true); + assertThat( + numberOfEntriesInCache(), + equalTo(nbEntriesInCacheForMountedIndices + nbEntriesInCacheForOtherIndices + oldDocsInCache) + ); + } else { + oldDocsInCache = 0; + } // creates a backup of the system index cache to be restored later createRepository("backup", FsRepository.TYPE); @@ -248,6 +272,38 @@ public void testPeriodicMaintenance() throws Exception { assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(1)); assertThat(restoreResponse.getRestoreInfo().failedShards(), equalTo(0)); + final int recentDocsInCache; + if (randomBoolean()) { + // recent as in the future, actually + recentDocsInCache = indexRandomDocsInCache(1, 50, Instant.now().plus(Duration.ofDays(10L)).toEpochMilli()); + } else { + recentDocsInCache = 0; + } + + // only very old docs should have been deleted + assertBusy(() -> { + refreshSystemIndex(true); + assertThat( + numberOfEntriesInCache(), + equalTo(nbEntriesInCacheForMountedIndices + nbEntriesInCacheForOtherIndices + recentDocsInCache) + ); + }, 30L, TimeUnit.SECONDS); + + // updating the retention period from 1H to immediate + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .put( + BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD.getKey(), + TimeValue.timeValueSeconds(0L) + ) + ) + ); + + // only used documents should remain final long expectNumberOfRemainingCacheEntries = otherMountedIndices.entrySet() .stream() .filter(e -> indicesToDelete.contains(e.getKey()) == false) @@ -256,7 +312,7 @@ public void testPeriodicMaintenance() throws Exception { assertBusy(() -> { refreshSystemIndex(true); - assertThat(numberOfEntriesInCache(), equalTo(expectNumberOfRemainingCacheEntries)); + assertThat(numberOfEntriesInCache(), equalTo(expectNumberOfRemainingCacheEntries + recentDocsInCache)); }, 30L, TimeUnit.SECONDS); } finally { @@ -265,7 +321,9 @@ public void testPeriodicMaintenance() throws Exception { .cluster() .prepareUpdateSettings() .setTransientSettings( - Settings.builder().putNull(BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.getKey()) + Settings.builder() + .putNull(BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.getKey()) + .putNull(BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD.getKey()) ) ); } @@ -364,4 +422,38 @@ private Map> mountRandomIndicesWithCache(String re } return Collections.unmodifiableMap(mountedIndices); } + + private int indexRandomDocsInCache(final int minDocs, final int maxDocs, final long creationTimeInEpochMillis) { + final int nbDocs = randomIntBetween(minDocs, maxDocs); + final CountDownLatch latch = new CountDownLatch(nbDocs); + + String repository = randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + SnapshotId snapshotId = new SnapshotId("snap", UUIDs.randomBase64UUID()); + IndexId indexId = new IndexId("index", UUIDs.randomBase64UUID()); + ShardId shardId = new ShardId("index", UUIDs.randomBase64UUID(), randomInt(5)); + String fileName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT) + '.' + randomFrom(LuceneFilesExtensions.values()).getExtension(); + byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, 64)); + + final BlobStoreCacheService blobStoreCacheService = internalCluster().getDataNodeInstance(BlobStoreCacheService.class); + for (int i = 0; i < nbDocs; i++) { + int length = randomIntBetween(1, Math.max(1, bytes.length - 1)); + blobStoreCacheService.putAsync( + repository, + snapshotId, + indexId, + shardId, + fileName, + ByteRange.of(i, i + length), + new BytesArray(bytes, 0, length), + creationTimeInEpochMillis, + ActionListener.wrap(latch::countDown) + ); + } + try { + latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + return nbDocs; + } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 918318a66524b..c75a90890894b 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -316,7 +316,8 @@ public List> getSettings() { FrozenCacheService.SNAPSHOT_CACHE_MIN_TIME_DELTA_SETTING, BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING, BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING, - BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING + BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING, + BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD ); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index 93e51f85c1804..f514da4819560 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -62,9 +62,13 @@ import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; +import java.time.Instant; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; +import java.util.Map; import java.util.Objects; import java.util.Queue; import java.util.Set; @@ -122,6 +126,18 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener { Setting.Property.Dynamic ); + /** + * The retention period to keep obsolete documents in the blob store cache index. This duration is used during the periodic cleanup in + * order to avoid deleting documents belonging to concurrently mounted searchable snapshots. Defaults to 1h. + */ + public static final Setting SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD = Setting.timeSetting( + "searchable_snapshots.blob_cache.periodic_cleanup.retention_period", + TimeValue.timeValueHours(1L), + TimeValue.ZERO, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private final ClusterService clusterService; private final Client clientWithOrigin; private final String systemIndexName; @@ -130,6 +146,7 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener { private volatile Scheduler.Cancellable periodicTask; private volatile TimeValue periodicTaskInterval; private volatile TimeValue periodicTaskKeepAlive; + private volatile TimeValue periodicTaskRetention; private volatile int periodicTaskBatchSize; private volatile boolean schedulePeriodic; @@ -147,10 +164,12 @@ public BlobStoreCacheMaintenanceService( this.periodicTaskInterval = SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.get(settings); this.periodicTaskKeepAlive = SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING.get(settings); this.periodicTaskBatchSize = SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING.get(settings); + this.periodicTaskRetention = SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD.get(settings); final ClusterSettings clusterSettings = clusterService.getClusterSettings(); clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING, this::setPeriodicTaskInterval); clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING, this::setPeriodicTaskKeepAlive); clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING, this::setPeriodicTaskBatchSize); + clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD, this::setPeriodicTaskRetention); } @Override @@ -182,6 +201,10 @@ private void setPeriodicTaskKeepAlive(TimeValue keepAlive) { this.periodicTaskKeepAlive = keepAlive; } + public void setPeriodicTaskRetention(TimeValue retention) { + this.periodicTaskRetention = retention; + } + public void setPeriodicTaskBatchSize(int batchSize) { this.periodicTaskBatchSize = batchSize; } @@ -239,6 +262,21 @@ private static boolean hasSearchableSnapshotWith(final ClusterState state, final return false; } + private static Map> listSearchableSnapshots(final ClusterState state) { + Map> snapshots = null; + for (IndexMetadata indexMetadata : state.metadata()) { + final Settings indexSettings = indexMetadata.getSettings(); + if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSettings)) { + if (snapshots == null) { + snapshots = new HashMap<>(); + } + snapshots.computeIfAbsent(SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings), s -> new HashSet<>()) + .add(SNAPSHOT_INDEX_ID_SETTING.get(indexSettings)); + } + } + return snapshots != null ? Collections.unmodifiableMap(snapshots) : Collections.emptyMap(); + } + static QueryBuilder buildDeleteByQuery(int numberOfShards, String snapshotUuid, String indexUuid) { final Set paths = IntStream.range(0, numberOfShards) .mapToObj(shard -> String.join("/", snapshotUuid, indexUuid, String.valueOf(shard))) @@ -375,6 +413,7 @@ private class PeriodicMaintenanceTask implements Runnable, Releasable { private final AtomicLong deletes = new AtomicLong(); private final AtomicLong total = new AtomicLong(); + private volatile Map> existingSnapshots; private volatile SearchResponse searchResponse; private volatile String pointIntTimeId; private volatile Object[] searchAfter; @@ -452,8 +491,17 @@ public void onFailure(Exception e) { final ClusterState state = clusterService.state(); final BulkRequest bulkRequest = new BulkRequest(); - final Set> knownSnapshots = new HashSet<>(); - final Set> missingSnapshots = new HashSet<>(); + final TimeValue retention = periodicTaskRetention; + final Instant expirationTime = Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()) + .minus(retention.duration(), retention.timeUnit().toChronoUnit()); + + // compute the list of existing searchable snapshots once + Map> knownSnapshots = existingSnapshots; + if (knownSnapshots == null) { + knownSnapshots = listSearchableSnapshots(state); + existingSnapshots = knownSnapshots; + } + final Set knownRepositories = state.metadata() .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY) .repositories() @@ -463,51 +511,49 @@ public void onFailure(Exception e) { Object[] lastSortValues = null; for (SearchHit searchHit : searchHits) { + lastSortValues = searchHit.getSortValues(); assert searchHit.getId() != null; assert searchHit.hasSource(); - // See {@link BlobStoreCacheService#generateId} - // doc id = {repository name}/{snapshot id}/{snapshot index id}/{shard id}/{file name}/@{file offset} - final String[] parts = Objects.requireNonNull(searchHit.getId()).split("/"); - assert parts.length == 6 : Arrays.toString(parts) + " vs " + searchHit.getId(); + try { + final CachedBlob cachedBlob = CachedBlob.fromSource(searchHit.getSourceAsMap()); + if (cachedBlob.creationTime().isAfter(expirationTime)) { + logger.trace("blob store cache entry with id [{}] was created recently, skipping", searchHit.getId()); + continue; + } + boolean delete = false; - boolean delete = false; - lastSortValues = searchHit.getSortValues(); + // See {@link BlobStoreCacheService#generateId} + // doc id = {repository name}/{snapshot id}/{snapshot index id}/{shard id}/{file name}/@{file offset} + final String[] parts = Objects.requireNonNull(searchHit.getId()).split("/"); + assert parts.length == 6 : Arrays.toString(parts) + " vs " + searchHit.getId(); - final String repositoryName = parts[0]; - if (knownRepositories.contains(repositoryName) == false) { - logger.trace( - "deleting blob store cache entry [id:{}, repository:{}, reason: repository does not exist]", - searchHit.getId(), - repositoryName - ); - delete = true; - } else { - final Tuple snapshot = Tuple.tuple(parts[1], parts[2]); - boolean isMissing = missingSnapshots.contains(snapshot); - boolean isKnown = knownSnapshots.contains(snapshot); - if (isMissing - || (isKnown == false && hasSearchableSnapshotWith(state, snapshot.v1(), snapshot.v2()) == false)) { - logger.trace( - "deleting blob store cache entry [id:{}, snapshotId:{}, indexId:{}, reason: unused]", - searchHit.getId(), - snapshot.v1(), - snapshot.v2() - ); - if (isMissing == false) { - missingSnapshots.add(snapshot); - } + final String repositoryName = parts[0]; + if (knownRepositories.contains(repositoryName) == false) { + logger.trace("deleting blob store cache entry with id [{}]: repository does not exist", searchHit.getId()); delete = true; - } else if (isKnown == false) { - knownSnapshots.add(snapshot); + } else { + final Set knownIndexIds = knownSnapshots.get(parts[1]); + if (knownIndexIds == null || knownIndexIds.contains(parts[2]) == false) { + logger.trace("deleting blob store cache entry with id [{}]: not used", searchHit.getId()); + delete = true; + } } - } - if (delete) { - final DeleteRequest deleteRequest = new DeleteRequest().index(searchHit.getIndex()); - deleteRequest.id(searchHit.getId()); - deleteRequest.setIfSeqNo(searchHit.getSeqNo()); - deleteRequest.setIfPrimaryTerm(searchHit.getPrimaryTerm()); - bulkRequest.add(deleteRequest); + if (delete) { + final DeleteRequest deleteRequest = new DeleteRequest().index(searchHit.getIndex()); + deleteRequest.id(searchHit.getId()); + deleteRequest.setIfSeqNo(searchHit.getSeqNo()); + deleteRequest.setIfPrimaryTerm(searchHit.getPrimaryTerm()); + bulkRequest.add(deleteRequest); + } + } catch (Exception e) { + logger.warn( + () -> new ParameterizedMessage( + "exception when parsing blob store cache entry with id [{}], skipping", + searchHit.getId() + ), + e + ); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java index 5f531b87b5f4a..d5fe34e6c2a3c 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java @@ -121,6 +121,10 @@ public Version version() { return version; } + public Instant creationTime() { + return creationTime; + } + @SuppressWarnings("unchecked") public static CachedBlob fromSource(final Map source) { final Long creationTimeEpochMillis = (Long) source.get("creation_time"); From 2a84b4a65854a586054cbe73fad7192e5815a0ef Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 1 Oct 2021 10:14:41 +0200 Subject: [PATCH 07/15] track total hits 1st search --- .../cache/blob/BlobStoreCacheMaintenanceService.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index 86d90fc45d730..7b907257ca2a3 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -462,14 +462,17 @@ public void onFailure(Exception e) { searchSource.trackScores(false); searchSource.sort("_shard_doc"); searchSource.size(batchSize); + if (searchAfter != null) { + searchSource.searchAfter(searchAfter); + searchSource.trackTotalHits(false); + } else { + searchSource.trackTotalHits(true); + } final PointInTimeBuilder pointInTime = new PointInTimeBuilder(pitId); - searchSource.pointInTimeBuilder(pointInTime); pointInTime.setKeepAlive(keepAlive); + searchSource.pointInTimeBuilder(pointInTime); final SearchRequest searchRequest = new SearchRequest(); searchRequest.source(searchSource); - if (searchAfter != null) { - searchSource.searchAfter(searchAfter); - } clientWithOrigin.execute(SearchAction.INSTANCE, searchRequest, new ActionListener<>() { @Override public void onResponse(SearchResponse response) { From 15ca8cdd9394fc5e1c9d2c9abec84525a64c4b13 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 1 Oct 2021 11:22:17 +0200 Subject: [PATCH 08/15] docvalue + expiration time fields --- .../BlobStoreCacheMaintenanceService.java | 66 ++++++++++++------- .../cache/blob/CachedBlob.java | 3 +- 2 files changed, 44 insertions(+), 25 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index 7b907257ca2a3..c989b6a00a118 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -39,6 +39,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.document.DocumentField; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -58,6 +59,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.ShardDocSortField; import org.elasticsearch.snapshots.SearchableSnapshotsSettings; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -415,7 +417,9 @@ private class PeriodicMaintenanceTask implements Runnable, Releasable { private final AtomicLong total = new AtomicLong(); private volatile Map> existingSnapshots; + private volatile Set existingRepositories; private volatile SearchResponse searchResponse; + private volatile Instant expirationTime; private volatile String pointIntTimeId; private volatile Object[] searchAfter; @@ -459,8 +463,10 @@ public void onFailure(Exception e) { if (searchResponse == null) { final SearchSourceBuilder searchSource = new SearchSourceBuilder(); + searchSource.fetchField(CachedBlob.CREATION_TIME_FIELD); + searchSource.fetchSource(false); searchSource.trackScores(false); - searchSource.sort("_shard_doc"); + searchSource.sort(ShardDocSortField.NAME); searchSource.size(batchSize); if (searchAfter != null) { searchSource.searchAfter(searchAfter); @@ -476,7 +482,9 @@ public void onFailure(Exception e) { clientWithOrigin.execute(SearchAction.INSTANCE, searchRequest, new ActionListener<>() { @Override public void onResponse(SearchResponse response) { - maintenanceTask.total.compareAndSet(0L, response.getHits().getTotalHits().value); + if (searchAfter == null) { + maintenanceTask.total.compareAndSet(0L, response.getHits().getTotalHits().value); + } maintenanceTask.searchResponse = response; maintenanceTask.searchAfter = null; executeNext(maintenanceTask); @@ -492,36 +500,37 @@ public void onFailure(Exception e) { final SearchHit[] searchHits = searchResponse.getHits().getHits(); if (searchHits != null && searchHits.length > 0) { - final ClusterState state = clusterService.state(); - final BulkRequest bulkRequest = new BulkRequest(); - - final TimeValue retention = periodicTaskRetention; - final Instant expirationTime = Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()) - .minus(retention.duration(), retention.timeUnit().toChronoUnit()); - - // compute the list of existing searchable snapshots once - Map> knownSnapshots = existingSnapshots; - if (knownSnapshots == null) { - knownSnapshots = listSearchableSnapshots(state); - existingSnapshots = knownSnapshots; + if (expirationTime == null) { + final TimeValue retention = periodicTaskRetention; + expirationTime = Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()) + .minus(retention.duration(), retention.timeUnit().toChronoUnit()); + + final ClusterState state = clusterService.state(); + // compute the list of existing searchable snapshots and repositories once + existingSnapshots = listSearchableSnapshots(state); + existingRepositories = state.metadata() + .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY) + .repositories() + .stream() + .map(RepositoryMetadata::name) + .collect(Collectors.toSet()); } - final Set knownRepositories = state.metadata() - .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY) - .repositories() - .stream() - .map(RepositoryMetadata::name) - .collect(Collectors.toSet()); + final BulkRequest bulkRequest = new BulkRequest(); + final Map> knownSnapshots = existingSnapshots; + assert knownSnapshots != null; + final Set knownRepositories = existingRepositories; + assert knownRepositories != null; + final Instant expirationTime = this.expirationTime; + assert expirationTime != null; Object[] lastSortValues = null; for (SearchHit searchHit : searchHits) { lastSortValues = searchHit.getSortValues(); assert searchHit.getId() != null; - assert searchHit.hasSource(); - try { - final CachedBlob cachedBlob = CachedBlob.fromSource(searchHit.getSourceAsMap()); - if (cachedBlob.creationTime().isAfter(expirationTime)) { + final Instant creationTime = getCreationTime(searchHit); + if (creationTime.isAfter(expirationTime)) { logger.trace("blob store cache entry with id [{}] was created recently, skipping", searchHit.getId()); continue; } @@ -684,4 +693,13 @@ public void onFailure(Exception e) { } } } + + private static Instant getCreationTime(SearchHit searchHit) { + final DocumentField creationTimeField = searchHit.field(CachedBlob.CREATION_TIME_FIELD); + assert creationTimeField != null; + final Object creationTimeValue = creationTimeField.getValue(); + assert creationTimeValue != null; + assert creationTimeValue instanceof String : "expect a java.lang.String but got " + creationTimeValue.getClass(); + return Instant.ofEpochMilli(Long.parseLong(creationTimeField.getValue())); + } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java index d5fe34e6c2a3c..d67b1471002da 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java @@ -31,6 +31,7 @@ public class CachedBlob implements ToXContent { public static final CachedBlob CACHE_MISS = new CachedBlob(null, null, null, "CACHE_MISS", null, BytesArray.EMPTY, 0L, 0L); private static final String TYPE = "blob"; + public static final String CREATION_TIME_FIELD = "creation_time"; private final Instant creationTime; private final Version version; @@ -127,7 +128,7 @@ public Instant creationTime() { @SuppressWarnings("unchecked") public static CachedBlob fromSource(final Map source) { - final Long creationTimeEpochMillis = (Long) source.get("creation_time"); + final Long creationTimeEpochMillis = (Long) source.get(CREATION_TIME_FIELD); if (creationTimeEpochMillis == null) { throw new IllegalStateException("cached blob document does not have the [creation_time] field"); } From 1966a839313560d0d624528417e131c09072e6eb Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 1 Oct 2021 11:25:56 +0200 Subject: [PATCH 09/15] log instead of delete --- .../blob/BlobStoreCacheMaintenanceService.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index c989b6a00a118..cd97ab93aadeb 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -529,11 +529,6 @@ public void onFailure(Exception e) { lastSortValues = searchHit.getSortValues(); assert searchHit.getId() != null; try { - final Instant creationTime = getCreationTime(searchHit); - if (creationTime.isAfter(expirationTime)) { - logger.trace("blob store cache entry with id [{}] was created recently, skipping", searchHit.getId()); - continue; - } boolean delete = false; // See {@link BlobStoreCacheService#generateId} @@ -553,6 +548,15 @@ public void onFailure(Exception e) { } } if (delete) { + final Instant creationTime = getCreationTime(searchHit); + if (creationTime.isAfter(expirationTime)) { + logger.trace( + "blob store cache entry with id [{}] was created recently, skipping deletion", + searchHit.getId() + ); + continue; + } + final DeleteRequest deleteRequest = new DeleteRequest().index(searchHit.getIndex()); deleteRequest.id(searchHit.getId()); deleteRequest.setIfSeqNo(searchHit.getSeqNo()); From 1d0654f0d12f8432c045f668561630052f6435fc Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 1 Oct 2021 11:31:04 +0200 Subject: [PATCH 10/15] remove seq no --- .../cache/blob/BlobStoreCacheMaintenanceService.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index cd97ab93aadeb..303f1289d0850 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -306,8 +306,8 @@ protected void doRun() { for (Index deletedIndex : event.indicesDeleted()) { final IndexMetadata indexMetadata = event.previousState().metadata().index(deletedIndex); - assert indexMetadata != null || state.metadata().indexGraveyard().containsIndex(deletedIndex) - : "no previous metadata found for " + deletedIndex; + assert indexMetadata != null + || state.metadata().indexGraveyard().containsIndex(deletedIndex) : "no previous metadata found for " + deletedIndex; if (indexMetadata != null) { final Settings indexSetting = indexMetadata.getSettings(); if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSetting)) { @@ -556,12 +556,7 @@ public void onFailure(Exception e) { ); continue; } - - final DeleteRequest deleteRequest = new DeleteRequest().index(searchHit.getIndex()); - deleteRequest.id(searchHit.getId()); - deleteRequest.setIfSeqNo(searchHit.getSeqNo()); - deleteRequest.setIfPrimaryTerm(searchHit.getPrimaryTerm()); - bulkRequest.add(deleteRequest); + bulkRequest.add(new DeleteRequest().index(searchHit.getIndex()).id(searchHit.getId())); } } catch (Exception e) { logger.warn( From f963685226a46351563dd8efc6b56b86021dccc4 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 1 Oct 2021 11:33:45 +0200 Subject: [PATCH 11/15] unnecessary --- ...rchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java index dde15068cf713..93bea63fccf85 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java @@ -222,16 +222,13 @@ public void testPeriodicMaintenance() throws Exception { refreshSystemIndex(true); assertThat(numberOfEntriesInCache(), equalTo(nbEntriesInCacheForMountedIndices + nbEntriesInCacheForOtherIndices)); - final int oldDocsInCache; if (randomBoolean()) { - oldDocsInCache = indexRandomDocsInCache(1, 50, Instant.now().minus(Duration.ofDays(7L)).toEpochMilli()); + final int oldDocsInCache = indexRandomDocsInCache(1, 50, Instant.now().minus(Duration.ofDays(7L)).toEpochMilli()); refreshSystemIndex(true); assertThat( numberOfEntriesInCache(), equalTo(nbEntriesInCacheForMountedIndices + nbEntriesInCacheForOtherIndices + oldDocsInCache) ); - } else { - oldDocsInCache = 0; } // creates a backup of the system index cache to be restored later @@ -246,7 +243,6 @@ public void testPeriodicMaintenance() throws Exception { assertAcked(client().admin().cluster().prepareDeleteRepository("repo")); ensureClusterStateConsistency(); - refreshSystemIndex(false); assertThat(numberOfEntriesInCache(), equalTo(0L)); assertAcked( From 03d178bc7591b0d3a758eecd7f45380791faf4b0 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 1 Oct 2021 11:38:07 +0200 Subject: [PATCH 12/15] active --- .../cache/blob/BlobStoreCacheMaintenanceService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index 303f1289d0850..a1c544e1833be 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -181,7 +181,9 @@ public void clusterChanged(ClusterChangedEvent event) { return; // state not fully recovered } final ShardRouting primary = systemIndexPrimaryShard(state); - if (primary == null || Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId()) == false) { + if (primary == null + || primary.active() == false + || Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId()) == false) { // system index primary shard does not exist or is not assigned to this data node stopPeriodicTask(); return; From b009c35094d88308c6284012f9d0a760dc1e95cb Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 1 Oct 2021 11:56:40 +0200 Subject: [PATCH 13/15] complete --- .../BlobStoreCacheMaintenanceService.java | 122 +++++++++--------- 1 file changed, 60 insertions(+), 62 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index a1c544e1833be..942f5554ce1c8 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -308,8 +308,8 @@ protected void doRun() { for (Index deletedIndex : event.indicesDeleted()) { final IndexMetadata indexMetadata = event.previousState().metadata().index(deletedIndex); - assert indexMetadata != null - || state.metadata().indexGraveyard().containsIndex(deletedIndex) : "no previous metadata found for " + deletedIndex; + assert indexMetadata != null || state.metadata().indexGraveyard().containsIndex(deletedIndex) + : "no previous metadata found for " + deletedIndex; if (indexMetadata != null) { final Settings indexSetting = indexMetadata.getSettings(); if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSetting)) { @@ -432,9 +432,7 @@ private class PeriodicMaintenanceTask implements Runnable, Releasable { @Override public void run() { - final PeriodicMaintenanceTask maintenanceTask = this; assert assertGenericThread(); - try { ensureOpen(); if (pointIntTimeId == null) { @@ -444,16 +442,16 @@ public void run() { @Override public void onResponse(OpenPointInTimeResponse response) { logger.trace("periodic maintenance task initialized with point-in-time id [{}]", response.getPointInTimeId()); - maintenanceTask.pointIntTimeId = response.getPointInTimeId(); - executeNext(maintenanceTask); + PeriodicMaintenanceTask.this.pointIntTimeId = response.getPointInTimeId(); + executeNext(PeriodicMaintenanceTask.this); } @Override public void onFailure(Exception e) { if (TransportActions.isShardNotAvailableException(e)) { - complete(maintenanceTask, null); + complete(null); } else { - complete(maintenanceTask, e); + complete(e); } } }); @@ -485,16 +483,16 @@ public void onFailure(Exception e) { @Override public void onResponse(SearchResponse response) { if (searchAfter == null) { - maintenanceTask.total.compareAndSet(0L, response.getHits().getTotalHits().value); + PeriodicMaintenanceTask.this.total.compareAndSet(0L, response.getHits().getTotalHits().value); } - maintenanceTask.searchResponse = response; - maintenanceTask.searchAfter = null; - executeNext(maintenanceTask); + PeriodicMaintenanceTask.this.searchResponse = response; + PeriodicMaintenanceTask.this.searchAfter = null; + executeNext(PeriodicMaintenanceTask.this); } @Override public void onFailure(Exception e) { - complete(maintenanceTask, e); + complete(e); } }); return; @@ -586,25 +584,25 @@ public void onResponse(BulkResponse response) { for (BulkItemResponse itemResponse : response.getItems()) { if (itemResponse.isFailed() == false) { assert itemResponse.getResponse() instanceof DeleteResponse; - maintenanceTask.deletes.incrementAndGet(); + PeriodicMaintenanceTask.this.deletes.incrementAndGet(); } } - maintenanceTask.searchResponse = null; - maintenanceTask.searchAfter = finalSearchAfter; - executeNext(maintenanceTask); + PeriodicMaintenanceTask.this.searchResponse = null; + PeriodicMaintenanceTask.this.searchAfter = finalSearchAfter; + executeNext(PeriodicMaintenanceTask.this); } @Override public void onFailure(Exception e) { - complete(maintenanceTask, e); + complete(e); } }); return; } // we're done, complete the task - complete(this, null); + complete(null); } catch (Exception e) { - complete(this, e); + complete(e); } } @@ -644,10 +642,44 @@ public void close() { } } - private boolean assertGenericThread() { - final String threadName = Thread.currentThread().getName(); - assert threadName.contains(ThreadPool.Names.GENERIC) : threadName; - return true; + private void complete(@Nullable Exception failure) { + assert isClosed() == false; + final Releasable releasable = () -> { + try { + final Exception previous = error.getAndSet(failure); + assert previous == null : "periodic maintenance task already failed: " + previous; + close(); + } finally { + startPeriodicTask(); + } + }; + boolean waitForRelease = false; + try { + final String pitId = pointIntTimeId; + if (Strings.hasLength(pitId)) { + final ClosePointInTimeRequest closeRequest = new ClosePointInTimeRequest(pitId); + clientWithOrigin.execute(ClosePointInTimeAction.INSTANCE, closeRequest, ActionListener.runAfter(new ActionListener<>() { + @Override + public void onResponse(ClosePointInTimeResponse response) { + if (response.isSucceeded()) { + logger.debug("periodic maintenance task successfully closed point-in-time id [{}]", pitId); + } else { + logger.debug("point-in-time id [{}] not found", pitId); + } + } + + @Override + public void onFailure(Exception e) { + logger.warn(() -> new ParameterizedMessage("failed to close point-in-time id [{}]", pitId), e); + } + }, () -> Releasables.close(releasable))); + waitForRelease = true; + } + } finally { + if (waitForRelease == false) { + Releasables.close(releasable); + } + } } } @@ -655,44 +687,10 @@ private void executeNext(PeriodicMaintenanceTask maintenanceTask) { threadPool.generic().execute(maintenanceTask); } - private void complete(PeriodicMaintenanceTask maintenanceTask, @Nullable Exception failure) { - assert maintenanceTask.isClosed() == false; - final Releasable releasable = () -> { - try { - final Exception previous = maintenanceTask.error.getAndSet(failure); - assert previous == null : "periodic maintenance task already failed: " + previous; - maintenanceTask.close(); - } finally { - startPeriodicTask(); - } - }; - boolean waitForRelease = false; - try { - final String pitId = maintenanceTask.pointIntTimeId; - if (Strings.hasLength(pitId)) { - final ClosePointInTimeRequest closeRequest = new ClosePointInTimeRequest(pitId); - clientWithOrigin.execute(ClosePointInTimeAction.INSTANCE, closeRequest, ActionListener.runAfter(new ActionListener<>() { - @Override - public void onResponse(ClosePointInTimeResponse response) { - if (response.isSucceeded()) { - logger.debug("periodic maintenance task successfully closed point-in-time id [{}]", pitId); - } else { - logger.debug("point-in-time id [{}] not found", pitId); - } - } - - @Override - public void onFailure(Exception e) { - logger.warn(() -> new ParameterizedMessage("failed to close point-in-time id [{}]", pitId), e); - } - }, () -> Releasables.close(releasable))); - waitForRelease = true; - } - } finally { - if (waitForRelease == false) { - Releasables.close(releasable); - } - } + private static boolean assertGenericThread() { + final String threadName = Thread.currentThread().getName(); + assert threadName.contains(ThreadPool.Names.GENERIC) : threadName; + return true; } private static Instant getCreationTime(SearchHit searchHit) { From 8909ab8c291abb85d036dccb8d0672898dae2af4 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 1 Oct 2021 16:31:20 +0200 Subject: [PATCH 14/15] doc --- .../searchable-snapshots/index.asciidoc | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/docs/reference/searchable-snapshots/index.asciidoc b/docs/reference/searchable-snapshots/index.asciidoc index d615bb5dbfd96..e9a6d9a5f89fb 100644 --- a/docs/reference/searchable-snapshots/index.asciidoc +++ b/docs/reference/searchable-snapshots/index.asciidoc @@ -198,6 +198,34 @@ IMPORTANT: You can only configure these settings on nodes with the <> role. Additionally, nodes with a shared cache can only have a single <>. +{es} also uses a dedicated system index named `.snapshot-blob-cache` to speed +up the recoveries of {search-snap} shards. This index is used as an additional +caching layer on top of the partially or fully mounted data and contains the +minimal required data to start the {search-snap} shards. {es} automatically +deletes the documents that are no longer used in this index. This periodic +clean up can be tuned using the following settings: + +`searchable_snapshots.blob_cache.periodic_cleanup.interval`:: +(<>) +The interval at which the periodic cleanup of the `.snapshot-blob-cache` +index is scheduled. Defaults to every hour (`1h`). + +`searchable_snapshots.blob_cache.periodic_cleanup.retention_period`:: +(<>) +The retention period to keep obsolete documents in the `.snapshot-blob-cache` +index. Defaults to every hour (`1h`). + +`searchable_snapshots.blob_cache.periodic_cleanup.batch_size`:: +(<>) +The number of documents that are searched for and bulk-deleted at once during +the periodic cleanup of the `.snapshot-blob-cache` index. Defaults to `100`. + +`searchable_snapshots.blob_cache.periodic_cleanup.pit_keep_alive`:: +(<>) +The value used for the > +requests executed during the periodic cleanup of the `.snapshot-blob-cache` +index. Defaults to `10m`. + [discrete] [[searchable-snapshots-costs]] === Reduce costs with {search-snaps} From c8d1067592be52667d2eb365ba009158cd0aeac3 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 4 Oct 2021 10:03:48 +0200 Subject: [PATCH 15/15] nits --- .../cache/blob/BlobStoreCacheMaintenanceService.java | 6 ++++-- .../xpack/searchablesnapshots/cache/blob/CachedBlob.java | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index 942f5554ce1c8..d24265dfd5979 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -59,6 +59,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.subphase.FieldAndFormat; import org.elasticsearch.search.sort.ShardDocSortField; import org.elasticsearch.snapshots.SearchableSnapshotsSettings; import org.elasticsearch.threadpool.Scheduler; @@ -463,7 +464,7 @@ public void onFailure(Exception e) { if (searchResponse == null) { final SearchSourceBuilder searchSource = new SearchSourceBuilder(); - searchSource.fetchField(CachedBlob.CREATION_TIME_FIELD); + searchSource.fetchField(new FieldAndFormat(CachedBlob.CREATION_TIME_FIELD, "epoch_millis")); searchSource.fetchSource(false); searchSource.trackScores(false); searchSource.sort(ShardDocSortField.NAME); @@ -483,7 +484,8 @@ public void onFailure(Exception e) { @Override public void onResponse(SearchResponse response) { if (searchAfter == null) { - PeriodicMaintenanceTask.this.total.compareAndSet(0L, response.getHits().getTotalHits().value); + assert PeriodicMaintenanceTask.this.total.get() == 0L; + PeriodicMaintenanceTask.this.total.set(response.getHits().getTotalHits().value); } PeriodicMaintenanceTask.this.searchResponse = response; PeriodicMaintenanceTask.this.searchAfter = null; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java index d67b1471002da..8bb08554829a9 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java @@ -81,7 +81,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); { builder.field("type", TYPE); - builder.field("creation_time", creationTime.toEpochMilli()); + builder.field(CREATION_TIME_FIELD, creationTime.toEpochMilli()); builder.field("version", version.id); builder.field("repository", repository); builder.startObject("blob");