diff --git a/docs/reference/searchable-snapshots/index.asciidoc b/docs/reference/searchable-snapshots/index.asciidoc index d9c56827feef2..7e4bc755555bd 100644 --- a/docs/reference/searchable-snapshots/index.asciidoc +++ b/docs/reference/searchable-snapshots/index.asciidoc @@ -200,6 +200,34 @@ node that does not have the <> role, it will be is set to `0b`. Additionally, nodes with a shared cache can only have a single <>. +{es} also uses a dedicated system index named `.snapshot-blob-cache` to speed +up the recoveries of {search-snap} shards. This index is used as an additional +caching layer on top of the partially or fully mounted data and contains the +minimal required data to start the {search-snap} shards. {es} automatically +deletes the documents that are no longer used in this index. This periodic +cleanup can be tuned using the following settings: + +`searchable_snapshots.blob_cache.periodic_cleanup.interval`:: +(<>) +The interval at which the periodic cleanup of the `.snapshot-blob-cache` +index is scheduled. Defaults to every hour (`1h`). + +`searchable_snapshots.blob_cache.periodic_cleanup.retention_period`:: +(<>) +The retention period to keep obsolete documents in the `.snapshot-blob-cache` +index. Defaults to one hour (`1h`). + +`searchable_snapshots.blob_cache.periodic_cleanup.batch_size`:: +(<>) +The number of documents that are searched for and bulk-deleted at once during +the periodic cleanup of the `.snapshot-blob-cache` index. Defaults to `100`. + +`searchable_snapshots.blob_cache.periodic_cleanup.pit_keep_alive`:: +(<>) +The value used for the point-in-time keep-alive +requests executed during the periodic cleanup of the `.snapshot-blob-cache` +index. Defaults to `10m`. 
+ [discrete] [[searchable-snapshots-costs]] === Reduce costs with {search-snaps} diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java index a34ad19c75f49..973f41a9a9ca2 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java @@ -7,30 +7,49 @@ package org.elasticsearch.xpack.searchablesnapshots.cache.blob; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.LuceneFilesExtensions; 
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage; import org.elasticsearch.xpack.searchablesnapshots.BaseFrozenSearchableSnapshotsIntegTestCase; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots; +import org.elasticsearch.xpack.searchablesnapshots.cache.common.ByteRange; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; @@ -58,71 +77,17 @@ protected Collection> nodePlugins() { /** * Test that snapshot blob cache entries are deleted from the system index after the corresponding searchable snapshot index is deleted */ - public void testMaintenance() throws Exception { + public void testCleanUpAfterIndicesAreDeleted() throws Exception { final String repositoryName = "repository"; createRepository(repositoryName, FsRepository.TYPE); - final int nbIndices = randomIntBetween(3, 10); - - logger.info("--> generating [{}] indices with cached entries in system index...", nbIndices); - final Map mountedIndices = new HashMap<>(); - final Map mountedIndicesSettings = new HashMap<>(); - - int i = 0; - long previousNumberOfCachedEntries = 0; - while (mountedIndices.size() < nbIndices) { - final String indexName = "index-" + i; - createIndex(indexName); - - final List 
indexRequestBuilders = new ArrayList<>(); - for (int n = 100; n > 0; n--) { - indexRequestBuilders.add( - client().prepareIndex(indexName, SINGLE_MAPPING_NAME) - .setSource( - XContentFactory.smileBuilder() - .startObject() - .field("text", randomRealisticUnicodeOfCodepointLength(10)) - .endObject() - ) - ); - } - indexRandom(true, indexRequestBuilders); - - createSnapshot(repositoryName, "snapshot-" + i, Collections.singletonList(indexName)); - assertAcked(client().admin().indices().prepareDelete(indexName)); - - final String mountedIndex = "mounted-index-" + i; - mountSnapshot(repositoryName, "snapshot-" + i, "index-" + i, mountedIndex, Settings.EMPTY, randomFrom(Storage.values())); - - ensureGreen(mountedIndex); - assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); - assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME); - waitForBlobCacheFillsToComplete(); - - refreshSystemIndex(false); - - final long numberOfEntriesInCache = numberOfEntriesInCache(); - if (numberOfEntriesInCache > previousNumberOfCachedEntries) { - final long nbEntries = numberOfEntriesInCache - previousNumberOfCachedEntries; - logger.info("--> mounted index [{}] has [{}] entries in cache", mountedIndex, nbEntries); - mountedIndices.put(mountedIndex, nbEntries); - mountedIndicesSettings.put(mountedIndex, getIndexSettings(mountedIndex)); - - } else { - logger.info("--> mounted index [{}] did not generate any entry in cache, skipping", mountedIndex); - assertAcked(client().admin().indices().prepareDelete(mountedIndex)); - } - - previousNumberOfCachedEntries = numberOfEntriesInCache; - i += 1; - } - + final Map> mountedIndices = mountRandomIndicesWithCache(repositoryName, 3, 10); ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX); refreshSystemIndex(true); final long numberOfEntriesInCache = numberOfEntriesInCache(); logger.info("--> found [{}] entries in snapshot blob cache", numberOfEntriesInCache); - assertThat(numberOfEntriesInCache, 
equalTo(mountedIndices.values().stream().mapToLong(l -> l).sum())); + assertThat(numberOfEntriesInCache, equalTo(mountedIndices.values().stream().mapToLong(Tuple::v2).sum())); final List indicesToDelete = randomSubsetOf(randomIntBetween(1, mountedIndices.size()), mountedIndices.keySet()); assertAcked(client().admin().indices().prepareDelete(indicesToDelete.toArray(new String[0]))); @@ -130,7 +95,7 @@ public void testMaintenance() throws Exception { final long expectedDeletedEntriesInCache = mountedIndices.entrySet() .stream() .filter(e -> indicesToDelete.contains(e.getKey())) - .mapToLong(Map.Entry::getValue) + .mapToLong(entry -> entry.getValue().v2()) .sum(); logger.info("--> deleting indices [{}] with [{}] entries in snapshot blob cache", indicesToDelete, expectedDeletedEntriesInCache); @@ -139,7 +104,7 @@ public void testMaintenance() throws Exception { assertThat(numberOfEntriesInCache(), equalTo(numberOfEntriesInCache - expectedDeletedEntriesInCache)); for (String mountedIndex : mountedIndices.keySet()) { - final Settings indexSettings = mountedIndicesSettings.get(mountedIndex); + final Settings indexSettings = mountedIndices.get(mountedIndex).v1(); assertHitCount( systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) .setQuery( @@ -151,7 +116,7 @@ public void testMaintenance() throws Exception { ) .setSize(0) .get(), - indicesToDelete.contains(mountedIndex) ? 0L : mountedIndices.get(mountedIndex) + indicesToDelete.contains(mountedIndex) ? 
0L : mountedIndices.get(mountedIndex).v2() ); } }); @@ -179,13 +144,12 @@ public void testMaintenance() throws Exception { Settings.EMPTY, randomFrom(Storage.values()) ); - ensureGreen(remainingMountedIndex); - mountedIndicesSettings.put(remainingMountedIndex, getIndexSettings(remainingMountedIndex)); assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME); waitForBlobCacheFillsToComplete(); + ensureClusterStateConsistency(); logger.info( "--> deleting more mounted indices [{}] with snapshot [{}/{}] of index [{}] is still mounted as index [{}]", @@ -201,7 +165,7 @@ public void testMaintenance() throws Exception { refreshSystemIndex(true); for (String mountedIndex : mountedIndices.keySet()) { - final Settings indexSettings = mountedIndicesSettings.get(mountedIndex); + final Settings indexSettings = mountedIndices.get(mountedIndex).v1(); final long remainingEntriesInCache = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) .setQuery( @@ -219,11 +183,11 @@ public void testMaintenance() throws Exception { if (indicesToDelete.contains(mountedIndex)) { assertThat(remainingEntriesInCache, equalTo(0L)); } else if (snapshotId.equals(SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings))) { - assertThat(remainingEntriesInCache, greaterThanOrEqualTo(mountedIndices.get(randomMountedIndex))); + assertThat(remainingEntriesInCache, greaterThanOrEqualTo(mountedIndices.get(randomMountedIndex).v2())); } else if (moreIndicesToDelete.contains(mountedIndex)) { assertThat(remainingEntriesInCache, equalTo(0L)); } else { - assertThat(remainingEntriesInCache, equalTo(mountedIndices.get(mountedIndex))); + assertThat(remainingEntriesInCache, equalTo(mountedIndices.get(mountedIndex).v2())); } } }); @@ -237,6 +201,130 @@ public void testMaintenance() throws Exception { }); } + /** + * Test that obsolete blob cache entries are deleted from the system index by the periodic maintenance task. 
+ */ + public void testPeriodicMaintenance() throws Exception { + ensureStableCluster(internalCluster().getNodeNames().length, TimeValue.timeValueSeconds(60L)); + + createRepository("repo", FsRepository.TYPE); + Map> mountedIndices = mountRandomIndicesWithCache("repo", 1, 3); + ensureYellow(SNAPSHOT_BLOB_CACHE_INDEX); + + final long nbEntriesInCacheForMountedIndices = mountedIndices.values().stream().mapToLong(Tuple::v2).sum(); + refreshSystemIndex(true); + assertThat(numberOfEntriesInCache(), equalTo(nbEntriesInCacheForMountedIndices)); + + createRepository("other", FsRepository.TYPE); + Map> otherMountedIndices = mountRandomIndicesWithCache("other", 1, 3); + + final long nbEntriesInCacheForOtherIndices = otherMountedIndices.values().stream().mapToLong(Tuple::v2).sum(); + refreshSystemIndex(true); + assertThat(numberOfEntriesInCache(), equalTo(nbEntriesInCacheForMountedIndices + nbEntriesInCacheForOtherIndices)); + + if (randomBoolean()) { + final int oldDocsInCache = indexRandomDocsInCache(1, 50, Instant.now().minus(Duration.ofDays(7L)).toEpochMilli()); + refreshSystemIndex(true); + assertThat( + numberOfEntriesInCache(), + equalTo(nbEntriesInCacheForMountedIndices + nbEntriesInCacheForOtherIndices + oldDocsInCache) + ); + } + + // creates a backup of the system index cache to be restored later + createRepository("backup", FsRepository.TYPE); + createSnapshot("backup", "backup", Collections.singletonList(SNAPSHOT_BLOB_CACHE_INDEX)); + + final Set indicesToDelete = new HashSet<>(mountedIndices.keySet()); + indicesToDelete.add(randomFrom(otherMountedIndices.keySet())); + + assertAcked(systemClient().admin().indices().prepareDelete(SNAPSHOT_BLOB_CACHE_INDEX)); + assertAcked(client().admin().indices().prepareDelete(indicesToDelete.toArray(new String[0]))); + assertAcked(client().admin().cluster().prepareDeleteRepository("repo")); + ensureClusterStateConsistency(); + + assertThat(numberOfEntriesInCache(), equalTo(0L)); + + assertAcked( + client().admin() + .cluster() 
+ .prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .put( + BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.getKey(), + TimeValue.timeValueSeconds(1L) + ) + ) + ); + try { + // restores the .snapshot-blob-cache index with - now obsolete - documents + final RestoreSnapshotResponse restoreResponse = client().admin() + .cluster() + .prepareRestoreSnapshot("backup", "backup") + .setIndices(SNAPSHOT_BLOB_CACHE_INDEX) + .setWaitForCompletion(true) + .get(); + assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(1)); + assertThat(restoreResponse.getRestoreInfo().failedShards(), equalTo(0)); + + final int recentDocsInCache; + if (randomBoolean()) { + // recent as in the future, actually + recentDocsInCache = indexRandomDocsInCache(1, 50, Instant.now().plus(Duration.ofDays(10L)).toEpochMilli()); + } else { + recentDocsInCache = 0; + } + + // only very old docs should have been deleted + assertBusy(() -> { + refreshSystemIndex(true); + assertThat( + numberOfEntriesInCache(), + equalTo(nbEntriesInCacheForMountedIndices + nbEntriesInCacheForOtherIndices + recentDocsInCache) + ); + }, 30L, TimeUnit.SECONDS); + + // updating the retention period from 1H to immediate + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .put( + BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD.getKey(), + TimeValue.timeValueSeconds(0L) + ) + ) + ); + + // only used documents should remain + final long expectNumberOfRemainingCacheEntries = otherMountedIndices.entrySet() + .stream() + .filter(e -> indicesToDelete.contains(e.getKey()) == false) + .mapToLong(e -> e.getValue().v2()) + .sum(); + + assertBusy(() -> { + refreshSystemIndex(true); + assertThat(numberOfEntriesInCache(), equalTo(expectNumberOfRemainingCacheEntries + recentDocsInCache)); + }, 30L, TimeUnit.SECONDS); + + } finally { + assertAcked( + client().admin() + .cluster() + 
.prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .putNull(BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.getKey()) + .putNull(BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD.getKey()) + ) + ); + } + } + /** * @return a {@link Client} that can be used to query the blob store cache system index */ @@ -271,4 +359,97 @@ private void refreshSystemIndex(boolean failIfNotExist) { private Settings getIndexSettings(String indexName) { return client().admin().indices().prepareGetSettings(indexName).get().getIndexToSettings().get(indexName); } + + private Map> mountRandomIndicesWithCache(String repositoryName, int min, int max) throws Exception { + refreshSystemIndex(false); + long previousNumberOfCachedEntries = numberOfEntriesInCache(); + + final int nbIndices = randomIntBetween(min, max); + logger.info("--> generating [{}] indices with cached entries in system index...", nbIndices); + final Map> mountedIndices = new HashMap<>(); + + int i = 0; + while (mountedIndices.size() < nbIndices) { + final String indexName = "index-" + i; + createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); + + while (true) { + final List indexRequestBuilders = new ArrayList<>(); + for (int n = 500; n > 0; n--) { + final XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + for (int j = 0; j < 10; j++) { + builder.field("text_" + j, randomRealisticUnicodeOfCodepointLength(10)); + builder.field("int_" + j, randomInt()); + } + builder.endObject(); + indexRequestBuilders.add(client().prepareIndex(indexName, SINGLE_MAPPING_NAME).setSource(builder)); + } + indexRandom(true, indexRequestBuilders); + + final String snapshot = "snapshot-" + i; + createSnapshot(repositoryName, snapshot, Collections.singletonList(indexName)); + + final String mountedIndex = "mounted-" + indexName + "-in-" + repositoryName; + mountSnapshot(repositoryName, snapshot, 
indexName, mountedIndex, Settings.EMPTY, randomFrom(Storage.values())); + + ensureGreen(mountedIndex); + assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); + assertExecutorIsIdle(SearchableSnapshots.CACHE_PREWARMING_THREAD_POOL_NAME); + waitForBlobCacheFillsToComplete(); + + refreshSystemIndex(false); + final long numberOfEntriesInCache = numberOfEntriesInCache(); + if (numberOfEntriesInCache > previousNumberOfCachedEntries) { + final long nbEntries = numberOfEntriesInCache - previousNumberOfCachedEntries; + logger.info("--> mounted index [{}] has [{}] entries in cache", mountedIndex, nbEntries); + mountedIndices.put(mountedIndex, Tuple.tuple(getIndexSettings(mountedIndex), nbEntries)); + previousNumberOfCachedEntries = numberOfEntriesInCache; + break; + + } else { + logger.info("--> mounted index [{}] did not generate any entry in cache", mountedIndex); + assertAcked(client().admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot).get()); + assertAcked(client().admin().indices().prepareDelete(mountedIndex)); + } + } + assertAcked(client().admin().indices().prepareDelete(indexName)); + i += 1; + } + return Collections.unmodifiableMap(mountedIndices); + } + + private int indexRandomDocsInCache(final int minDocs, final int maxDocs, final long creationTimeInEpochMillis) { + final int nbDocs = randomIntBetween(minDocs, maxDocs); + final CountDownLatch latch = new CountDownLatch(nbDocs); + + String repository = randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + SnapshotId snapshotId = new SnapshotId("snap", UUIDs.randomBase64UUID()); + IndexId indexId = new IndexId("index", UUIDs.randomBase64UUID()); + ShardId shardId = new ShardId("index", UUIDs.randomBase64UUID(), randomInt(5)); + String fileName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT) + '.' 
+ randomFrom(LuceneFilesExtensions.values()).getExtension(); + byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, 64)); + + final BlobStoreCacheService blobStoreCacheService = internalCluster().getDataNodeInstance(BlobStoreCacheService.class); + for (int i = 0; i < nbDocs; i++) { + int length = randomIntBetween(1, Math.max(1, bytes.length - 1)); + blobStoreCacheService.putAsync( + repository, + snapshotId, + indexId, + shardId, + fileName, + ByteRange.of(i, i + length), + new BytesArray(bytes, 0, length), + creationTimeInEpochMillis, + ActionListener.wrap(latch::countDown) + ); + } + try { + latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + return nbDocs; + } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 1b239bdc4a1a3..6c6cac35fa31b 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -305,7 +305,11 @@ public List> getSettings() { FrozenCacheService.FROZEN_CACHE_RECOVERY_RANGE_SIZE_SETTING, FrozenCacheService.SNAPSHOT_CACHE_MAX_FREQ_SETTING, FrozenCacheService.SNAPSHOT_CACHE_DECAY_INTERVAL_SETTING, - FrozenCacheService.SNAPSHOT_CACHE_MIN_TIME_DELTA_SETTING + FrozenCacheService.SNAPSHOT_CACHE_MIN_TIME_DELTA_SETTING, + BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING, + BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING, + BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING, + BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD ); } @@ -336,11 +340,12 @@ public Collection createComponents( final 
BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService( clusterService, client, - SNAPSHOT_BLOB_CACHE_INDEX, - threadPool::absoluteTimeInMillis + SNAPSHOT_BLOB_CACHE_INDEX ); this.blobStoreCacheService.set(blobStoreCacheService); - clusterService.addListener(new BlobStoreCacheMaintenanceService(threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX)); + clusterService.addListener( + new BlobStoreCacheMaintenanceService(settings, clusterService, threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX) + ); components.add(blobStoreCacheService); } else { PersistentCache.cleanUp(settings, nodeEnvironment); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index 09674528766ff..ff6ff96373640 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -11,17 +11,44 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.search.ClosePointInTimeAction; +import org.elasticsearch.action.search.ClosePointInTimeRequest; +import org.elasticsearch.action.search.ClosePointInTimeResponse; +import org.elasticsearch.action.search.OpenPointInTimeAction; 
+import org.elasticsearch.action.search.OpenPointInTimeRequest; +import org.elasticsearch.action.search.OpenPointInTimeResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.document.DocumentField; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.QueryBuilder; @@ -29,24 +56,37 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.search.SearchHit; +import 
org.elasticsearch.search.builder.PointInTimeBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.subphase.FieldAndFormat; +import org.elasticsearch.search.sort.ShardDocSortField; import org.elasticsearch.snapshots.SearchableSnapshotsSettings; -import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; +import java.util.Map; import java.util.Objects; import java.util.Queue; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_BLOB_CACHE_INDEX; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; /** * A service that delete documents in the snapshot blob cache index when they are not required anymore. 
@@ -60,14 +100,80 @@ public class BlobStoreCacheMaintenanceService implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(BlobStoreCacheMaintenanceService.class); + /** + * The interval at which the periodic cleanup of the blob store cache index is scheduled. + */ + public static final Setting SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING = Setting.timeSetting( + "searchable_snapshots.blob_cache.periodic_cleanup.interval", + TimeValue.timeValueHours(1), + TimeValue.ZERO, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + /** + * The keep alive value for the internal point-in-time requests executed during the periodic cleanup. + */ + public static final Setting SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING = Setting.timeSetting( + "searchable_snapshots.blob_cache.periodic_cleanup.pit_keep_alive", + TimeValue.timeValueMinutes(10L), + TimeValue.timeValueSeconds(30L), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + /** + * The number of documents that are searched for and bulk-deleted at once during the periodic cleanup. + */ + public static final Setting SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING = Setting.intSetting( + "searchable_snapshots.blob_cache.periodic_cleanup.batch_size", + 100, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * The retention period to keep obsolete documents in the blob store cache index. This duration is used during the periodic cleanup in + * order to avoid deleting documents belonging to concurrently mounted searchable snapshots. Defaults to 1h. 
+ */ + public static final Setting SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD = Setting.timeSetting( + "searchable_snapshots.blob_cache.periodic_cleanup.retention_period", + TimeValue.timeValueHours(1L), + TimeValue.ZERO, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + private final ClusterService clusterService; private final Client clientWithOrigin; private final String systemIndexName; private final ThreadPool threadPool; - public BlobStoreCacheMaintenanceService(ThreadPool threadPool, Client client, String systemIndexName) { + private volatile Scheduler.Cancellable periodicTask; + private volatile TimeValue periodicTaskInterval; + private volatile TimeValue periodicTaskKeepAlive; + private volatile TimeValue periodicTaskRetention; + private volatile int periodicTaskBatchSize; + private volatile boolean schedulePeriodic; + + public BlobStoreCacheMaintenanceService( + Settings settings, + ClusterService clusterService, + ThreadPool threadPool, + Client client, + String systemIndexName + ) { this.clientWithOrigin = new OriginSettingClient(Objects.requireNonNull(client), SEARCHABLE_SNAPSHOTS_ORIGIN); this.systemIndexName = Objects.requireNonNull(systemIndexName); + this.clusterService = Objects.requireNonNull(clusterService); this.threadPool = Objects.requireNonNull(threadPool); + this.periodicTaskInterval = SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING.get(settings); + this.periodicTaskKeepAlive = SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING.get(settings); + this.periodicTaskBatchSize = SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING.get(settings); + this.periodicTaskRetention = SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD.get(settings); + final ClusterSettings clusterSettings = clusterService.getClusterSettings(); + clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING, this::setPeriodicTaskInterval); + clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING, this::setPeriodicTaskKeepAlive); 
+ clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING, this::setPeriodicTaskBatchSize); + clusterSettings.addSettingsUpdateConsumer(SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD, this::setPeriodicTaskRetention); } @Override @@ -77,11 +183,63 @@ public void clusterChanged(ClusterChangedEvent event) { return; // state not fully recovered } final ShardRouting primary = systemIndexPrimaryShard(state); - if (primary == null || Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId()) == false) { - return; // system index primary shard does not exist or is not assigned to this data node + if (primary == null + || primary.active() == false + || Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId()) == false) { + // system index primary shard does not exist, is not active, or is not assigned to this data node + stopPeriodicTask(); + return; } if (event.indicesDeleted().isEmpty() == false) { - threadPool.generic().execute(new MaintenanceTask(event)); + threadPool.generic().execute(new DeletedIndicesMaintenanceTask(event)); + } + if (periodicTask == null || periodicTask.isCancelled()) { + schedulePeriodic = true; + startPeriodicTask(); + } + } + + private synchronized void setPeriodicTaskInterval(TimeValue interval) { + this.periodicTaskInterval = interval; + } + + private void setPeriodicTaskKeepAlive(TimeValue keepAlive) { + this.periodicTaskKeepAlive = keepAlive; + } + + public void setPeriodicTaskRetention(TimeValue retention) { + this.periodicTaskRetention = retention; + } + + public void setPeriodicTaskBatchSize(int batchSize) { + this.periodicTaskBatchSize = batchSize; + } + + private synchronized void startPeriodicTask() { + if (schedulePeriodic) { + try { + final TimeValue delay = periodicTaskInterval; + if (delay.getMillis() > 0L) { + final PeriodicMaintenanceTask task = new PeriodicMaintenanceTask(periodicTaskKeepAlive, periodicTaskBatchSize); + periodicTask = threadPool.schedule(task, delay, 
ThreadPool.Names.GENERIC); + } else { + periodicTask = null; + } + } catch (EsRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + logger.debug("failed to schedule next periodic maintenance task for blob store cache, node is shutting down", e); + } else { + throw e; + } + } + } + } + + private synchronized void stopPeriodicTask() { + schedulePeriodic = false; + if (periodicTask != null && periodicTask.isCancelled() == false) { + periodicTask.cancel(); + periodicTask = null; } } @@ -97,28 +255,34 @@ private ShardRouting systemIndexPrimaryShard(final ClusterState state) { return null; } - private static boolean hasSearchableSnapshotWith(final ClusterState state, final SnapshotId snapshotId, final IndexId indexId) { + private static boolean hasSearchableSnapshotWith(final ClusterState state, final String snapshotId, final String indexId) { for (IndexMetadata indexMetadata : state.metadata()) { final Settings indexSettings = indexMetadata.getSettings(); if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSettings)) { - final SnapshotId otherSnapshotId = new SnapshotId( - SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings), - SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings) - ); - if (Objects.equals(snapshotId, otherSnapshotId)) { - final IndexId otherIndexId = new IndexId( - SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings), - SNAPSHOT_INDEX_ID_SETTING.get(indexSettings) - ); - if (Objects.equals(indexId, otherIndexId)) { - return true; - } + if (Objects.equals(snapshotId, SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings)) + && Objects.equals(indexId, SNAPSHOT_INDEX_ID_SETTING.get(indexSettings))) { + return true; } } } return false; } + private static Map> listSearchableSnapshots(final ClusterState state) { + Map> snapshots = null; + for (IndexMetadata indexMetadata : state.metadata()) { + final Settings indexSettings = indexMetadata.getSettings(); + if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSettings)) { + if (snapshots == null) { 
+ snapshots = new HashMap<>(); + } + snapshots.computeIfAbsent(SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings), s -> new HashSet<>()) + .add(SNAPSHOT_INDEX_ID_SETTING.get(indexSettings)); + } + } + return snapshots != null ? Collections.unmodifiableMap(snapshots) : Collections.emptyMap(); + } + static QueryBuilder buildDeleteByQuery(int numberOfShards, String snapshotUuid, String indexUuid) { final Set paths = IntStream.range(0, numberOfShards) .mapToObj(shard -> String.join("/", snapshotUuid, indexUuid, String.valueOf(shard))) @@ -127,11 +291,14 @@ static QueryBuilder buildDeleteByQuery(int numberOfShards, String snapshotUuid, return QueryBuilders.termsQuery("blob.path", paths); } - private class MaintenanceTask extends AbstractRunnable { + /** + * A maintenance task that cleans up the blob store cache index after searchable snapshot indices are deleted + */ + private class DeletedIndicesMaintenanceTask extends AbstractRunnable { private final ClusterChangedEvent event; - MaintenanceTask(ClusterChangedEvent event) { + DeletedIndicesMaintenanceTask(ClusterChangedEvent event) { assert event.indicesDeleted().isEmpty() == false; this.event = Objects.requireNonNull(event); } @@ -150,14 +317,8 @@ protected void doRun() { if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSetting)) { assert state.metadata().hasIndex(deletedIndex) == false; - final SnapshotId snapshotId = new SnapshotId( - SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSetting), - SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSetting) - ); - final IndexId indexId = new IndexId( - SNAPSHOT_INDEX_NAME_SETTING.get(indexSetting), - SNAPSHOT_INDEX_ID_SETTING.get(indexSetting) - ); + final String snapshotId = SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSetting); + final String indexId = SNAPSHOT_INDEX_ID_SETTING.get(indexSetting); // we should do nothing if the current cluster state contains another // searchable snapshot index that uses the same index snapshot @@ -171,7 +332,7 @@ protected void doRun() { } final 
DeleteByQueryRequest request = new DeleteByQueryRequest(systemIndexName); - request.setQuery(buildDeleteByQuery(indexMetadata.getNumberOfShards(), snapshotId.getUUID(), indexId.getId())); + request.setQuery(buildDeleteByQuery(indexMetadata.getNumberOfShards(), snapshotId, indexId)); request.setRefresh(queue.isEmpty()); queue.add(Tuple.tuple(request, new ActionListener() { @@ -237,4 +398,314 @@ public void onFailure(Exception e) { ); } } + + /** + * A maintenance task that periodically cleans up unused cache entries from the blob store cache index. + * + * This task first opens a point-in-time context on the blob store cache system index and uses it to search all documents. For each + * document found the task verifies whether it belongs to an existing repository and searchable snapshot index. If it does not, and + * it is older than the retention period, it is deleted as part of a bulk request. Once the bulk is executed the next batch of + * documents is searched for. Once all documents from the PIT have been verified the task closes the PIT and completes itself. + * + * The task executes every step (PIT opening, searches, bulk deletes, PIT closing) using the generic thread pool. + * The same task instance is used for all the steps and makes sure that a closed instance is not executed again. 
+ */ + private class PeriodicMaintenanceTask implements Runnable, Releasable { + + private final TimeValue keepAlive; + private final int batchSize; + + private final AtomicReference error = new AtomicReference<>(); + private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicLong deletes = new AtomicLong(); + private final AtomicLong total = new AtomicLong(); + + private volatile Map> existingSnapshots; + private volatile Set existingRepositories; + private volatile SearchResponse searchResponse; + private volatile Instant expirationTime; + private volatile String pointIntTimeId; + private volatile Object[] searchAfter; + + PeriodicMaintenanceTask(TimeValue keepAlive, int batchSize) { + this.keepAlive = keepAlive; + this.batchSize = batchSize; + } + + @Override + public void run() { + assert assertGenericThread(); + try { + ensureOpen(); + if (pointIntTimeId == null) { + final OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(SNAPSHOT_BLOB_CACHE_INDEX); + openRequest.keepAlive(keepAlive); + clientWithOrigin.execute(OpenPointInTimeAction.INSTANCE, openRequest, new ActionListener() { + @Override + public void onResponse(OpenPointInTimeResponse response) { + logger.trace("periodic maintenance task initialized with point-in-time id [{}]", response.getPointInTimeId()); + PeriodicMaintenanceTask.this.pointIntTimeId = response.getPointInTimeId(); + executeNext(PeriodicMaintenanceTask.this); + } + + @Override + public void onFailure(Exception e) { + if (TransportActions.isShardNotAvailableException(e)) { + complete(null); + } else { + complete(e); + } + } + }); + return; + } + + final String pitId = pointIntTimeId; + assert Strings.hasLength(pitId); + + if (searchResponse == null) { + final SearchSourceBuilder searchSource = new SearchSourceBuilder(); + searchSource.fetchField(new FieldAndFormat(CachedBlob.CREATION_TIME_FIELD, "epoch_millis")); + searchSource.fetchSource(false); + searchSource.trackScores(false); + 
searchSource.sort(ShardDocSortField.NAME); + searchSource.size(batchSize); + if (searchAfter != null) { + searchSource.searchAfter(searchAfter); + searchSource.trackTotalHits(false); + } else { + searchSource.trackTotalHits(true); + } + final PointInTimeBuilder pointInTime = new PointInTimeBuilder(pitId); + pointInTime.setKeepAlive(keepAlive); + searchSource.pointInTimeBuilder(pointInTime); + final SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(searchSource); + clientWithOrigin.execute(SearchAction.INSTANCE, searchRequest, new ActionListener() { + @Override + public void onResponse(SearchResponse response) { + if (searchAfter == null) { + assert PeriodicMaintenanceTask.this.total.get() == 0L; + PeriodicMaintenanceTask.this.total.set(response.getHits().getTotalHits().value); + } + PeriodicMaintenanceTask.this.searchResponse = response; + PeriodicMaintenanceTask.this.searchAfter = null; + executeNext(PeriodicMaintenanceTask.this); + } + + @Override + public void onFailure(Exception e) { + complete(e); + } + }); + return; + } + + final SearchHit[] searchHits = searchResponse.getHits().getHits(); + if (searchHits != null && searchHits.length > 0) { + if (expirationTime == null) { + final TimeValue retention = periodicTaskRetention; + expirationTime = Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()) + .minus(retention.millis(), ChronoUnit.MILLIS); + + final ClusterState state = clusterService.state(); + // compute the list of existing searchable snapshots and repositories once + existingSnapshots = listSearchableSnapshots(state); + existingRepositories = state.metadata() + .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY) + .repositories() + .stream() + .map(RepositoryMetadata::name) + .collect(Collectors.toSet()); + } + + final BulkRequest bulkRequest = new BulkRequest(); + final Map> knownSnapshots = existingSnapshots; + assert knownSnapshots != null; + final Set knownRepositories = existingRepositories; + assert 
knownRepositories != null; + final Instant expirationTime = this.expirationTime; + assert expirationTime != null; + + Object[] lastSortValues = null; + for (SearchHit searchHit : searchHits) { + lastSortValues = searchHit.getSortValues(); + assert searchHit.getId() != null; + try { + boolean delete = false; + + // See {@link BlobStoreCacheService#generateId} + // doc id = {repository name}/{snapshot id}/{snapshot index id}/{shard id}/{file name}/@{file offset} + final String[] parts = Objects.requireNonNull(searchHit.getId()).split("/"); + assert parts.length == 6 : Arrays.toString(parts) + " vs " + searchHit.getId(); + + final String repositoryName = parts[0]; + if (knownRepositories.contains(repositoryName) == false) { + logger.trace("deleting blob store cache entry with id [{}]: repository does not exist", searchHit.getId()); + delete = true; + } else { + final Set knownIndexIds = knownSnapshots.get(parts[1]); + if (knownIndexIds == null || knownIndexIds.contains(parts[2]) == false) { + logger.trace("deleting blob store cache entry with id [{}]: not used", searchHit.getId()); + delete = true; + } + } + if (delete) { + final Instant creationTime = getCreationTime(searchHit); + if (creationTime.isAfter(expirationTime)) { + logger.trace( + "blob store cache entry with id [{}] was created recently, skipping deletion", + searchHit.getId() + ); + continue; + } + bulkRequest.add(new DeleteRequest().index(searchHit.getIndex()).id(searchHit.getId())); + } + } catch (Exception e) { + logger.warn( + () -> new ParameterizedMessage( + "exception when parsing blob store cache entry with id [{}], skipping", + searchHit.getId() + ), + e + ); + } + } + + assert lastSortValues != null; + if (bulkRequest.numberOfActions() == 0) { + this.searchResponse = null; + this.searchAfter = lastSortValues; + executeNext(this); + return; + } + + final Object[] finalSearchAfter = lastSortValues; + clientWithOrigin.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener() { + @Override + 
public void onResponse(BulkResponse response) { + for (BulkItemResponse itemResponse : response.getItems()) { + if (itemResponse.isFailed() == false) { + assert itemResponse.getResponse() instanceof DeleteResponse; + PeriodicMaintenanceTask.this.deletes.incrementAndGet(); + } + } + PeriodicMaintenanceTask.this.searchResponse = null; + PeriodicMaintenanceTask.this.searchAfter = finalSearchAfter; + executeNext(PeriodicMaintenanceTask.this); + } + + @Override + public void onFailure(Exception e) { + complete(e); + } + }); + return; + } + // we're done, complete the task + complete(null); + } catch (Exception e) { + complete(e); + } + } + + public boolean isClosed() { + return closed.get(); + } + + private void ensureOpen() { + if (isClosed()) { + assert false : "should not use periodic task after close"; + throw new IllegalStateException("Periodic maintenance task is closed"); + } + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + final Exception e = error.get(); + if (e != null) { + logger.warn( + () -> new ParameterizedMessage( + "periodic maintenance task completed with failure ({} deleted documents out of a total of {})", + deletes.get(), + total.get() + ), + e + ); + } else { + logger.info( + () -> new ParameterizedMessage( + "periodic maintenance task completed ({} deleted documents out of a total of {})", + deletes.get(), + total.get() + ) + ); + } + } + } + + private void complete(@Nullable Exception failure) { + assert isClosed() == false; + final Releasable releasable = () -> { + try { + final Exception previous = error.getAndSet(failure); + assert previous == null : "periodic maintenance task already failed: " + previous; + close(); + } finally { + startPeriodicTask(); + } + }; + boolean waitForRelease = false; + try { + final String pitId = pointIntTimeId; + if (Strings.hasLength(pitId)) { + final ClosePointInTimeRequest closeRequest = new ClosePointInTimeRequest(pitId); + clientWithOrigin.execute( + 
ClosePointInTimeAction.INSTANCE, + closeRequest, + ActionListener.runAfter(new ActionListener() { + @Override + public void onResponse(ClosePointInTimeResponse response) { + if (response.isSucceeded()) { + logger.debug("periodic maintenance task successfully closed point-in-time id [{}]", pitId); + } else { + logger.debug("point-in-time id [{}] not found", pitId); + } + } + + @Override + public void onFailure(Exception e) { + logger.warn(() -> new ParameterizedMessage("failed to close point-in-time id [{}]", pitId), e); + } + }, () -> Releasables.close(releasable)) + ); + waitForRelease = true; + } + } finally { + if (waitForRelease == false) { + Releasables.close(releasable); + } + } + } + } + + private void executeNext(PeriodicMaintenanceTask maintenanceTask) { + threadPool.generic().execute(maintenanceTask); + } + + private static boolean assertGenericThread() { + final String threadName = Thread.currentThread().getName(); + assert threadName.contains(ThreadPool.Names.GENERIC) : threadName; + return true; + } + + private static Instant getCreationTime(SearchHit searchHit) { + final DocumentField creationTimeField = searchHit.field(CachedBlob.CREATION_TIME_FIELD); + assert creationTimeField != null; + final Object creationTimeValue = creationTimeField.getValue(); + assert creationTimeValue != null; + assert creationTimeValue instanceof String : "expect a java.lang.String but got " + creationTimeValue.getClass(); + return Instant.ofEpochMilli(Long.parseLong(creationTimeField.getValue())); + } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheService.java index b4edbc198f539..4faab2a9e37fc 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheService.java +++ 
b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheService.java @@ -49,7 +49,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN; @@ -72,17 +71,15 @@ public class BlobStoreCacheService extends AbstractLifecycleComponent { private final ClusterService clusterService; private final Semaphore inFlightCacheFills; - private final Supplier timeSupplier; private final AtomicBoolean closed; private final Client client; private final String index; - public BlobStoreCacheService(ClusterService clusterService, Client client, String index, Supplier timeSupplier) { + public BlobStoreCacheService(ClusterService clusterService, Client client, String index) { this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN); this.inFlightCacheFills = new Semaphore(MAX_IN_FLIGHT_CACHE_FILLS); this.closed = new AtomicBoolean(false); this.clusterService = clusterService; - this.timeSupplier = timeSupplier; this.index = index; } @@ -242,12 +239,13 @@ public final void putAsync( final String name, final ByteRange range, final BytesReference bytes, + final long timeInEpochMillis, final ActionListener listener ) { final String id = generateId(repository, snapshotId, indexId, shardId, name, range); try { final CachedBlob cachedBlob = new CachedBlob( - Instant.ofEpochMilli(timeSupplier.get()), + Instant.ofEpochMilli(timeInEpochMillis), Version.CURRENT, repository, name, diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java index 
ba0d2b21b659f..8bb08554829a9 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java @@ -31,6 +31,7 @@ public class CachedBlob implements ToXContent { public static final CachedBlob CACHE_MISS = new CachedBlob(null, null, null, "CACHE_MISS", null, BytesArray.EMPTY, 0L, 0L); private static final String TYPE = "blob"; + public static final String CREATION_TIME_FIELD = "creation_time"; private final Instant creationTime; private final Version version; @@ -80,7 +81,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); { builder.field("type", TYPE); - builder.field("creation_time", creationTime.toEpochMilli()); + builder.field(CREATION_TIME_FIELD, creationTime.toEpochMilli()); builder.field("version", version.id); builder.field("repository", repository); builder.startObject("blob"); @@ -117,9 +118,17 @@ public BytesReference bytes() { return bytes; } + public Version version() { + return version; + } + + public Instant creationTime() { + return creationTime; + } + @SuppressWarnings("unchecked") public static CachedBlob fromSource(final Map source) { - final Long creationTimeEpochMillis = (Long) source.get("creation_time"); + final Long creationTimeEpochMillis = (Long) source.get(CREATION_TIME_FIELD); if (creationTimeEpochMillis == null) { throw new IllegalStateException("cached blob document does not have the [creation_time] field"); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java index 2057620b1b54d..a169eb0d0b8da 100644 --- 
a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java @@ -713,7 +713,17 @@ public CachedBlob getCachedBlob(String name, ByteRange range) { } public void putCachedBlob(String name, ByteRange range, BytesReference content, ActionListener listener) { - blobStoreCacheService.putAsync(repository, snapshotId, indexId, shardId, name, range, content, listener); + blobStoreCacheService.putAsync( + repository, + snapshotId, + indexId, + shardId, + name, + range, + content, + threadPool.absoluteTimeInMillis(), + listener + ); } public FrozenCacheFile getFrozenCacheFile(String fileName, long length) { diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheServiceTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheServiceTests.java index ea3dbfdf65cb1..7f195890bea78 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheServiceTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheServiceTests.java @@ -104,7 +104,7 @@ public void testGetWhenServiceNotStarted() { return null; }).when(mockClient).execute(eq(GetAction.INSTANCE), any(GetRequest.class), any(ActionListener.class)); - BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L); + BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX); blobCacheService.start(); PlainActionFuture future = PlainActionFuture.newFuture(); @@ -137,17 +137,17 @@ public void testPutWhenServiceNotStarted() { 
return null; }).when(mockClient).execute(eq(IndexAction.INSTANCE), any(IndexRequest.class), any(ActionListener.class)); - BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L); + BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX); blobCacheService.start(); PlainActionFuture future = PlainActionFuture.newFuture(); - blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, future); + blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, 0L, future); assertThat(future.actionGet(), nullValue()); blobCacheService.stop(); future = PlainActionFuture.newFuture(); - blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, future); + blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, 0L, future); IllegalStateException exception = expectThrows(IllegalStateException.class, future::actionGet); assertThat(exception.getMessage(), containsString("Blob cache service is closed")); } @@ -176,7 +176,7 @@ public void testWaitForInFlightCacheFillsToComplete() throws Exception { return null; }).when(mockClient).execute(eq(IndexAction.INSTANCE), any(IndexRequest.class), any(ActionListener.class)); - final BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L); + final BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX); blobCacheService.start(); assertThat(blobCacheService.getInFlightCacheFills(), equalTo(0)); @@ -186,7 +186,7 @@ public void testWaitForInFlightCacheFillsToComplete() throws Exception { final PlainActionFuture future = PlainActionFuture.newFuture(); threadPool.generic() .execute( - () -> blobCacheService.putAsync(repository, 
snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, future) + () -> blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, 0L, future) ); futures.add(future); } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java index f70f1edb6cfa5..ba337150d4b05 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java @@ -324,7 +324,7 @@ private static Client mockClient() { public static class NoopBlobStoreCacheService extends BlobStoreCacheService { public NoopBlobStoreCacheService() { - super(null, mockClient(), SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L); + super(null, mockClient(), SNAPSHOT_BLOB_CACHE_INDEX); } @Override @@ -353,7 +353,7 @@ public static class SimpleBlobStoreCacheService extends BlobStoreCacheService { private final ConcurrentHashMap blobs = new ConcurrentHashMap<>(); public SimpleBlobStoreCacheService() { - super(null, mockClient(), SNAPSHOT_BLOB_CACHE_INDEX, System::currentTimeMillis); + super(null, mockClient(), SNAPSHOT_BLOB_CACHE_INDEX); } @Override