From 102d9aad4b98c34bc2783a8020968083c2cce85e Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 1 Dec 2020 18:12:40 +0100 Subject: [PATCH 01/20] Add persistent cache --- .../BaseSearchableSnapshotsIntegTestCase.java | 2 + ...leSnapshotsPersistentCacheIntegTests.java} | 81 +-- .../store/SearchableSnapshotDirectory.java | 33 +- .../SearchableSnapshotIndexEventListener.java | 54 +- .../SearchableSnapshots.java | 13 +- .../cache/CacheService.java | 56 +- .../cache/NodeEnvironmentCacheCleaner.java | 50 -- .../cache/PersistentCache.java | 613 ++++++++++++++++++ .../index/store/cache/CacheFileTests.java | 6 +- .../index/store/cache/TestUtils.java | 10 +- .../AbstractSearchableSnapshotsTestCase.java | 18 +- .../cache/CacheServiceTests.java | 9 +- .../cache/PersistentCacheTests.java | 203 ++++++ 13 files changed, 980 insertions(+), 168 deletions(-) rename x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/{SearchableSnapshotsCacheClearingIntegTests.java => cache/SearchableSnapshotsPersistentCacheIntegTests.java} (71%) delete mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/NodeEnvironmentCacheCleaner.java create mode 100644 x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java create mode 100644 x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java index fcbcc2f8db6b3..40e9abd1d77a4 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java @@ -24,6 +24,7 @@ */ package org.elasticsearch.xpack.searchablesnapshots; +import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.common.Strings; @@ -46,6 +47,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +@LuceneTestCase.SuppressFileSystems("*") public abstract class BaseSearchableSnapshotsIntegTestCase extends AbstractSnapshotIntegTestCase { @Override protected boolean addMockInternalEngine() { diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCacheClearingIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java similarity index 71% rename from x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCacheClearingIntegTests.java rename to x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java index e4e9115393d32..627c8e6733a48 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCacheClearingIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java @@ -4,16 +4,13 @@ * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.searchablesnapshots; +package org.elasticsearch.xpack.searchablesnapshots.cache; -import org.apache.lucene.mockfile.FilterFileSystemProvider; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.PathUtils; -import org.elasticsearch.common.io.PathUtilsForTesting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -24,19 +21,14 @@ import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; -import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase; -import java.io.IOException; import java.nio.file.DirectoryStream; -import java.nio.file.FileSystem; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashSet; import java.util.Locale; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX; import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; @@ -44,48 +36,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -public class SearchableSnapshotsCacheClearingIntegTests extends BaseSearchableSnapshotsIntegTestCase { - - private static DeleteBlockingFileSystemProvider deleteBlockingFileSystemProvider; - - @BeforeClass - public static void installDeleteBlockingFileSystemProvider() { - FileSystem current = PathUtils.getDefaultFileSystem(); - deleteBlockingFileSystemProvider = new DeleteBlockingFileSystemProvider(current); - PathUtilsForTesting.installMock(deleteBlockingFileSystemProvider.getFileSystem(null)); - } - - @AfterClass - public static void removeDeleteBlockingFileSystemProvider() { - PathUtilsForTesting.teardown(); - } - - void startBlockingDeletes() { - deleteBlockingFileSystemProvider.injectFailures.set(true); - } - - void stopBlockingDeletes() { - deleteBlockingFileSystemProvider.injectFailures.set(false); - } - - private static class DeleteBlockingFileSystemProvider extends FilterFileSystemProvider { - - AtomicBoolean injectFailures = new AtomicBoolean(); - - DeleteBlockingFileSystemProvider(FileSystem inner) { - super("deleteblocking://", inner); - } - - @Override - public boolean deleteIfExists(Path path) throws IOException { - if (injectFailures.get()) { - throw new IOException("blocked deletion of " + path); - } else { - return super.deleteIfExists(path); - } - } - - } +public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { @@ -96,7 +47,7 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } - public void testCacheDirectoriesRemovedOnStartup() throws Exception { + public void testCacheSurviveRestart() throws Exception { final String fsRepoName = randomAlphaOfLength(10); final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT); @@ -159,25 +110,37 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception { } assertFalse("no cache files found", cacheFiles.isEmpty()); - startBlockingDeletes(); + CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode); + cacheService.synchronizeCache(); + + PersistentCache persistentCache = cacheService.getPersistentCache(); + assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size())); + internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() { @Override public Settings onNodeStopped(String nodeName) { assertTrue(Files.isDirectory(shardCachePath)); for (Path cacheFile : cacheFiles) { - assertTrue(cacheFile + " should not have been cleaned up yet", Files.isRegularFile(cacheFile)); + assertTrue(cacheFile + " should exist on disk", Files.isRegularFile(cacheFile)); } - stopBlockingDeletes(); return Settings.EMPTY; } }); + persistentCache = internalCluster().getInstance(CacheService.class, dataNode).getPersistentCache(); + assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size())); + ensureGreen(restoredIndexName); - for (Path cacheFile : cacheFiles) { - assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile)); - } + cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile))); assertAcked(client().admin().indices().prepareDelete(restoredIndexName)); + + assertBusy(() -> cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile)))); + cacheService = internalCluster().getInstance(CacheService.class, dataNode); + cacheService.synchronizeCache(); + + persistentCache = cacheService.getPersistentCache(); + assertThat(persistentCache.getNumDocs(), equalTo(0L)); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index acf506cec7bee..b6456e0302e54 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -61,7 +61,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; -import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collection; @@ -127,6 +126,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory { private final Path cacheDir; private final ShardPath shardPath; private final AtomicBoolean closed; + private final AtomicBoolean clearCacheOnClose; // volatile fields are updated once under `this` lock, all together, iff loaded is not true. private volatile BlobStoreIndexShardSnapshot snapshot; @@ -163,6 +163,7 @@ public SearchableSnapshotDirectory( this.cacheDir = Objects.requireNonNull(cacheDir); this.shardPath = Objects.requireNonNull(shardPath); this.closed = new AtomicBoolean(false); + this.clearCacheOnClose = new AtomicBoolean(false); this.useCache = SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings); this.prewarmCache = useCache ? SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.get(indexSettings) : false; this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings)); @@ -326,18 +327,28 @@ private static UnsupportedOperationException unsupportedException() { return new UnsupportedOperationException("Searchable snapshot directory does not support this operation"); } + /** + * Indicate if the cache should be cleared for this directory + */ + public void clearCacheOnClose() { + final boolean value = clearCacheOnClose.getAndSet(true); + assert value == false : "cache clearing is already set"; + } + @Override public final void close() { if (closed.compareAndSet(false, true)) { isOpen = false; - // Ideally we could let the cache evict/remove cached files by itself after the - // directory has been closed. - clearCache(); + if (useCache && clearCacheOnClose.get()) { + clearCache(); + } } } public void clearCache() { - cacheService.removeFromCache(cacheKey -> cacheKey.belongsTo(snapshotId, indexId, shardId)); + for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : files()) { + cacheService.removeFromCache(createCacheKey(fileInfo.physicalName())); + } } protected IndexInputStats createIndexInputStats(final long fileLength) { @@ -563,7 +574,6 @@ public static Directory create( final Path cacheDir = CacheService.getShardCachePath(shardPath).resolve(snapshotId.getUUID()); Files.createDirectories(cacheDir); - assert assertCacheIsEmpty(cacheDir); return new InMemoryNoOpCommitDirectory( new SearchableSnapshotDirectory( @@ -584,17 +594,6 @@ public static Directory create( ); } - private static boolean assertCacheIsEmpty(Path cacheDir) { - try (DirectoryStream cacheDirStream = Files.newDirectoryStream(cacheDir)) { - final Set cacheFiles = new HashSet<>(); - cacheDirStream.forEach(cacheFiles::add); - assert cacheFiles.isEmpty() : "should start with empty cache, but found " + cacheFiles; - } catch (IOException e) { - assert false : e; - } - return true; - } - public static SearchableSnapshotDirectory unwrapDirectory(Directory dir) { while (dir != null) { if (dir instanceof SearchableSnapshotDirectory) { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java index 2e4e96b929d49..d5d8488b4307d 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java @@ -5,8 +5,12 @@ */ package org.elasticsearch.xpack.searchablesnapshots; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.SegmentInfos; import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexEventListener; @@ -15,14 +19,30 @@ import org.elasticsearch.index.store.SearchableSnapshotDirectory; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogException; +import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import java.nio.file.Path; +import static org.elasticsearch.index.store.SearchableSnapshotDirectory.unwrapDirectory; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore; public class SearchableSnapshotIndexEventListener implements IndexEventListener { + private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexEventListener.class); + private final CacheService cacheService; + + public SearchableSnapshotIndexEventListener(CacheService cacheService) { + this.cacheService = cacheService; + } + @Override public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) { assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC); @@ -30,8 +50,40 @@ public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexS associateNewEmptyTranslogWithIndex(indexShard); } + @Override + public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason reason) { + if (reason == IndexRemovalReason.DELETED || reason == IndexRemovalReason.FAILURE) { + if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexService.getIndexSettings().getSettings())) { + for (IndexShard indexShard : indexService) { + try { + final SearchableSnapshotDirectory directory = unwrapDirectory(indexShard.store().directory()); + assert directory != null : "expect a searchable snapshot directory instance"; + directory.clearCacheOnClose(); + } catch (Exception e) { + logger.warn("failed to close shard", e); + } + } + } + } + } + + @Override + public void beforeIndexShardDeleted(ShardId shardId, Settings indexSettings) { + if (SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings)) { + final SnapshotId snapshotId = new SnapshotId( + SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings), + SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings) + ); + final IndexId indexId = new IndexId( + SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings), + SNAPSHOT_INDEX_ID_SETTING.get(indexSettings) + ); + // cacheService.removeFromCache(cacheKey -> cacheKey.belongsTo(snapshotId, indexId, shardId)); + } + } + private static void ensureSnapshotIsLoaded(IndexShard indexShard) { - final SearchableSnapshotDirectory directory = SearchableSnapshotDirectory.unwrapDirectory(indexShard.store().directory()); + final SearchableSnapshotDirectory directory = unwrapDirectory(indexShard.store().directory()); assert directory != null; final boolean success = directory.loadSnapshot(indexShard.recoveryState()); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index bf5a869d44fe6..5dc9638e2e269 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -63,7 +63,7 @@ import org.elasticsearch.xpack.searchablesnapshots.action.TransportMountSearchableSnapshotAction; import org.elasticsearch.xpack.searchablesnapshots.action.TransportSearchableSnapshotsStatsAction; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; -import org.elasticsearch.xpack.searchablesnapshots.cache.NodeEnvironmentCacheCleaner; +import org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache; import org.elasticsearch.xpack.searchablesnapshots.rest.RestClearSearchableSnapshotsCacheAction; import org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction; import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsStatsAction; @@ -210,12 +210,7 @@ public Collection createComponents( this.threadPool.set(threadPool); this.failShardsListener.set(new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), clusterService.getRerouteService())); if (DiscoveryNode.isDataNode(settings)) { - final CacheService cacheService = new CacheService( - settings, - clusterService, - threadPool, - new NodeEnvironmentCacheCleaner(nodeEnvironment) - ); + final CacheService cacheService = new CacheService(settings, clusterService, threadPool, new PersistentCache(nodeEnvironment)); this.cacheService.set(cacheService); components.add(cacheService); final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService( @@ -226,6 +221,8 @@ public Collection createComponents( ); this.blobStoreCacheService.set(blobStoreCacheService); components.add(blobStoreCacheService); + } else { + PersistentCache.cleanUp(settings, nodeEnvironment); } return Collections.unmodifiableList(components); } @@ -233,7 +230,7 @@ public Collection createComponents( @Override public void onIndexModule(IndexModule indexModule) { if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexModule.getSettings())) { - indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener()); + indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener(cacheService.get())); indexModule.addIndexEventListener(failShardsListener.get()); } } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index c35a7bfcfff63..0a0dbfa7d7d8c 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -40,7 +40,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Predicate; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; @@ -106,9 +105,9 @@ public class CacheService extends AbstractLifecycleComponent { private final CacheSynchronizationTask cacheSyncTask; private final TimeValue cacheSyncStopTimeout; private final ReentrantLock cacheSyncLock; + private final PersistentCache persistentCache; private final Cache cache; private final ByteSizeValue cacheSize; - private final Runnable cacheCleaner; private final ByteSizeValue rangeSize; private volatile int maxCacheFilesToSyncAtOnce; @@ -117,11 +116,10 @@ public CacheService( final Settings settings, final ClusterService clusterService, final ThreadPool threadPool, - final Runnable cacheCleaner + final PersistentCache persistentCache ) { this.threadPool = Objects.requireNonNull(threadPool); this.cacheSize = SNAPSHOT_CACHE_SIZE_SETTING.get(settings); - this.cacheCleaner = Objects.requireNonNull(cacheCleaner); this.rangeSize = SNAPSHOT_CACHE_RANGE_SIZE_SETTING.get(settings); this.cache = CacheBuilder.builder() .setMaximumWeight(cacheSize.getBytes()) @@ -130,6 +128,7 @@ public CacheService( // are done with reading/writing the cache file .removalListener(notification -> onCacheFileRemoval(notification.getValue())) .build(); + this.persistentCache = Objects.requireNonNull(persistentCache); this.numberOfCacheFilesToSync = new AtomicLong(); this.cacheSyncLock = new ReentrantLock(); this.cacheFilesToSync = new ConcurrentLinkedQueue<>(); @@ -151,8 +150,8 @@ static Path resolveSnapshotCache(Path path) { @Override protected void doStart() { + persistentCache.loadCacheFiles(this); cacheSyncTask.rescheduleIfNecessary(); - cacheCleaner.run(); } @Override @@ -169,10 +168,15 @@ protected void doStop() { logger.warn("interrupted while waiting for cache sync lock", e); } cacheSyncTask.close(); - cache.invalidateAll(); } finally { - if (acquired) { - cacheSyncLock.unlock(); + try { + persistentCache.close(); + } catch (Exception e) { + logger.warn("failed to close persistent cache", e); + } finally { + if (acquired) { + cacheSyncLock.unlock(); + } } } } @@ -270,17 +274,12 @@ void put( } /** - * Invalidate cache entries with keys matching the given predicate + * Evicts the {@link CacheFile} associated to the given {@link CacheKey}. * - * @param predicate the predicate to evaluate + * @param cacheKey the key of the cache file to evict */ - public void removeFromCache(final Predicate predicate) { - for (CacheKey cacheKey : cache.keys()) { - if (predicate.test(cacheKey)) { - cache.invalidate(cacheKey); - } - } - cache.refresh(); + public void removeFromCache(CacheKey cacheKey) { + cache.invalidate(cacheKey); } void setCacheSyncInterval(TimeValue interval) { @@ -307,12 +306,17 @@ private void onCacheFileUpdate(CacheFile cacheFile) { /** * This method is invoked after a {@link CacheFile} is evicted from the cache. *

- * It notifies the {@link CacheFile}'s eviction listeners that the instance is evicted. + * It notifies the {@link CacheFile}'s eviction listeners that the instance is evicted and removes it from the persistent cache. * * @param cacheFile the evicted instance */ private void onCacheFileRemoval(CacheFile cacheFile) { IOUtils.closeWhileHandlingException(cacheFile::startEviction); + try { + persistentCache.removeCacheFile(cacheFile); + } catch (Exception e) { + logger.warn("failed to remove cache file from persistent cache", e); + } } // used in tests @@ -320,6 +324,11 @@ boolean isCacheFileToSync(CacheFile cacheFile) { return cacheFilesToSync.contains(cacheFile); } + // used in tests + PersistentCache getPersistentCache() { + return persistentCache; + } + /** * Synchronize the cache files and their parent directories on disk. * @@ -362,14 +371,14 @@ protected void synchronizeCache() { final Path cacheDir = cacheFilePath.toAbsolutePath().getParent(); if (cacheDirs.add(cacheDir)) { try { - IOUtils.fsync(cacheDir, true, false); + IOUtils.fsync(cacheDir, true, false); // TODO evict cache file if fsync failed logger.trace("cache directory [{}] synchronized", cacheDir); } catch (Exception e) { assert e instanceof IOException : e; logger.warn(() -> new ParameterizedMessage("failed to synchronize cache directory [{}]", cacheDir), e); } } - // TODO Index searchable snapshot shard information + cache file ranges in Lucene + persistentCache.addCacheFile(cacheFile, ranges); count += 1L; } } catch (Exception e) { @@ -377,6 +386,13 @@ protected void synchronizeCache() { logger.warn(() -> new ParameterizedMessage("failed to fsync cache file [{}]", cacheFilePath.getFileName()), e); } } + if (count > 0 || persistentCache.hasDeletions()) { + try { + persistentCache.commit(); + } catch (IOException e) { + logger.error("failed to commit persistent cache after synchronization", e); + } + } if (logger.isDebugEnabled()) { final long elapsedNanos = threadPool.relativeTimeInNanos() - startTimeNanos; logger.debug( diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/NodeEnvironmentCacheCleaner.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/NodeEnvironmentCacheCleaner.java deleted file mode 100644 index f7edc2c663bb8..0000000000000 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/NodeEnvironmentCacheCleaner.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.searchablesnapshots.cache; - -import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.ShardPath; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.file.Files; -import java.nio.file.Path; - -/** - * Cleans any leftover searchable snapshot caches when a node is starting up. - */ -public class NodeEnvironmentCacheCleaner implements Runnable { - - private final NodeEnvironment nodeEnvironment; - - public NodeEnvironmentCacheCleaner(NodeEnvironment nodeEnvironment) { - this.nodeEnvironment = nodeEnvironment; - } - - @Override - public void run() { - try { - for (NodeEnvironment.NodePath nodePath : nodeEnvironment.nodePaths()) { - for (String indexUUID : nodeEnvironment.availableIndexFoldersForPath(nodePath)) { - for (ShardId shardId : nodeEnvironment.findAllShardIds(new Index("_unknown_", indexUUID))) { - final Path shardDataPath = nodePath.resolve(shardId); - final ShardPath shardPath = new ShardPath(false, shardDataPath, shardDataPath, shardId); - final Path shardCachePath = CacheService.getShardCachePath(shardPath); - if (Files.isDirectory(shardCachePath)) { - IOUtils.rm(shardCachePath); - } - } - } - } - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } -} diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java new file mode 100644 index 0000000000000..2c5d6fc22edbf --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java @@ -0,0 +1,613 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.analysis.core.KeywordAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexNotFoundException; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SerialMergeScheduler; +import org.apache.lucene.index.Term; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.ByteBufferStreamInput; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.store.cache.CacheFile; +import org.elasticsearch.index.store.cache.CacheKey; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableSortedSet; +import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.getShardCachePath; + +public class PersistentCache implements Closeable { + + private static final Logger logger = LogManager.getLogger(PersistentCache.class); + + private static final String NODE_VERSION_COMMIT_KEY = "node_version"; + + private final NodeEnvironment nodeEnvironment; + private final List writers; + private final AtomicBoolean closed; + + public PersistentCache(NodeEnvironment nodeEnvironment) { + this.writers = createWriters(nodeEnvironment); + this.nodeEnvironment = nodeEnvironment; + this.closed = new AtomicBoolean(); + } + + private void ensureOpen() { + if (closed.get()) { + throw new AlreadyClosedException("Persistent cache is already closed"); + } + } + + /** + * @return the {@link CacheIndexWriter} to use for the given {@link CacheFile} + */ + private CacheIndexWriter getWriter(CacheFile cacheFile) { + ensureOpen(); + if (writers.size() == 1) { + return writers.get(0); + } else { + final Path path = cacheFile.getFile().toAbsolutePath(); + return writers.stream() + .filter(writer -> path.startsWith(writer.nodePath().path)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Failed to find a Lucene index for cache file path [" + path + ']')); + } + } + + public void addCacheFile(CacheFile cacheFile, SortedSet> ranges) throws IOException { + getWriter(cacheFile).updateCacheFile(cacheFile, ranges); + } + + public void removeCacheFile(CacheFile cacheFile) throws IOException { + getWriter(cacheFile).deleteCacheFile(cacheFile); + } + + /** + * This method repopulates the {@link CacheService} by looking at the files on the disk and for each file found, retrieves the latest + * synchronized information and puts the cache file into the searchable snapshots cache. + * + * This method iterates over all node data paths and all shard directories in order to found the "snapshot_cache" directories that + * contain the cache files. When such a directory is found, the method iterates over the cache files and looks up their name/UUID in + * the existing Lucene documents that were loaded when instanciating the persistent cache index). If no information is found (ie no + * matching docs in the map of Lucene documents) then the file is deleted from disk. If a doc is found the stored fields are extracted + * from the Lucene document and are used to rebuild the necessary {@link CacheKey}, {@link SnapshotId}, {@link IndexId}, {@link ShardId} + * and cache file ranges objects. The Lucene document is then indexed again in the new persistent cache index (the current + * {@link CacheIndexWriter}) and the cache file is added back to the searchable snapshots cache again. Note that adding cache + * file to the cache service might trigger evictions so previously reindexed Lucene cache files might be delete again (see + * CacheService#onCacheFileRemoval(CacheFile) method which calls {@link #removeCacheFile(CacheFile)}. + * + * @param cacheService the {@link CacheService} to use when repopulating {@link CacheFile}. + */ + void loadCacheFiles(CacheService cacheService) { + ensureOpen(); + try { + for (CacheIndexWriter writer : writers) { + final NodeEnvironment.NodePath nodePath = writer.nodePath(); + logger.debug("loading persistent cache on data path [{}]", nodePath); + + for (String indexUUID : nodeEnvironment.availableIndexFoldersForPath(writer.nodePath())) { + for (ShardId shardId : nodeEnvironment.findAllShardIds(new Index("_unknown_", indexUUID))) { + final Path shardDataPath = writer.nodePath().resolve(shardId); + final Path shardCachePath = getShardCachePath(new ShardPath(false, shardDataPath, shardDataPath, shardId)); + + if (Files.isDirectory(shardCachePath)) { + logger.trace("found snapshot cache dir at [{}], loading cache files from disk and index", shardCachePath); + Files.walkFileTree(shardCachePath, new CacheFileVisitor(cacheService, writer)); + } + } + } + } + for (CacheIndexWriter writer : writers) { + writer.prepareCommit(); + } + for (CacheIndexWriter writer : writers) { + writer.commit(); + } + logger.info("persistent cache index loaded"); + } catch (IOException e) { + try { + close(); + } catch (Exception e2) { + logger.warn("failed to close persistent cache index", e2); + e.addSuppressed(e2); + } + throw new UncheckedIOException("Failed to load persistent cache", e); + } finally { + closeIfAnyIndexWriterHasTragedyOrIsClosed(); + } + } + + void commit() throws IOException { + ensureOpen(); + try { + for (CacheIndexWriter writer : writers) { + writer.prepareCommit(); + } + for (CacheIndexWriter writer : writers) { + writer.commit(); + } + } catch (IOException e) { + try { + close(); + } catch (Exception e2) { + logger.warn("failed to close persistent cache index writer", e2); + e.addSuppressed(e2); + } + throw e; + } finally { + closeIfAnyIndexWriterHasTragedyOrIsClosed(); + } + } + + private void closeIfAnyIndexWriterHasTragedyOrIsClosed() { + if (writers.stream().map(writer -> writer.indexWriter).anyMatch(iw -> iw.getTragicException() != null || iw.isOpen() == false)) { + try { + close(); + } catch (Exception e) { + logger.warn("failed to close persistent cache index", e); + } + } + } + + public boolean hasDeletions() { + ensureOpen(); + for (CacheIndexWriter writer : writers) { + if (writer.indexWriter.hasDeletions()) { + return true; + } + } + return false; + } + + public long getNumDocs() { + ensureOpen(); + long count = 0L; + for (CacheIndexWriter writer : writers) { + count += writer.indexWriter.getPendingNumDocs(); + } + return count; + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + IOUtils.close(writers); + } + } + + /** + * Creates a list of {@link CacheIndexWriter}, one for each data path of the specified {@link NodeEnvironment}. + * + * @param nodeEnvironment the data node environment + * @return a list of {@link CacheIndexWriter} + */ + private static List createWriters(NodeEnvironment nodeEnvironment) { + final List writers = new ArrayList<>(); + boolean success = false; + try { + final NodeEnvironment.NodePath[] nodePaths = nodeEnvironment.nodePaths(); + for (NodeEnvironment.NodePath nodePath : nodePaths) { + writers.add(createCacheIndexWriter(nodePath)); + } + success = true; + } catch (IOException e) { + throw new UncheckedIOException("Failed to create persistent cache writers", e); + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(writers); + } + } + return unmodifiableList(writers); + } + + /** + * Creates a new {@link CacheIndexWriter} for the specified data path. The is a single instance per data path. + * + * @param nodePath the data path + * @return a new {@link CacheIndexWriter} instance + * @throws IOException if something went wrong + */ + static CacheIndexWriter createCacheIndexWriter(NodeEnvironment.NodePath nodePath) throws IOException { + final List closeables = new ArrayList<>(); + boolean success = false; + try { + Path directoryPath = createCacheIndexFolder(nodePath); + final Directory directory = FSDirectory.open(directoryPath); + closeables.add(directory); + + final Map documents = new HashMap<>(); + try (IndexReader indexReader = DirectoryReader.open(directory)) { + for (LeafReaderContext leafReaderContext : indexReader.leaves()) { + final LeafReader leafReader = leafReaderContext.reader(); + final Bits liveDocs = leafReader.getLiveDocs(); + for (int i = 0; i < leafReader.maxDoc(); i++) { + if (liveDocs == null || liveDocs.get(i)) { + final Document document = leafReader.document(i); + documents.put(getValue(document, CACHE_ID_FIELD), document); + } + } + } + } catch (IndexNotFoundException e) { + logger.debug("persistent cache index does not exist yet", e); + } + + final IndexWriterConfig config = new IndexWriterConfig(new KeywordAnalyzer()); + config.setIndexDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); + config.setOpenMode(IndexWriterConfig.OpenMode.CREATE); + config.setMergeScheduler(new SerialMergeScheduler()); + config.setRAMBufferSizeMB(1.0); + config.setCommitOnClose(false); + + final IndexWriter indexWriter = new IndexWriter(directory, config); + closeables.add(indexWriter); + + final CacheIndexWriter cacheIndexWriter = new CacheIndexWriter(nodePath, directory, indexWriter, documents); + success = true; + return cacheIndexWriter; + } finally { + if (success == false) { + IOUtils.close(closeables); + } + } + } + + /** + * Cleans any leftover searchable snapshot caches (files and Lucene indices) when a non-data node is starting up. + * This is useful when the node is repurposed and is not a data node anymore. + * + * @param nodeEnvironment the {@link NodeEnvironment} to cleanup + */ + public static void cleanUp(Settings settings, NodeEnvironment nodeEnvironment) { + final boolean isDataNode = DiscoveryNode.isDataNode(settings); + if (isDataNode) { + assert false : "should not be called on data nodes"; + throw new IllegalStateException("Cannot clean searchable snapshot caches: node is a data node"); + } + try { + for (NodeEnvironment.NodePath nodePath : nodeEnvironment.nodePaths()) { + for (String indexUUID : nodeEnvironment.availableIndexFoldersForPath(nodePath)) { + for (ShardId shardId : nodeEnvironment.findAllShardIds(new Index("_unknown_", indexUUID))) { + final Path shardDataPath = nodePath.resolve(shardId); + final ShardPath shardPath = new ShardPath(false, shardDataPath, shardDataPath, shardId); + final Path cacheDir = getShardCachePath(shardPath); + if (Files.isDirectory(cacheDir)) { + logger.debug("deleting searchable snapshot shard cache directory [{}]", cacheDir); + IOUtils.rm(cacheDir); + } + } + } + final Path cacheIndexDir = resolveCacheIndexFolder(nodePath); + if (Files.isDirectory(cacheIndexDir)) { + logger.debug("deleting searchable snapshot lucene directory [{}]", cacheIndexDir); + IOUtils.rm(cacheIndexDir); + } + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to clean up searchable snapshots cache", e); + } + } + + /** + * A {@link CacheIndexWriter} contains a Lucene {@link Directory} with an {@link IndexWriter} that can be used to index documents in + * the persistent cache index. The list of existing cache documents is loaded at startup and kept around until a first commit is done. + * There is one {@link CacheIndexWriter} for each data path. + */ + static class CacheIndexWriter implements Closeable { + + private final AtomicReference> documentsRef; + private final NodeEnvironment.NodePath nodePath; + private final IndexWriter indexWriter; + private final Directory directory; + + private CacheIndexWriter( + NodeEnvironment.NodePath nodePath, + Directory directory, + IndexWriter indexWriter, + Map documents + ) { + this.documentsRef = new AtomicReference<>(Objects.requireNonNull(documents)); + this.nodePath = nodePath; + this.directory = directory; + this.indexWriter = indexWriter; + } + + NodeEnvironment.NodePath nodePath() { + return nodePath; + } + + Map getDocuments() { + return documentsRef.get(); + } + + @Nullable + Document getDocument(String cacheFileId) { + final Map documents = getDocuments(); + if (documents == null) { + assert false : "this method should only be used when loading persistent cache, before any prior commit"; + throw new IllegalStateException("Persistent cache index was already committed"); + } + return documents.get(cacheFileId); + } + + void updateCacheFile(CacheFile cacheFile, SortedSet> cacheRanges) throws IOException { + assert getDocuments() == null : "this method should only be used after loading persistent cache"; + final Term term = buildTerm(cacheFile); + logger.debug("updating document with term [{}]", term); + indexWriter.updateDocument(term, buildDocument(cacheFile, cacheRanges)); + } + + void updateCacheFile(String cacheFileId, Document cacheFileDocument) throws IOException { + assert getDocuments() != null : "this method should only be used when loading persistent cache, before any prior commit"; + final Term term = buildTerm(cacheFileId); + logger.debug("updating document with term [{}]", term); + indexWriter.updateDocument(term, cacheFileDocument); + } + + void deleteCacheFile(CacheFile cacheFile) throws IOException { + deleteCacheFile(buildId(cacheFile)); + } + + void deleteCacheFile(String cacheFileId) throws IOException { + final Term term = buildTerm(cacheFileId); + logger.debug("deleting document with term [{}]", term); + indexWriter.deleteDocuments(term); + } + + void prepareCommit() throws IOException { + logger.debug("preparing commit"); + final Map commitData = new HashMap<>(1); + commitData.put(NODE_VERSION_COMMIT_KEY, Integer.toString(Version.CURRENT.id)); + indexWriter.setLiveCommitData(commitData.entrySet()); + indexWriter.prepareCommit(); + } + + void commit() throws IOException { + boolean success = false; + try { + logger.debug("committing"); + indexWriter.commit(); + success = true; + } finally { + if (success) { + Map documents = documentsRef.getAndSet(null); + if (documents != null) { + logger.trace("clearing existing cache documents"); + documents.clear(); + } + } + } + } + + @Override + public void close() throws IOException { + logger.debug("closing persistent cache index"); + IOUtils.close(indexWriter, directory); + } + + @Override + public String toString() { + return "[persistent cache index][" + nodePath + ']'; + } + } + + /** + * {@link CacheFileVisitor} is used to visit cache files on disk and find information about them using the Lucene documents loaded + * at startup from the persistent cache index. If there are no corresponding document for a cache file, the cache file is deleted + * from disk. If a corresponding document is found, the cache file is added to the current persistent cache index and inserted in + * the searchable snapshots cache. + */ + private static class CacheFileVisitor extends SimpleFileVisitor { + + private final CacheService cacheService; + private final CacheIndexWriter writer; + + private CacheFileVisitor(CacheService cacheService, CacheIndexWriter writer) { + this.cacheService = Objects.requireNonNull(cacheService); + this.writer = Objects.requireNonNull(writer); + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + try { + final String id = buildId(file); + final Document cacheDocument = writer.getDocument(id); + if (cacheDocument != null) { + logger.trace("indexing cache file with id [{}] in persistent cache index", id); + writer.updateCacheFile(id, cacheDocument); + + final CacheKey cacheKey = buildCacheKey(cacheDocument); + logger.trace("adding cache file with [id={}, cache key={}]", id, cacheKey); + final long fileLength = getFileLength(cacheDocument); + cacheService.put(cacheKey, fileLength, file.getParent(), id, buildCacheFileRanges(cacheDocument)); + } else { + logger.trace("deleting cache file [{}] (does not exist in persistent cache index)", file); + Files.delete(file); + } + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + return FileVisitResult.CONTINUE; + } + } + + private static final String CACHE_ID_FIELD = "cache_id"; + private static final String CACHE_PATH_FIELD = "cache_path"; + private static final String CACHE_RANGES_FIELD = "cache_ranges"; + private static final String SNAPSHOT_ID_FIELD = "snapshot_id"; + private static final String SNAPSHOT_NAME_FIELD = "snapshot_name"; + private static final String INDEX_ID_FIELD = "index_id"; + private static final String INDEX_NAME_FIELD = "index_name"; + private static final String SHARD_INDEX_NAME_FIELD = "shard_index_name"; + private static final String SHARD_INDEX_ID_FIELD = "shard_index_id"; + private static final String SHARD_ID_FIELD = "shard_id"; + private static final String FILE_NAME_FIELD = "file_name"; + private static final String FILE_LENGTH_FIELD = "file_length"; + + private static String buildId(CacheFile cacheFile) { + return buildId(cacheFile.getFile()); + } + + private static String buildId(Path path) { + return path.getFileName().toString(); + } + + private static Term buildTerm(CacheFile cacheFile) { + return buildTerm(buildId(cacheFile)); + } + + private static Term buildTerm(String cacheFileUuid) { + return new Term(CACHE_ID_FIELD, cacheFileUuid); + } + + private static Document buildDocument(CacheFile cacheFile, SortedSet> cacheRanges) throws IOException { + final Document document = new Document(); + document.add(new StringField(CACHE_ID_FIELD, buildId(cacheFile), Field.Store.YES)); + document.add(new StringField(CACHE_PATH_FIELD, cacheFile.getFile().toString(), Field.Store.YES)); + + try (BytesStreamOutput output = new BytesStreamOutput()) { + output.writeVInt(cacheRanges.size()); + for (Tuple cacheRange : cacheRanges) { + output.writeVLong(cacheRange.v1()); + output.writeVLong(cacheRange.v2()); + } + output.flush(); + document.add(new StoredField(CACHE_RANGES_FIELD, output.bytes().toBytesRef())); + } + + final CacheKey cacheKey = cacheFile.getCacheKey(); + document.add(new StringField(FILE_NAME_FIELD, cacheKey.getFileName(), Field.Store.YES)); + document.add(new StringField(FILE_LENGTH_FIELD, Long.toString(cacheFile.getLength()), Field.Store.YES)); + + final SnapshotId snapshotId = cacheKey.getSnapshotId(); + document.add(new StringField(SNAPSHOT_NAME_FIELD, snapshotId.getName(), Field.Store.YES)); + document.add(new StringField(SNAPSHOT_ID_FIELD, snapshotId.getUUID(), Field.Store.YES)); + + final IndexId indexId = cacheKey.getIndexId(); + document.add(new StringField(INDEX_NAME_FIELD, indexId.getName(), Field.Store.YES)); + document.add(new StringField(INDEX_ID_FIELD, indexId.getId(), Field.Store.YES)); + + final ShardId shardId = cacheKey.getShardId(); + document.add(new StringField(SHARD_INDEX_NAME_FIELD, shardId.getIndex().getName(), Field.Store.YES)); + document.add(new StringField(SHARD_INDEX_ID_FIELD, shardId.getIndex().getUUID(), Field.Store.YES)); + document.add(new StringField(SHARD_ID_FIELD, Integer.toString(shardId.getId()), Field.Store.YES)); + + return document; + } + + private static String getValue(Document document, String fieldName) { + final String value = document.get(fieldName); + assert value != null : "no value found for field [" + fieldName + "] and document [" + document + ']'; + return value; + } + + private static CacheKey buildCacheKey(Document document) { + return new CacheKey( + new SnapshotId(getValue(document, SNAPSHOT_NAME_FIELD), getValue(document, SNAPSHOT_ID_FIELD)), + new IndexId(getValue(document, INDEX_NAME_FIELD), getValue(document, INDEX_ID_FIELD)), + new ShardId( + new Index(getValue(document, SHARD_INDEX_NAME_FIELD), getValue(document, SHARD_INDEX_ID_FIELD)), + Integer.parseInt(getValue(document, SHARD_ID_FIELD)) + ), + getValue(document, FILE_NAME_FIELD) + ); + } + + private static long getFileLength(Document document) { + final String fileLength = getValue(document, FILE_LENGTH_FIELD); + assert fileLength != null; + return Long.parseLong(fileLength); + } + + private static SortedSet> buildCacheFileRanges(Document document) throws IOException { + final BytesRef cacheRangesBytesRef = document.getBinaryValue(CACHE_RANGES_FIELD); + assert cacheRangesBytesRef != null; + + final SortedSet> cacheRanges = new TreeSet<>(Comparator.comparingLong(Tuple::v1)); + try (StreamInput input = new ByteBufferStreamInput(ByteBuffer.wrap(cacheRangesBytesRef.bytes))) { + final int length = input.readVInt(); + assert length > 0 : "empty cache ranges"; + Tuple previous = null; + for (int i = 0; i < length; i++) { + final Tuple range = Tuple.tuple(input.readVLong(), input.readVLong()); + assert range.v1() < range.v2() : range; + assert range.v2() <= getFileLength(document); + assert previous == null || previous.v2() < range.v1(); + final boolean added = cacheRanges.add(range); + assert added : range + " already exist in " + cacheRanges; + } + } + return unmodifiableSortedSet(cacheRanges); + } + + static Path resolveCacheIndexFolder(NodeEnvironment.NodePath nodePath) { + return CacheService.resolveSnapshotCache(nodePath.path); + } + + /** + * Creates a directory for the snapshot cache Lucene index. + */ + private static Path createCacheIndexFolder(NodeEnvironment.NodePath nodePath) throws IOException { + // "snapshot_cache" directory at the root of the specified data path + final Path snapshotCacheRootDir = resolveCacheIndexFolder(nodePath); + if (Files.exists(snapshotCacheRootDir) == false) { + logger.debug("creating new persistent cache index directory [{}]", snapshotCacheRootDir); + Files.createDirectories(snapshotCacheRootDir); + } + return snapshotCacheRootDir; + } +} diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java index 106c43ac1608b..32eb7eca6260d 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java @@ -214,7 +214,7 @@ public void testFSync() throws Exception { final AtomicBoolean needsFSyncCalled = new AtomicBoolean(); final CacheFile cacheFile = new CacheFile( CACHE_KEY, - randomLongBetween(100, 1000), + randomLongBetween(0L, 1000L), fileSystem.resolve("test"), () -> assertFalse(needsFSyncCalled.getAndSet(true)) ); @@ -261,7 +261,7 @@ public void testFSyncOnEvictedFile() throws Exception { final AtomicBoolean needsFSyncCalled = new AtomicBoolean(); final CacheFile cacheFile = new CacheFile( CACHE_KEY, - randomLongBetween(1L, 1000L), + randomLongBetween(0L, 1000L), fileSystem.resolve("test"), () -> assertFalse(needsFSyncCalled.getAndSet(true)) ); @@ -315,7 +315,7 @@ public void testFSyncFailure() throws Exception { final AtomicBoolean needsFSyncCalled = new AtomicBoolean(); final CacheFile cacheFile = new CacheFile( CACHE_KEY, - randomLongBetween(1L, 1000L), + randomLongBetween(0L, 1000L), fileSystem.resolve("test"), () -> assertFalse(needsFSyncCalled.getAndSet(true)) ); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index 6073e040bf668..922cd5646806f 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.coordination.DeterministicTaskQueue; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetadata; import org.elasticsearch.common.blobstore.BlobPath; @@ -62,6 +63,10 @@ public final class TestUtils { private TestUtils() {} public static SortedSet> randomPopulateAndReads(final CacheFile cacheFile) { + return randomPopulateAndReads(cacheFile, (fileChannel, aLong, aLong2) -> {}); + } + + public static SortedSet> randomPopulateAndReads(CacheFile cacheFile, TriConsumer consumer) { final SortedSet> ranges = synchronizedNavigableSet(new TreeSet<>(Comparator.comparingLong(Tuple::v1))); final List> futures = new ArrayList<>(); final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( @@ -69,11 +74,12 @@ public static SortedSet> randomPopulateAndReads(final CacheFil random() ); for (int i = 0; i < between(0, 10); i++) { - final long start = randomLongBetween(0L, cacheFile.getLength() - 1L); - final long end = randomLongBetween(start + 1L, cacheFile.getLength()); + final long start = randomLongBetween(0L, Math.max(0L, cacheFile.getLength() - 1L)); + final long end = randomLongBetween(Math.min(start + 1L, cacheFile.getLength()), cacheFile.getLength()); final Tuple range = Tuple.tuple(start, end); futures.add( cacheFile.populateAndRead(range, range, channel -> Math.toIntExact(end - start), (channel, from, to, progressUpdater) -> { + consumer.apply(channel, from, to); ranges.add(Tuple.tuple(from, to)); progressUpdater.accept(to); }, deterministicTaskQueue.getThreadPool().generic()) diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java index 8edf4aa3f7865..65b288e98c9d8 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java @@ -35,9 +35,11 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolStats; import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; +import org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache; import org.junit.After; import org.junit.Before; +import java.nio.file.Path; import java.util.Collections; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -60,6 +62,7 @@ public abstract class AbstractSearchableSnapshotsTestCase extends ESIndexInputTe protected ThreadPool threadPool; protected ClusterService clusterService; protected NodeEnvironment nodeEnvironment; + protected PersistentCache persistentCache; @Before public void setUpTest() throws Exception { @@ -77,7 +80,7 @@ public void setUpTest() throws Exception { @After public void tearDownTest() throws Exception { - IOUtils.close(nodeEnvironment, clusterService); + IOUtils.close(persistentCache, nodeEnvironment, clusterService); assertTrue(ThreadPool.terminate(threadPool, 30L, TimeUnit.SECONDS)); } @@ -85,7 +88,7 @@ public void tearDownTest() throws Exception { * @return a new {@link CacheService} instance configured with default settings */ protected CacheService defaultCacheService() { - return new CacheService(Settings.EMPTY, clusterService, threadPool, AbstractSearchableSnapshotsTestCase::noOpCacheCleaner); + return new CacheService(Settings.EMPTY, clusterService, threadPool, new PersistentCache(nodeEnvironment)); } /** @@ -105,7 +108,7 @@ protected CacheService randomCacheService() { TimeValue.timeValueSeconds(scaledRandomIntBetween(1, 120)) ); } - return new CacheService(cacheSettings.build(), clusterService, threadPool, AbstractSearchableSnapshotsTestCase::noOpCacheCleaner); + return new CacheService(cacheSettings.build(), clusterService, threadPool, new PersistentCache(nodeEnvironment)); } /** @@ -119,11 +122,16 @@ protected CacheService createCacheService(final ByteSizeValue cacheSize, final B .build(), clusterService, threadPool, - AbstractSearchableSnapshotsTestCase::noOpCacheCleaner + new PersistentCache(nodeEnvironment) ); } - protected static void noOpCacheCleaner() {} + /** + * Returns a random shard data path for the specified {@link ShardId}. The returned path can be located on any of the data node paths. + */ + protected Path randomShardPath(ShardId shardId) { + return randomFrom(nodeEnvironment.availableShardPaths(shardId)); + } /** * @return a random {@link ByteSizeValue} that can be used to set {@link CacheService#SNAPSHOT_CACHE_SIZE_SETTING}. diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java index f95b58b880971..2492b3f9ae44e 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.searchablesnapshots.cache; import org.apache.lucene.util.Constants; +import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.PathUtils; @@ -41,6 +42,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +@LuceneTestCase.SuppressFileSystems("ExtrasFS") // we don't want extra empty dirs in snapshot cache root dirs public class CacheServiceTests extends AbstractSearchableSnapshotsTestCase { private static FSyncTrackingFileSystemProvider fileSystemProvider; @@ -66,7 +68,7 @@ public void testCacheSynchronization() throws Exception { logger.debug("--> creating shard cache directories on disk"); final Path[] shardsCacheDirs = new Path[numShards]; for (int i = 0; i < numShards; i++) { - final Path shardDataPath = randomFrom(nodeEnvironment.availableShardPaths(new ShardId(index, i))); + final Path shardDataPath = randomShardPath(new ShardId(index, i)); assertFalse(Files.exists(shardDataPath)); logger.debug("--> creating directories [{}] for shard [{}]", shardDataPath.toAbsolutePath(), i); @@ -107,7 +109,7 @@ public void testCacheSynchronization() throws Exception { final ShardId shardId = new ShardId(index, randomIntBetween(0, numShards - 1)); final String fileName = String.format(Locale.ROOT, "file_%d_%d", iteration, i); final CacheKey cacheKey = new CacheKey(snapshotId, indexId, shardId, fileName); - final CacheFile cacheFile = cacheService.get(cacheKey, randomIntBetween(1, 10_000), shardsCacheDirs[shardId.id()]); + final CacheFile cacheFile = cacheService.get(cacheKey, randomIntBetween(0, 10_000), shardsCacheDirs[shardId.id()]); final CacheFile.EvictionListener listener = evictedCacheFile -> {}; cacheFile.acquire(listener); @@ -121,7 +123,7 @@ public void testCacheSynchronization() throws Exception { logger.trace("--> evicting random cache files"); final Map evictions = new HashMap<>(); for (CacheKey evictedCacheKey : randomSubsetOf(Sets.union(previous.keySet(), updates.keySet()))) { - cacheService.removeFromCache(evictedCacheKey::equals); + cacheService.removeFromCache(evictedCacheKey); Tuple evicted = previous.remove(evictedCacheKey); if (evicted != null) { evictions.put(evicted.v1(), evicted.v2()); @@ -204,6 +206,7 @@ public void testPut() throws Exception { FileNotFoundException.class, () -> cacheService.put(cacheKey, fileLength, cacheDir, cacheFileUuid, cacheFileRanges) ); + cacheService.start(); assertThat(exception.getMessage(), containsString(cacheFileUuid)); } } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java new file mode 100644 index 0000000000000..5f0b58ca88e6e --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java @@ -0,0 +1,203 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.searchablesnapshots.cache; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.cache.CacheFile; +import org.elasticsearch.index.store.cache.CacheKey; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsTestCase; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES; +import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_ROLE; +import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; +import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING; +import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.createCacheIndexWriter; +import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.resolveCacheIndexFolder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; + +@LuceneTestCase.SuppressFileSystems("ExtrasFS") // we don't want extra empty dirs in snapshot cache root dirs +public class PersistentCacheTests extends AbstractSearchableSnapshotsTestCase { + + public void testCacheIndexWriter() throws Exception { + final NodeEnvironment.NodePath nodePath = randomFrom(nodeEnvironment.nodePaths()); + + int docId = 0; + final Map liveDocs = new HashMap<>(); + final Set deletedDocs = new HashSet<>(); + + for (int iter = 0; iter < 20; iter++) { + + final Path snapshotCacheIndexDir = resolveCacheIndexFolder(nodePath); + assertThat(Files.exists(snapshotCacheIndexDir), equalTo(iter > 0)); + + try (PersistentCache.CacheIndexWriter writer = createCacheIndexWriter(nodePath)) { + assertThat(writer.nodePath(), sameInstance(nodePath)); + assertThat(writer.getDocuments(), notNullValue()); + assertThat(writer.getDocuments().size(), equalTo(liveDocs.size())); + + // verify that existing documents are loaded + for (Map.Entry liveDoc : liveDocs.entrySet()) { + final Document document = writer.getDocument(liveDoc.getKey()); + assertThat("Document should be loaded", document, notNullValue()); + final String iteration = document.get("update_iteration"); + assertThat(iteration, equalTo(String.valueOf(liveDoc.getValue()))); + writer.updateCacheFile(liveDoc.getKey(), document); + } + + // verify that deleted documents are not loaded + for (String deletedDoc : deletedDocs) { + final Document document = writer.getDocument(deletedDoc); + assertThat("Document should not be loaded", document, nullValue()); + } + + // random updates of existing documents + final Map updatedDocs = new HashMap<>(); + for (String cacheId : randomSubsetOf(liveDocs.keySet())) { + final Document document = new Document(); + document.add(new StringField("cache_id", cacheId, Field.Store.YES)); + document.add(new StringField("update_iteration", String.valueOf(iter), Field.Store.YES)); + writer.updateCacheFile(cacheId, document); + + updatedDocs.put(cacheId, iter); + } + + // create new random documents + final Map newDocs = new HashMap<>(); + for (int i = 0; i < between(1, 10); i++) { + final String cacheId = String.valueOf(docId++); + final Document document = new Document(); + document.add(new StringField("cache_id", cacheId, Field.Store.YES)); + document.add(new StringField("update_iteration", String.valueOf(iter), Field.Store.YES)); + writer.updateCacheFile(cacheId, document); + + newDocs.put(cacheId, iter); + } + + // deletes random documents + final Map removedDocs = new HashMap<>(); + for (String cacheId : randomSubsetOf(Sets.union(liveDocs.keySet(), newDocs.keySet()))) { + writer.deleteCacheFile(cacheId); + + removedDocs.put(cacheId, iter); + } + + boolean commit = false; + if (randomBoolean()) { + writer.prepareCommit(); + if (randomBoolean()) { + writer.commit(); + commit = true; + } + } + + if (commit) { + assertThat(writer.getDocuments(), nullValue()); + liveDocs.putAll(updatedDocs); + liveDocs.putAll(newDocs); + for (String cacheId : removedDocs.keySet()) { + liveDocs.remove(cacheId); + deletedDocs.add(cacheId); + } + } + } + } + } + + private static final byte[] buffer; + static { + buffer = new byte[1024]; + Arrays.fill(buffer, (byte) 0xff); + } + + public void testCleanUp() throws Exception { + final List cacheFiles = new ArrayList<>(); + try (CacheService cacheService = defaultCacheService()) { + cacheService.start(); + + for (int snapshots = 0; snapshots < between(1, 2); snapshots++) { + SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); + for (int indices = 0; indices < between(1, 2); indices++) { + IndexId indexId = new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); + for (int shards = 0; shards < between(1, 2); shards++) { + ShardId shardId = new ShardId(indexId.getName(), indexId.getId(), shards); + + final Path cacheDir = Files.createDirectories( + CacheService.resolveSnapshotCache(randomShardPath(shardId)).resolve(snapshotId.getUUID()) + ); + cacheFiles.add(cacheDir); + + for (int files = 0; files < between(1, 2); files++) { + final CacheKey cacheKey = new CacheKey(snapshotId, indexId, shardId, "file_" + files); + final CacheFile cacheFile = cacheService.get(cacheKey, randomLongBetween(0L, buffer.length), cacheDir); + + final CacheFile.EvictionListener listener = evictedCacheFile -> {}; + cacheFile.acquire(listener); + try { + randomPopulateAndReads(cacheFile, (channel, from, to) -> { + try { + channel.write(ByteBuffer.wrap(buffer, Math.toIntExact(from), Math.toIntExact(to))); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + cacheFiles.add(cacheFile.getFile()); + } finally { + cacheFile.release(listener); + } + } + } + } + } + if (randomBoolean()) { + cacheService.synchronizeCache(); + } + } + + final Settings nodeSettings = Settings.builder() + .put(NODE_ROLES_SETTING.getKey(), randomValueOtherThan(DATA_ROLE, () -> randomFrom(BUILT_IN_ROLES)).roleName()) + .build(); + + assertTrue(cacheFiles.stream().allMatch(Files::exists)); + PersistentCache.cleanUp(nodeSettings, nodeEnvironment); + assertTrue(cacheFiles.stream().noneMatch(Files::exists)); + } + + private static CacheKey randomCacheKey() { + return new CacheKey( + new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())), + new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())), + new ShardId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), randomAlphaOfLength(5).toLowerCase(Locale.ROOT), randomInt(10)), + randomAlphaOfLength(5).toLowerCase(Locale.ROOT) + ); + } +} From 6ff96eb6d5c89c48cf09e6743b57f0d326f332ad Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 2 Dec 2020 12:31:32 +0100 Subject: [PATCH 02/20] indexed cache file path should be relative --- .../xpack/searchablesnapshots/cache/PersistentCache.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java index 2c5d6fc22edbf..e2a8ebf904b53 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java @@ -388,7 +388,7 @@ void updateCacheFile(CacheFile cacheFile, SortedSet> cacheRang assert getDocuments() == null : "this method should only be used after loading persistent cache"; final Term term = buildTerm(cacheFile); logger.debug("updating document with term [{}]", term); - indexWriter.updateDocument(term, buildDocument(cacheFile, cacheRanges)); + indexWriter.updateDocument(term, buildDocument(nodePath, cacheFile, cacheRanges)); } void updateCacheFile(String cacheFileId, Document cacheFileDocument) throws IOException { @@ -514,10 +514,11 @@ private static Term buildTerm(String cacheFileUuid) { return new Term(CACHE_ID_FIELD, cacheFileUuid); } - private static Document buildDocument(CacheFile cacheFile, SortedSet> cacheRanges) throws IOException { + private static Document buildDocument(NodeEnvironment.NodePath nodePath, CacheFile cacheFile, SortedSet> cacheRanges) + throws IOException { final Document document = new Document(); document.add(new StringField(CACHE_ID_FIELD, buildId(cacheFile), Field.Store.YES)); - document.add(new StringField(CACHE_PATH_FIELD, cacheFile.getFile().toString(), Field.Store.YES)); + document.add(new StringField(CACHE_PATH_FIELD, nodePath.indicesPath.relativize(cacheFile.getFile()).toString(), Field.Store.YES)); try (BytesStreamOutput output = new BytesStreamOutput()) { output.writeVInt(cacheRanges.size()); @@ -587,8 +588,10 @@ private static SortedSet> buildCacheFileRanges(Document docume assert range.v1() < range.v2() : range; assert range.v2() <= getFileLength(document); assert previous == null || previous.v2() < range.v1(); + final boolean added = cacheRanges.add(range); assert added : range + " already exist in " + cacheRanges; + previous = range; } } return unmodifiableSortedSet(cacheRanges); From 4b697a8c7ef35de7f56e6478cddfa735259b8bd1 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 2 Dec 2020 12:31:50 +0100 Subject: [PATCH 03/20] mute test + remove suppress filesystems --- .../BaseSearchableSnapshotsIntegTestCase.java | 2 -- .../SearchableSnapshotsIntegTests.java | 1 + .../cache/PersistentCacheTests.java | 11 ----------- 3 files changed, 1 insertion(+), 13 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java index 40e9abd1d77a4..fcbcc2f8db6b3 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java @@ -24,7 +24,6 @@ */ package org.elasticsearch.xpack.searchablesnapshots; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.common.Strings; @@ -47,7 +46,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; -@LuceneTestCase.SuppressFileSystems("*") public abstract class BaseSearchableSnapshotsIntegTestCase extends AbstractSnapshotIntegTestCase { @Override protected boolean addMockInternalEngine() { diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index f12a7d87df671..194c9d31fbdc6 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -92,6 +92,7 @@ public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegTestCase { + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/65725") // TODO under investigation public void testCreateAndRestoreSearchableSnapshot() throws Exception { final String fsRepoName = randomAlphaOfLength(10); final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java index 5f0b58ca88e6e..f75098765e1dd 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java @@ -9,7 +9,6 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.StringField; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; @@ -45,7 +44,6 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; -@LuceneTestCase.SuppressFileSystems("ExtrasFS") // we don't want extra empty dirs in snapshot cache root dirs public class PersistentCacheTests extends AbstractSearchableSnapshotsTestCase { public void testCacheIndexWriter() throws Exception { @@ -191,13 +189,4 @@ public void testCleanUp() throws Exception { PersistentCache.cleanUp(nodeSettings, nodeEnvironment); assertTrue(cacheFiles.stream().noneMatch(Files::exists)); } - - private static CacheKey randomCacheKey() { - return new CacheKey( - new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())), - new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())), - new ShardId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), randomAlphaOfLength(5).toLowerCase(Locale.ROOT), randomInt(10)), - randomAlphaOfLength(5).toLowerCase(Locale.ROOT) - ); - } } From 039e8eadb5d231352809f9da0eba18fdb3ae8cbb Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 2 Dec 2020 12:47:29 +0100 Subject: [PATCH 04/20] remove leftovers --- .../store/SearchableSnapshotDirectory.java | 2 +- .../SearchableSnapshotIndexEventListener.java | 30 +------------------ .../SearchableSnapshots.java | 2 +- 3 files changed, 3 insertions(+), 31 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index b6456e0302e54..44108e014211d 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -328,7 +328,7 @@ private static UnsupportedOperationException unsupportedException() { } /** - * Indicate if the cache should be cleared for this directory + * Flag the current directory so that cache files associated to it will be evicted when the directory is closed. */ public void clearCacheOnClose() { final boolean value = clearCacheOnClose.getAndSet(true); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java index d5d8488b4307d..8940bdc2e030e 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.SegmentInfos; import org.elasticsearch.cluster.routing.RecoverySource; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -20,28 +19,16 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogException; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; -import org.elasticsearch.repositories.IndexId; -import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import java.nio.file.Path; import static org.elasticsearch.index.store.SearchableSnapshotDirectory.unwrapDirectory; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; -import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.isSearchableSnapshotStore; public class SearchableSnapshotIndexEventListener implements IndexEventListener { private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexEventListener.class); - private final CacheService cacheService; - - public SearchableSnapshotIndexEventListener(CacheService cacheService) { - this.cacheService = cacheService; - } @Override public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) { @@ -60,28 +47,13 @@ public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason rea assert directory != null : "expect a searchable snapshot directory instance"; directory.clearCacheOnClose(); } catch (Exception e) { - logger.warn("failed to close shard", e); + logger.warn("something went wrong when setting clear-on-close flag", e); } } } } } - @Override - public void beforeIndexShardDeleted(ShardId shardId, Settings indexSettings) { - if (SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings)) { - final SnapshotId snapshotId = new SnapshotId( - SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings), - SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings) - ); - final IndexId indexId = new IndexId( - SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings), - SNAPSHOT_INDEX_ID_SETTING.get(indexSettings) - ); - // cacheService.removeFromCache(cacheKey -> cacheKey.belongsTo(snapshotId, indexId, shardId)); - } - } - private static void ensureSnapshotIsLoaded(IndexShard indexShard) { final SearchableSnapshotDirectory directory = unwrapDirectory(indexShard.store().directory()); assert directory != null; diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java index 5dc9638e2e269..6e4027f41c6a6 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java @@ -230,7 +230,7 @@ public Collection createComponents( @Override public void onIndexModule(IndexModule indexModule) { if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexModule.getSettings())) { - indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener(cacheService.get())); + indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener()); indexModule.addIndexEventListener(failShardsListener.get()); } } From 9bef60dea1477da2abcbd27b9ddb2c86a40d9d37 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 3 Dec 2020 12:02:14 +0100 Subject: [PATCH 05/20] nodePath --- .../xpack/searchablesnapshots/cache/PersistentCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java index e2a8ebf904b53..4cdad20f2e8c0 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java @@ -140,7 +140,7 @@ void loadCacheFiles(CacheService cacheService) { final NodeEnvironment.NodePath nodePath = writer.nodePath(); logger.debug("loading persistent cache on data path [{}]", nodePath); - for (String indexUUID : nodeEnvironment.availableIndexFoldersForPath(writer.nodePath())) { + for (String indexUUID : nodeEnvironment.availableIndexFoldersForPath(nodePath)) { for (ShardId shardId : nodeEnvironment.findAllShardIds(new Index("_unknown_", indexUUID))) { final Path shardDataPath = writer.nodePath().resolve(shardId); final Path shardCachePath = getShardCachePath(new ShardPath(false, shardDataPath, shardDataPath, shardId)); From 5dcd53aaede280dc6a2114ff5bf74ce0af3c3214 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 3 Dec 2020 13:15:32 +0100 Subject: [PATCH 06/20] extract map --- .../cache/PersistentCache.java | 115 ++++++++---------- .../cache/PersistentCacheTests.java | 15 +-- 2 files changed, 61 insertions(+), 69 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java index 4cdad20f2e8c0..6db77d8330a5c 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java @@ -31,7 +31,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -65,8 +64,8 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import static java.util.Collections.synchronizedMap; import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableSortedSet; import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.getShardCachePath; @@ -78,10 +77,12 @@ public class PersistentCache implements Closeable { private static final String NODE_VERSION_COMMIT_KEY = "node_version"; private final NodeEnvironment nodeEnvironment; + private final Map documents; private final List writers; private final AtomicBoolean closed; public PersistentCache(NodeEnvironment nodeEnvironment) { + this.documents = synchronizedMap(loadExistingDocuments(nodeEnvironment)); this.writers = createWriters(nodeEnvironment); this.nodeEnvironment = nodeEnvironment; this.closed = new AtomicBoolean(); @@ -147,7 +148,7 @@ void loadCacheFiles(CacheService cacheService) { if (Files.isDirectory(shardCachePath)) { logger.trace("found snapshot cache dir at [{}], loading cache files from disk and index", shardCachePath); - Files.walkFileTree(shardCachePath, new CacheFileVisitor(cacheService, writer)); + Files.walkFileTree(shardCachePath, new CacheFileVisitor(cacheService, writer, documents)); } } } @@ -159,6 +160,7 @@ void loadCacheFiles(CacheService cacheService) { writer.commit(); } logger.info("persistent cache index loaded"); + documents.clear(); } catch (IOException e) { try { close(); @@ -227,6 +229,7 @@ public long getNumDocs() { public void close() throws IOException { if (closed.compareAndSet(false, true)) { IOUtils.close(writers); + documents.clear(); } } @@ -270,22 +273,6 @@ static CacheIndexWriter createCacheIndexWriter(NodeEnvironment.NodePath nodePath final Directory directory = FSDirectory.open(directoryPath); closeables.add(directory); - final Map documents = new HashMap<>(); - try (IndexReader indexReader = DirectoryReader.open(directory)) { - for (LeafReaderContext leafReaderContext : indexReader.leaves()) { - final LeafReader leafReader = leafReaderContext.reader(); - final Bits liveDocs = leafReader.getLiveDocs(); - for (int i = 0; i < leafReader.maxDoc(); i++) { - if (liveDocs == null || liveDocs.get(i)) { - final Document document = leafReader.document(i); - documents.put(getValue(document, CACHE_ID_FIELD), document); - } - } - } - } catch (IndexNotFoundException e) { - logger.debug("persistent cache index does not exist yet", e); - } - final IndexWriterConfig config = new IndexWriterConfig(new KeywordAnalyzer()); config.setIndexDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); config.setOpenMode(IndexWriterConfig.OpenMode.CREATE); @@ -296,7 +283,7 @@ static CacheIndexWriter createCacheIndexWriter(NodeEnvironment.NodePath nodePath final IndexWriter indexWriter = new IndexWriter(directory, config); closeables.add(indexWriter); - final CacheIndexWriter cacheIndexWriter = new CacheIndexWriter(nodePath, directory, indexWriter, documents); + final CacheIndexWriter cacheIndexWriter = new CacheIndexWriter(nodePath, directory, indexWriter); success = true; return cacheIndexWriter; } finally { @@ -306,6 +293,44 @@ static CacheIndexWriter createCacheIndexWriter(NodeEnvironment.NodePath nodePath } } + /** + * Load existing documents from persistent cache indices located at the root of every node path. + * + * @param nodeEnvironment the data node environment + * @return a map of {cache file uuid, Lucene document} + */ + static Map loadExistingDocuments(NodeEnvironment nodeEnvironment) { + final Map documents = new HashMap<>(); + try { + for (NodeEnvironment.NodePath nodePath : nodeEnvironment.nodePaths()) { + final Path directoryPath = resolveCacheIndexFolder(nodePath); + if (Files.exists(directoryPath)) { + try (Directory directory = FSDirectory.open(directoryPath)) { + try (IndexReader indexReader = DirectoryReader.open(directory)) { + logger.trace("loading documents from persistent cache index [{}]", directoryPath); + for (LeafReaderContext leafReaderContext : indexReader.leaves()) { + final LeafReader leafReader = leafReaderContext.reader(); + final Bits liveDocs = leafReader.getLiveDocs(); + for (int i = 0; i < leafReader.maxDoc(); i++) { + if (liveDocs == null || liveDocs.get(i)) { + final Document document = leafReader.document(i); + logger.trace("loading document [{}]", document); + documents.put(getValue(document, CACHE_ID_FIELD), document); + } + } + } + } catch (IndexNotFoundException e) { + logger.debug("persistent cache index does not exist yet", e); + } + } + } + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to load existing documents from persistent cache index", e); + } + return documents; + } + /** * Cleans any leftover searchable snapshot caches (files and Lucene indices) when a non-data node is starting up. * This is useful when the node is repurposed and is not a data node anymore. @@ -344,23 +369,15 @@ public static void cleanUp(Settings settings, NodeEnvironment nodeEnvironment) { /** * A {@link CacheIndexWriter} contains a Lucene {@link Directory} with an {@link IndexWriter} that can be used to index documents in - * the persistent cache index. The list of existing cache documents is loaded at startup and kept around until a first commit is done. - * There is one {@link CacheIndexWriter} for each data path. + * the persistent cache index. There is one {@link CacheIndexWriter} for each data path. */ static class CacheIndexWriter implements Closeable { - private final AtomicReference> documentsRef; private final NodeEnvironment.NodePath nodePath; private final IndexWriter indexWriter; private final Directory directory; - private CacheIndexWriter( - NodeEnvironment.NodePath nodePath, - Directory directory, - IndexWriter indexWriter, - Map documents - ) { - this.documentsRef = new AtomicReference<>(Objects.requireNonNull(documents)); + private CacheIndexWriter(NodeEnvironment.NodePath nodePath, Directory directory, IndexWriter indexWriter) { this.nodePath = nodePath; this.directory = directory; this.indexWriter = indexWriter; @@ -370,29 +387,13 @@ NodeEnvironment.NodePath nodePath() { return nodePath; } - Map getDocuments() { - return documentsRef.get(); - } - - @Nullable - Document getDocument(String cacheFileId) { - final Map documents = getDocuments(); - if (documents == null) { - assert false : "this method should only be used when loading persistent cache, before any prior commit"; - throw new IllegalStateException("Persistent cache index was already committed"); - } - return documents.get(cacheFileId); - } - void updateCacheFile(CacheFile cacheFile, SortedSet> cacheRanges) throws IOException { - assert getDocuments() == null : "this method should only be used after loading persistent cache"; final Term term = buildTerm(cacheFile); logger.debug("updating document with term [{}]", term); indexWriter.updateDocument(term, buildDocument(nodePath, cacheFile, cacheRanges)); } void updateCacheFile(String cacheFileId, Document cacheFileDocument) throws IOException { - assert getDocuments() != null : "this method should only be used when loading persistent cache, before any prior commit"; final Term term = buildTerm(cacheFileId); logger.debug("updating document with term [{}]", term); indexWriter.updateDocument(term, cacheFileDocument); @@ -417,20 +418,8 @@ void prepareCommit() throws IOException { } void commit() throws IOException { - boolean success = false; - try { - logger.debug("committing"); - indexWriter.commit(); - success = true; - } finally { - if (success) { - Map documents = documentsRef.getAndSet(null); - if (documents != null) { - logger.trace("clearing existing cache documents"); - documents.clear(); - } - } - } + logger.debug("committing"); + indexWriter.commit(); } @Override @@ -454,10 +443,12 @@ public String toString() { private static class CacheFileVisitor extends SimpleFileVisitor { private final CacheService cacheService; + private final Map documents; private final CacheIndexWriter writer; - private CacheFileVisitor(CacheService cacheService, CacheIndexWriter writer) { + private CacheFileVisitor(CacheService cacheService, CacheIndexWriter writer, Map documents) { this.cacheService = Objects.requireNonNull(cacheService); + this.documents = Objects.requireNonNull(documents); this.writer = Objects.requireNonNull(writer); } @@ -465,7 +456,7 @@ private CacheFileVisitor(CacheService cacheService, CacheIndexWriter writer) { public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { try { final String id = buildId(file); - final Document cacheDocument = writer.getDocument(id); + final Document cacheDocument = documents.get(id); if (cacheDocument != null) { logger.trace("indexing cache file with id [{}] in persistent cache index", id); writer.updateCacheFile(id, cacheDocument); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java index f75098765e1dd..af7e0a2b0d4d2 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java @@ -58,14 +58,16 @@ public void testCacheIndexWriter() throws Exception { final Path snapshotCacheIndexDir = resolveCacheIndexFolder(nodePath); assertThat(Files.exists(snapshotCacheIndexDir), equalTo(iter > 0)); + // load existing documents from persistent cache index before each iteration + final Map documents = PersistentCache.loadExistingDocuments(nodeEnvironment); + assertThat(documents.size(), equalTo(liveDocs.size())); + try (PersistentCache.CacheIndexWriter writer = createCacheIndexWriter(nodePath)) { assertThat(writer.nodePath(), sameInstance(nodePath)); - assertThat(writer.getDocuments(), notNullValue()); - assertThat(writer.getDocuments().size(), equalTo(liveDocs.size())); // verify that existing documents are loaded for (Map.Entry liveDoc : liveDocs.entrySet()) { - final Document document = writer.getDocument(liveDoc.getKey()); + final Document document = documents.get(liveDoc.getKey()); assertThat("Document should be loaded", document, notNullValue()); final String iteration = document.get("update_iteration"); assertThat(iteration, equalTo(String.valueOf(liveDoc.getValue()))); @@ -74,7 +76,7 @@ public void testCacheIndexWriter() throws Exception { // verify that deleted documents are not loaded for (String deletedDoc : deletedDocs) { - final Document document = writer.getDocument(deletedDoc); + final Document document = documents.get(deletedDoc); assertThat("Document should not be loaded", document, nullValue()); } @@ -110,16 +112,15 @@ public void testCacheIndexWriter() throws Exception { } boolean commit = false; - if (randomBoolean()) { + if (frequently()) { writer.prepareCommit(); - if (randomBoolean()) { + if (frequently()) { writer.commit(); commit = true; } } if (commit) { - assertThat(writer.getDocuments(), nullValue()); liveDocs.putAll(updatedDocs); liveDocs.putAll(newDocs); for (String cacheId : removedDocs.keySet()) { From 381188f6fa67cff7c63647d6e3442b50ad56cc3e Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 4 Dec 2020 17:02:19 +0100 Subject: [PATCH 07/20] Also match persistent cache index docs with cache files --- ...bleSnapshotsPersistentCacheIntegTests.java | 30 ++++++++-- .../cache/PersistentCache.java | 58 ++++++++++++------- .../cache/PersistentCacheTests.java | 2 +- 3 files changed, 63 insertions(+), 27 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java index 627c8e6733a48..48a2e0ba9d84a 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.searchablesnapshots.cache; +import org.apache.lucene.document.Document; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -16,6 +17,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.InternalTestCluster; @@ -23,18 +25,22 @@ import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase; +import java.io.IOException; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashSet; import java.util.Locale; +import java.util.Map; import java.util.Set; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX; import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.resolveCacheIndexFolder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.notNullValue; public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase { @@ -94,7 +100,9 @@ public void testCacheSurviveRestart() throws Exception { .getIndex(); final IndexService indexService = internalCluster().getInstance(IndicesService.class, dataNode).indexService(restoredIndex); - final Path shardCachePath = CacheService.getShardCachePath(indexService.getShard(0).shardPath()); + final ShardPath shardPath = indexService.getShard(0).shardPath(); + final Path shardCachePath = CacheService.getShardCachePath(shardPath); + assertTrue(Files.isDirectory(shardCachePath)); final Set cacheFiles = new HashSet<>(); try (DirectoryStream snapshotCacheStream = Files.newDirectoryStream(shardCachePath)) { @@ -119,9 +127,22 @@ public void testCacheSurviveRestart() throws Exception { internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() { @Override public Settings onNodeStopped(String nodeName) { - assertTrue(Files.isDirectory(shardCachePath)); - for (Path cacheFile : cacheFiles) { - assertTrue(cacheFile + " should exist on disk", Files.isRegularFile(cacheFile)); + try { + assertTrue(Files.isDirectory(shardCachePath)); + + final Path persistentCacheIndexDir = resolveCacheIndexFolder(shardPath.getRootDataPath()); + assertTrue(Files.isDirectory(persistentCacheIndexDir)); + + final Map documents = PersistentCache.loadDocuments(persistentCacheIndexDir); + assertThat(documents.size(), equalTo(cacheFiles.size())); + + for (Path cacheFile : cacheFiles) { + final String cacheFileName = cacheFile.getFileName().toString(); + assertTrue(cacheFileName + " should exist on disk", Files.isRegularFile(cacheFile)); + assertThat(cacheFileName + " should exist in persistent cache index", documents.get(cacheFileName), notNullValue()); + } + } catch (IOException e) { + throw new AssertionError(e); } return Settings.EMPTY; } @@ -129,7 +150,6 @@ public Settings onNodeStopped(String nodeName) { persistentCache = internalCluster().getInstance(CacheService.class, dataNode).getPersistentCache(); assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size())); - ensureGreen(restoredIndexName); cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile))); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java index 6db77d8330a5c..752e2cd6558da 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java @@ -82,7 +82,7 @@ public class PersistentCache implements Closeable { private final AtomicBoolean closed; public PersistentCache(NodeEnvironment nodeEnvironment) { - this.documents = synchronizedMap(loadExistingDocuments(nodeEnvironment)); + this.documents = synchronizedMap(loadDocuments(nodeEnvironment)); this.writers = createWriters(nodeEnvironment); this.nodeEnvironment = nodeEnvironment; this.closed = new AtomicBoolean(); @@ -299,30 +299,13 @@ static CacheIndexWriter createCacheIndexWriter(NodeEnvironment.NodePath nodePath * @param nodeEnvironment the data node environment * @return a map of {cache file uuid, Lucene document} */ - static Map loadExistingDocuments(NodeEnvironment nodeEnvironment) { + static Map loadDocuments(NodeEnvironment nodeEnvironment) { final Map documents = new HashMap<>(); try { for (NodeEnvironment.NodePath nodePath : nodeEnvironment.nodePaths()) { final Path directoryPath = resolveCacheIndexFolder(nodePath); if (Files.exists(directoryPath)) { - try (Directory directory = FSDirectory.open(directoryPath)) { - try (IndexReader indexReader = DirectoryReader.open(directory)) { - logger.trace("loading documents from persistent cache index [{}]", directoryPath); - for (LeafReaderContext leafReaderContext : indexReader.leaves()) { - final LeafReader leafReader = leafReaderContext.reader(); - final Bits liveDocs = leafReader.getLiveDocs(); - for (int i = 0; i < leafReader.maxDoc(); i++) { - if (liveDocs == null || liveDocs.get(i)) { - final Document document = leafReader.document(i); - logger.trace("loading document [{}]", document); - documents.put(getValue(document, CACHE_ID_FIELD), document); - } - } - } - } catch (IndexNotFoundException e) { - logger.debug("persistent cache index does not exist yet", e); - } - } + documents.putAll(loadDocuments(directoryPath)); } } } catch (IOException e) { @@ -331,6 +314,35 @@ static Map loadExistingDocuments(NodeEnvironment nodeEnvironme return documents; } + /** + * Load existing documents from a persistent cache Lucene directory. + * + * @param directoryPath the Lucene directory path + * @return a map of {cache file uuid, Lucene document} + */ + static Map loadDocuments(Path directoryPath) throws IOException { + final Map documents = new HashMap<>(); + try (Directory directory = FSDirectory.open(directoryPath)) { + try (IndexReader indexReader = DirectoryReader.open(directory)) { + logger.trace("loading documents from persistent cache index [{}]", directoryPath); + for (LeafReaderContext leafReaderContext : indexReader.leaves()) { + final LeafReader leafReader = leafReaderContext.reader(); + final Bits liveDocs = leafReader.getLiveDocs(); + for (int i = 0; i < leafReader.maxDoc(); i++) { + if (liveDocs == null || liveDocs.get(i)) { + final Document document = leafReader.document(i); + logger.trace("loading document [{}]", document); + documents.put(getValue(document, CACHE_ID_FIELD), document); + } + } + } + } catch (IndexNotFoundException e) { + logger.debug("persistent cache index does not exist yet", e); + } + } + return documents; + } + /** * Cleans any leftover searchable snapshot caches (files and Lucene indices) when a non-data node is starting up. * This is useful when the node is repurposed and is not a data node anymore. @@ -589,7 +601,11 @@ private static SortedSet> buildCacheFileRanges(Document docume } static Path resolveCacheIndexFolder(NodeEnvironment.NodePath nodePath) { - return CacheService.resolveSnapshotCache(nodePath.path); + return resolveCacheIndexFolder(nodePath.path); + } + + static Path resolveCacheIndexFolder(Path dataPath) { + return CacheService.resolveSnapshotCache(dataPath); } /** diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java index af7e0a2b0d4d2..8a26358ecc8e5 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java @@ -59,7 +59,7 @@ public void testCacheIndexWriter() throws Exception { assertThat(Files.exists(snapshotCacheIndexDir), equalTo(iter > 0)); // load existing documents from persistent cache index before each iteration - final Map documents = PersistentCache.loadExistingDocuments(nodeEnvironment); + final Map documents = PersistentCache.loadDocuments(nodeEnvironment); assertThat(documents.size(), equalTo(liveDocs.size())); try (PersistentCache.CacheIndexWriter writer = createCacheIndexWriter(nodePath)) { From f6ecc3ab59afc84c91caab59da554efd3dfd1716 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 4 Dec 2020 17:09:16 +0100 Subject: [PATCH 08/20] add comment + IndexRemovalReason.NO_LONGER_ASSIGNED --- .../SearchableSnapshotIndexEventListener.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java index 8940bdc2e030e..e1b27aacb1a98 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java @@ -30,6 +30,13 @@ public class SearchableSnapshotIndexEventListener implements IndexEventListener private static final Logger logger = LogManager.getLogger(SearchableSnapshotIndexEventListener.class); + /** + * Called before a searchable snapshot {@link IndexShard} starts to recover. This event is used to trigger the loading of the shard + * snapshot information that contains the list of shard's Lucene files. + * + * @param indexShard the shard that is about to recover + * @param indexSettings the shard's index settings + */ @Override public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) { assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC); @@ -37,9 +44,18 @@ public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexS associateNewEmptyTranslogWithIndex(indexShard); } + /** + * Called before closing an {@link IndexService}. This event is used to toggle the "clear cache on closing" flag of the existing + * searchable snapshot directory instance. This way the directory will clean up its cache entries for us. + * + * @param indexService The index service + * @param reason the reason for index removal + */ @Override public void beforeIndexRemoved(IndexService indexService, IndexRemovalReason reason) { - if (reason == IndexRemovalReason.DELETED || reason == IndexRemovalReason.FAILURE) { + if (reason == IndexRemovalReason.DELETED + || reason == IndexRemovalReason.NO_LONGER_ASSIGNED + || reason == IndexRemovalReason.FAILURE) { if (SearchableSnapshotsConstants.isSearchableSnapshotStore(indexService.getIndexSettings().getSettings())) { for (IndexShard indexShard : indexService) { try { From 8c12a960c4402f01d99695efc35dfb652d9f6b44 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 11 Dec 2020 12:33:08 +0100 Subject: [PATCH 09/20] remove after merge leftovers --- .../SearchableSnapshotsIntegTests.java | 1 - ...ableSnapshotsPersistentCacheIntegTests.java | 18 ++---------------- .../store/SearchableSnapshotDirectory.java | 10 ---------- .../SearchableSnapshotIndexEventListener.java | 3 --- 4 files changed, 2 insertions(+), 30 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index 194c9d31fbdc6..f12a7d87df671 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -92,7 +92,6 @@ public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegTestCase { - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/65725") // TODO under investigation public void testCreateAndRestoreSearchableSnapshot() throws Exception { final String fsRepoName = randomAlphaOfLength(10); final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java index 48a2e0ba9d84a..67810dc112196 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java @@ -7,11 +7,9 @@ package org.elasticsearch.xpack.searchablesnapshots.cache; import org.apache.lucene.document.Document; -import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -21,8 +19,6 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction; -import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase; import java.io.IOException; @@ -74,18 +70,8 @@ public void testCacheSurviveRestart() throws Exception { final DiscoveryNodes discoveryNodes = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes(); final String dataNode = randomFrom(discoveryNodes.getDataNodes().values().toArray(DiscoveryNode.class)).getName(); - final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest( - restoredIndexName, - fsRepoName, - snapshotName, - indexName, - Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build(), - Strings.EMPTY_ARRAY, - true - ); - - final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get(); - assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0)); + mountSnapshot(fsRepoName, snapshotName, indexName, restoredIndexName, + Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build()); ensureGreen(restoredIndexName); final Index restoredIndex = client().admin() diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java index 74890fc008f91..fff39310b0214 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java @@ -126,7 +126,6 @@ public class SearchableSnapshotDirectory extends BaseDirectory { private final Path cacheDir; private final ShardPath shardPath; private final AtomicBoolean closed; - private final AtomicBoolean clearCacheOnClose; // volatile fields are updated once under `this` lock, all together, iff loaded is not true. private volatile BlobStoreIndexShardSnapshot snapshot; @@ -163,7 +162,6 @@ public SearchableSnapshotDirectory( this.cacheDir = Objects.requireNonNull(cacheDir); this.shardPath = Objects.requireNonNull(shardPath); this.closed = new AtomicBoolean(false); - this.clearCacheOnClose = new AtomicBoolean(false); this.useCache = SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings); this.prewarmCache = useCache ? SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.get(indexSettings) : false; this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings)); @@ -328,14 +326,6 @@ private static UnsupportedOperationException unsupportedException() { return new UnsupportedOperationException("Searchable snapshot directory does not support this operation"); } - /** - * Flag the current directory so that cache files associated to it will be evicted when the directory is closed. - */ - public void clearCacheOnClose() { - final boolean value = clearCacheOnClose.getAndSet(true); - assert value == false : "cache clearing is already set"; - } - @Override public final void close() { if (closed.compareAndSet(false, true)) { diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java index e792996cd3ca7..f3ede77a0c58d 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java @@ -5,8 +5,6 @@ */ package org.elasticsearch.xpack.searchablesnapshots; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -27,7 +25,6 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogException; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; -import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.threadpool.ThreadPool; From 255853ddc231672e78eee7ec0b27e7aab365776a Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 11 Dec 2020 12:42:09 +0100 Subject: [PATCH 10/20] more spotless fixes --- .../SearchableSnapshotsPersistentCacheIntegTests.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java index 67810dc112196..34b657864fa81 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/SearchableSnapshotsPersistentCacheIntegTests.java @@ -70,8 +70,13 @@ public void testCacheSurviveRestart() throws Exception { final DiscoveryNodes discoveryNodes = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes(); final String dataNode = randomFrom(discoveryNodes.getDataNodes().values().toArray(DiscoveryNode.class)).getName(); - mountSnapshot(fsRepoName, snapshotName, indexName, restoredIndexName, - Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build()); + mountSnapshot( + fsRepoName, + snapshotName, + indexName, + restoredIndexName, + Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build() + ); ensureGreen(restoredIndexName); final Index restoredIndex = client().admin() From ac6050f206ff4e096736ef19a2812c33a365b9b7 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 14 Dec 2020 09:34:27 +0100 Subject: [PATCH 11/20] feedback --- .../cache/PersistentCache.java | 80 +++++++------------ 1 file changed, 31 insertions(+), 49 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java index 752e2cd6558da..c659d4ef53dc4 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java @@ -60,7 +60,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; @@ -148,14 +147,36 @@ void loadCacheFiles(CacheService cacheService) { if (Files.isDirectory(shardCachePath)) { logger.trace("found snapshot cache dir at [{}], loading cache files from disk and index", shardCachePath); - Files.walkFileTree(shardCachePath, new CacheFileVisitor(cacheService, writer, documents)); + Files.walkFileTree(shardCachePath, new SimpleFileVisitor<>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + try { + final String id = buildId(file); + final Document cacheDocument = documents.get(id); + if (cacheDocument != null) { + logger.trace("indexing cache file with id [{}] in persistent cache index", id); + writer.updateCacheFile(id, cacheDocument); + + final CacheKey cacheKey = buildCacheKey(cacheDocument); + final long fileLength = getFileLength(cacheDocument); + final SortedSet> ranges = buildCacheFileRanges(cacheDocument); + + logger.trace("adding cache file with [id={}, cache key={}, ranges={}]", id, cacheKey, ranges); + cacheService.put(cacheKey, fileLength, file.getParent(), id, ranges); + } else { + logger.trace("deleting cache file [{}] (does not exist in persistent cache index)", file); + Files.delete(file); + } + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + return FileVisitResult.CONTINUE; + } + }); } } } } - for (CacheIndexWriter writer : writers) { - writer.prepareCommit(); - } for (CacheIndexWriter writer : writers) { writer.commit(); } @@ -228,8 +249,11 @@ public long getNumDocs() { @Override public void close() throws IOException { if (closed.compareAndSet(false, true)) { - IOUtils.close(writers); - documents.clear(); + try { + IOUtils.close(writers); + } finally { + documents.clear(); + } } } @@ -446,48 +470,6 @@ public String toString() { } } - /** - * {@link CacheFileVisitor} is used to visit cache files on disk and find information about them using the Lucene documents loaded - * at startup from the persistent cache index. If there are no corresponding document for a cache file, the cache file is deleted - * from disk. If a corresponding document is found, the cache file is added to the current persistent cache index and inserted in - * the searchable snapshots cache. - */ - private static class CacheFileVisitor extends SimpleFileVisitor { - - private final CacheService cacheService; - private final Map documents; - private final CacheIndexWriter writer; - - private CacheFileVisitor(CacheService cacheService, CacheIndexWriter writer, Map documents) { - this.cacheService = Objects.requireNonNull(cacheService); - this.documents = Objects.requireNonNull(documents); - this.writer = Objects.requireNonNull(writer); - } - - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { - try { - final String id = buildId(file); - final Document cacheDocument = documents.get(id); - if (cacheDocument != null) { - logger.trace("indexing cache file with id [{}] in persistent cache index", id); - writer.updateCacheFile(id, cacheDocument); - - final CacheKey cacheKey = buildCacheKey(cacheDocument); - logger.trace("adding cache file with [id={}, cache key={}]", id, cacheKey); - final long fileLength = getFileLength(cacheDocument); - cacheService.put(cacheKey, fileLength, file.getParent(), id, buildCacheFileRanges(cacheDocument)); - } else { - logger.trace("deleting cache file [{}] (does not exist in persistent cache index)", file); - Files.delete(file); - } - } catch (Exception e) { - throw ExceptionsHelper.convertToRuntime(e); - } - return FileVisitResult.CONTINUE; - } - } - private static final String CACHE_ID_FIELD = "cache_id"; private static final String CACHE_PATH_FIELD = "cache_path"; private static final String CACHE_RANGES_FIELD = "cache_ranges"; From 51da63cad3d3f86343446fbbabb7627c51413027 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 14 Dec 2020 12:25:37 +0100 Subject: [PATCH 12/20] repopulateCache --- .../xpack/searchablesnapshots/cache/CacheService.java | 2 +- .../xpack/searchablesnapshots/cache/PersistentCache.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index dfc060dcdd60b..8e100523f481c 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -163,7 +163,7 @@ static Path resolveSnapshotCache(Path path) { @Override protected void doStart() { - persistentCache.loadCacheFiles(this); + persistentCache.repopulateCache(this); cacheSyncTask.rescheduleIfNecessary(); } diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java index c659d4ef53dc4..24ed4fd7134ae 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java @@ -133,7 +133,7 @@ public void removeCacheFile(CacheFile cacheFile) throws IOException { * * @param cacheService the {@link CacheService} to use when repopulating {@link CacheFile}. */ - void loadCacheFiles(CacheService cacheService) { + void repopulateCache(CacheService cacheService) { ensureOpen(); try { for (CacheIndexWriter writer : writers) { From 51755bd9d64793e4a00ebac5246ec06438180064 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 14 Dec 2020 12:35:50 +0100 Subject: [PATCH 13/20] started --- .../cache/PersistentCache.java | 116 ++++++++++-------- 1 file changed, 65 insertions(+), 51 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java index 24ed4fd7134ae..7cbfc661fbe0a 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java @@ -78,12 +78,14 @@ public class PersistentCache implements Closeable { private final NodeEnvironment nodeEnvironment; private final Map documents; private final List writers; + private final AtomicBoolean started; private final AtomicBoolean closed; public PersistentCache(NodeEnvironment nodeEnvironment) { this.documents = synchronizedMap(loadDocuments(nodeEnvironment)); this.writers = createWriters(nodeEnvironment); this.nodeEnvironment = nodeEnvironment; + this.started = new AtomicBoolean(); this.closed = new AtomicBoolean(); } @@ -93,6 +95,12 @@ private void ensureOpen() { } } + private void ensureStarted() { + if (started.get() == false) { + throw new IllegalStateException("Persistent cache is not started"); + } + } + /** * @return the {@link CacheIndexWriter} to use for the given {@link CacheFile} */ @@ -110,10 +118,12 @@ private CacheIndexWriter getWriter(CacheFile cacheFile) { } public void addCacheFile(CacheFile cacheFile, SortedSet> ranges) throws IOException { + ensureStarted(); getWriter(cacheFile).updateCacheFile(cacheFile, ranges); } public void removeCacheFile(CacheFile cacheFile) throws IOException { + ensureStarted(); getWriter(cacheFile).deleteCacheFile(cacheFile); } @@ -135,63 +145,67 @@ public void removeCacheFile(CacheFile cacheFile) throws IOException { */ void repopulateCache(CacheService cacheService) { ensureOpen(); - try { - for (CacheIndexWriter writer : writers) { - final NodeEnvironment.NodePath nodePath = writer.nodePath(); - logger.debug("loading persistent cache on data path [{}]", nodePath); - - for (String indexUUID : nodeEnvironment.availableIndexFoldersForPath(nodePath)) { - for (ShardId shardId : nodeEnvironment.findAllShardIds(new Index("_unknown_", indexUUID))) { - final Path shardDataPath = writer.nodePath().resolve(shardId); - final Path shardCachePath = getShardCachePath(new ShardPath(false, shardDataPath, shardDataPath, shardId)); - - if (Files.isDirectory(shardCachePath)) { - logger.trace("found snapshot cache dir at [{}], loading cache files from disk and index", shardCachePath); - Files.walkFileTree(shardCachePath, new SimpleFileVisitor<>() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { - try { - final String id = buildId(file); - final Document cacheDocument = documents.get(id); - if (cacheDocument != null) { - logger.trace("indexing cache file with id [{}] in persistent cache index", id); - writer.updateCacheFile(id, cacheDocument); - - final CacheKey cacheKey = buildCacheKey(cacheDocument); - final long fileLength = getFileLength(cacheDocument); - final SortedSet> ranges = buildCacheFileRanges(cacheDocument); - - logger.trace("adding cache file with [id={}, cache key={}, ranges={}]", id, cacheKey, ranges); - cacheService.put(cacheKey, fileLength, file.getParent(), id, ranges); - } else { - logger.trace("deleting cache file [{}] (does not exist in persistent cache index)", file); - Files.delete(file); + if (started.compareAndSet(false, true)) { + try { + for (CacheIndexWriter writer : writers) { + final NodeEnvironment.NodePath nodePath = writer.nodePath(); + logger.debug("loading persistent cache on data path [{}]", nodePath); + + for (String indexUUID : nodeEnvironment.availableIndexFoldersForPath(nodePath)) { + for (ShardId shardId : nodeEnvironment.findAllShardIds(new Index("_unknown_", indexUUID))) { + final Path shardDataPath = writer.nodePath().resolve(shardId); + final Path shardCachePath = getShardCachePath(new ShardPath(false, shardDataPath, shardDataPath, shardId)); + + if (Files.isDirectory(shardCachePath)) { + logger.trace("found snapshot cache dir at [{}], loading cache files from disk and index", shardCachePath); + Files.walkFileTree(shardCachePath, new SimpleFileVisitor<>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + try { + final String id = buildId(file); + final Document cacheDocument = documents.get(id); + if (cacheDocument != null) { + logger.trace("indexing cache file with id [{}] in persistent cache index", id); + writer.updateCacheFile(id, cacheDocument); + + final CacheKey cacheKey = buildCacheKey(cacheDocument); + final long fileLength = getFileLength(cacheDocument); + final SortedSet> ranges = buildCacheFileRanges(cacheDocument); + + logger.trace("adding cache file with [id={}, key={}, ranges={}]", id, cacheKey, ranges); + cacheService.put(cacheKey, fileLength, file.getParent(), id, ranges); + } else { + logger.trace("deleting cache file [{}] (does not exist in persistent cache index)", file); + Files.delete(file); + } + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); } - } catch (Exception e) { - throw ExceptionsHelper.convertToRuntime(e); + return FileVisitResult.CONTINUE; } - return FileVisitResult.CONTINUE; - } - }); + }); + } } } } + for (CacheIndexWriter writer : writers) { + writer.commit(); + } + logger.info("persistent cache index loaded"); + documents.clear(); + } catch (IOException e) { + try { + close(); + } catch (Exception e2) { + logger.warn("failed to close persistent cache index", e2); + e.addSuppressed(e2); + } + throw new UncheckedIOException("Failed to load persistent cache", e); + } finally { + closeIfAnyIndexWriterHasTragedyOrIsClosed(); } - for (CacheIndexWriter writer : writers) { - writer.commit(); - } - logger.info("persistent cache index loaded"); - documents.clear(); - } catch (IOException e) { - try { - close(); - } catch (Exception e2) { - logger.warn("failed to close persistent cache index", e2); - e.addSuppressed(e2); - } - throw new UncheckedIOException("Failed to load persistent cache", e); - } finally { - closeIfAnyIndexWriterHasTragedyOrIsClosed(); + } else { + assert false : "persistent cache is already loaded"; } } From 14c680efd0072f61d8f3e23d4efcdcf1432ee5c1 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 14 Dec 2020 12:37:40 +0100 Subject: [PATCH 14/20] simpler updateCacheFile --- .../xpack/searchablesnapshots/cache/PersistentCache.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java index 7cbfc661fbe0a..b656a2b40979a 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCache.java @@ -438,9 +438,7 @@ NodeEnvironment.NodePath nodePath() { } void updateCacheFile(CacheFile cacheFile, SortedSet> cacheRanges) throws IOException { - final Term term = buildTerm(cacheFile); - logger.debug("updating document with term [{}]", term); - indexWriter.updateDocument(term, buildDocument(nodePath, cacheFile, cacheRanges)); + updateCacheFile(buildId(cacheFile), buildDocument(nodePath, cacheFile, cacheRanges)); } void updateCacheFile(String cacheFileId, Document cacheFileDocument) throws IOException { From 589cfdaf03ad33fde1dc0938d4a90462eefff2e0 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 14 Dec 2020 12:39:29 +0100 Subject: [PATCH 15/20] unused --- .../AbstractSearchableSnapshotsTestCase.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java index 65b288e98c9d8..d9ee98343a9f6 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java @@ -62,7 +62,6 @@ public abstract class AbstractSearchableSnapshotsTestCase extends ESIndexInputTe protected ThreadPool threadPool; protected ClusterService clusterService; protected NodeEnvironment nodeEnvironment; - protected PersistentCache persistentCache; @Before public void setUpTest() throws Exception { @@ -80,7 +79,7 @@ public void setUpTest() throws Exception { @After public void tearDownTest() throws Exception { - IOUtils.close(persistentCache, nodeEnvironment, clusterService); + IOUtils.close(nodeEnvironment, clusterService); assertTrue(ThreadPool.terminate(threadPool, 30L, TimeUnit.SECONDS)); } From 5aa545ad40686ab0666ea206e59137f88cedfa03 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 14 Dec 2020 12:41:01 +0100 Subject: [PATCH 16/20] assert IOE --- .../xpack/searchablesnapshots/cache/CacheService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index 8e100523f481c..f6a38f6b7e354 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -411,6 +411,7 @@ private void onCacheFileRemoval(CacheFile cacheFile) { try { persistentCache.removeCacheFile(cacheFile); } catch (Exception e) { + assert e instanceof IOException : e; logger.warn("failed to remove cache file from persistent cache", e); } } From 2940d20433ddf0d9fcfbe7111d2f97112438872a Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 14 Dec 2020 12:42:48 +0100 Subject: [PATCH 17/20] local var buffer --- .../searchablesnapshots/cache/PersistentCacheTests.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java index 8a26358ecc8e5..9eedfd1f68a82 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java @@ -132,17 +132,14 @@ public void testCacheIndexWriter() throws Exception { } } - private static final byte[] buffer; - static { - buffer = new byte[1024]; - Arrays.fill(buffer, (byte) 0xff); - } - public void testCleanUp() throws Exception { final List cacheFiles = new ArrayList<>(); try (CacheService cacheService = defaultCacheService()) { cacheService.start(); + final byte[] buffer = new byte[1024]; + Arrays.fill(buffer, (byte) 0xff); + for (int snapshots = 0; snapshots < between(1, 2); snapshots++) { SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); for (int indices = 0; indices < between(1, 2); indices++) { From eab0a0caf48c8996e3f17438e5621a5bac2c9ccd Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 14 Dec 2020 13:23:33 +0100 Subject: [PATCH 18/20] should persist --- .../searchablesnapshots/cache/CacheService.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index f6a38f6b7e354..e18f53b77b042 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -473,17 +473,23 @@ protected void synchronizeCache() { ranges.size() ); final Path cacheDir = cacheFilePath.toAbsolutePath().getParent(); - if (cacheDirs.add(cacheDir)) { + boolean shouldPersist = cacheDirs.contains(cacheDir); + if (shouldPersist == false) { try { IOUtils.fsync(cacheDir, true, false); // TODO evict cache file if fsync failed logger.trace("cache directory [{}] synchronized", cacheDir); + cacheDirs.add(cacheDir); + shouldPersist = true; } catch (Exception e) { assert e instanceof IOException : e; + shouldPersist = false; logger.warn(() -> new ParameterizedMessage("failed to synchronize cache directory [{}]", cacheDir), e); } } - persistentCache.addCacheFile(cacheFile, ranges); - count += 1L; + if (shouldPersist) { + persistentCache.addCacheFile(cacheFile, ranges); + count += 1L; + } } } catch (Exception e) { assert e instanceof IOException : e; From 9f041f6f0c9458fb6a8e753cc8176012fd09ba12 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 14 Dec 2020 13:50:52 +0100 Subject: [PATCH 19/20] Fix tests --- .../searchablesnapshots/cache/CacheService.java | 2 +- .../SearchableSnapshotDirectoryStatsTests.java | 13 ++++--------- .../store/SearchableSnapshotDirectoryTests.java | 10 +++------- .../elasticsearch/index/store/cache/TestUtils.java | 2 +- .../cache/CacheServiceTests.java | 12 +++++++++--- 5 files changed, 18 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java index e18f53b77b042..a34e67fffb5fe 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java @@ -157,7 +157,7 @@ public static Path getShardCachePath(ShardPath shardPath) { return resolveSnapshotCache(shardPath.getDataPath()); } - static Path resolveSnapshotCache(Path path) { + public static Path resolveSnapshotCache(Path path) { return path.resolve("snapshot_cache"); } diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java index 1d45ccd3f0d0d..0e809a5fc3ac4 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryStatsTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; @@ -38,8 +37,8 @@ import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -51,6 +50,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.resolveSnapshotCache; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -603,14 +603,9 @@ private void executeTestCase( final StoreFileMetadata metadata = new StoreFileMetadata(fileName, fileContent.length, "_checksum", Version.CURRENT.luceneVersion); final List files = List.of(new FileInfo(blobName, metadata, new ByteSizeValue(fileContent.length))); final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), 0L, files, 0L, 0L, 0, 0L); - final Path shardDir; - try { - shardDir = new NodeEnvironment.NodePath(createTempDir()).resolve(shardId); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + final Path shardDir = randomShardPath(shardId); final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, shardId); - final Path cacheDir = createTempDir(); + final Path cacheDir = Files.createDirectories(resolveSnapshotCache(shardDir).resolve(snapshotId.getUUID())); try ( CacheService ignored = cacheService; diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java index 462f741edf859..7e2df834b66f3 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java @@ -125,6 +125,7 @@ import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.toIntBytes; +import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.resolveSnapshotCache; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -684,14 +685,9 @@ public void testClearCache() throws Exception { final IndexId indexId = new IndexId("_id", "_uuid"); final ShardId shardId = new ShardId(new Index("_name", "_id"), 0); - final Path shardDir; - try { - shardDir = new NodeEnvironment.NodePath(createTempDir()).resolve(shardId); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + final Path shardDir = randomShardPath(shardId); final ShardPath shardPath = new ShardPath(false, shardDir, shardDir, shardId); - final Path cacheDir = createTempDir(); + final Path cacheDir = Files.createDirectories(resolveSnapshotCache(shardDir).resolve(snapshotId.getUUID())); try ( SearchableSnapshotDirectory directory = new SearchableSnapshotDirectory( () -> blobContainer, diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index 922cd5646806f..28e887516a886 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -348,7 +348,7 @@ public Integer getNumberOfFSyncs(Path path) { @Override public FileChannel newFileChannel(Path path, Set options, FileAttribute... attrs) throws IOException { final AtomicInteger counter = files.computeIfAbsent(path, p -> new AtomicInteger(0)); - return new FilterFileChannel(delegate.newFileChannel(toDelegate(path), options, attrs)) { + return new FilterFileChannel(super.newFileChannel(path, options, attrs)) { @Override public void force(boolean metaData) throws IOException { diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java index 69099ee0abade..a5431f2fc44ed 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheServiceTests.java @@ -38,6 +38,7 @@ import static java.util.Collections.emptySortedSet; import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; import static org.elasticsearch.index.store.cache.TestUtils.randomRanges; +import static org.elasticsearch.xpack.searchablesnapshots.cache.CacheService.resolveSnapshotCache; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -58,6 +59,7 @@ public static void installFileSystem() { @AfterClass public static void removeFileSystem() { fileSystemProvider.tearDown(); + fileSystemProvider = null; } public void testCacheSynchronization() throws Exception { @@ -74,7 +76,7 @@ public void testCacheSynchronization() throws Exception { assertFalse(Files.exists(shardDataPath)); logger.debug("--> creating directories [{}] for shard [{}]", shardDataPath.toAbsolutePath(), i); - shardsCacheDirs[i] = Files.createDirectories(CacheService.resolveSnapshotCache(shardDataPath).resolve(snapshotId.getUUID())); + shardsCacheDirs[i] = Files.createDirectories(resolveSnapshotCache(shardDataPath).resolve(snapshotId.getUUID())); } try (CacheService cacheService = defaultCacheService()) { @@ -175,7 +177,6 @@ public void testCacheSynchronization() throws Exception { } public void testPut() throws Exception { - final Path cacheDir = createTempDir(); try (CacheService cacheService = defaultCacheService()) { final long fileLength = randomLongBetween(0L, 1000L); final CacheKey cacheKey = new CacheKey( @@ -184,6 +185,10 @@ public void testPut() throws Exception { new ShardId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()), randomInt(5)), randomAlphaOfLength(105).toLowerCase(Locale.ROOT) ); + + final Path cacheDir = Files.createDirectories( + resolveSnapshotCache(randomShardPath(cacheKey.getShardId())).resolve(cacheKey.getSnapshotId().getUUID()) + ); final String cacheFileUuid = UUIDs.randomBase64UUID(random()); final SortedSet> cacheFileRanges = randomBoolean() ? randomRanges(fileLength) : emptySortedSet(); @@ -218,6 +223,7 @@ public void testRunIfShardMarkedAsEvictedInCache() throws Exception { final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); final IndexId indexId = new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); final ShardId shardId = new ShardId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random()), 0); + final Path cacheDir = Files.createDirectories(resolveSnapshotCache(randomShardPath(shardId)).resolve(snapshotId.getUUID())); final CacheService cacheService = defaultCacheService(); cacheService.setCacheSyncInterval(TimeValue.ZERO); @@ -234,7 +240,7 @@ public void testRunIfShardMarkedAsEvictedInCache() throws Exception { final PlainActionFuture waitForEviction = PlainActionFuture.newFuture(); final CacheFile.EvictionListener evictionListener = evicted -> waitForEviction.onResponse(null); - final CacheFile cacheFile = cacheService.get(new CacheKey(snapshotId, indexId, shardId, "_0.dvd"), 100, createTempDir()); + final CacheFile cacheFile = cacheService.get(new CacheKey(snapshotId, indexId, shardId, "_0.dvd"), 100, cacheDir); cacheFile.acquire(evictionListener); cacheService.markShardAsEvictedInCache(snapshotId, indexId, shardId); From 5ca99d266052ec7cb118334feac5452e866c53f4 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 14 Dec 2020 14:27:21 +0100 Subject: [PATCH 20/20] test --- .../index/store/cache/CacheFile.java | 5 + .../index/store/cache/TestUtils.java | 7 + .../cache/PersistentCacheTests.java | 127 ++++++++++++------ 3 files changed, 100 insertions(+), 39 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java index c1dd02cb505c8..5793f4d33c5b3 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java @@ -158,6 +158,11 @@ FileChannel getChannel() { return reference == null ? null : reference.fileChannel; } + // Only used in tests + SortedSet> getCompletedRanges() { + return tracker.getCompletedRanges(); + } + public void acquire(final EvictionListener listener) throws IOException { assert listener != null; diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java index 28e887516a886..db4d9f1ba88fa 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java @@ -148,6 +148,13 @@ public static SortedSet> mergeContiguousRanges(final SortedSet }); } + public static void assertCacheFileEquals(CacheFile expected, CacheFile actual) { + assertThat(actual.getLength(), equalTo(expected.getLength())); + assertThat(actual.getFile(), equalTo(expected.getFile())); + assertThat(actual.getCacheKey(), equalTo(expected.getCacheKey())); + assertThat(actual.getCompletedRanges(), equalTo(expected.getCompletedRanges())); + } + public static void assertCounter(IndexInputStats.Counter counter, long total, long count, long min, long max) { assertThat(counter.total(), equalTo(total)); assertThat(counter.count(), equalTo(count)); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java index 9eedfd1f68a82..378d9f9bb83ec 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/PersistentCacheTests.java @@ -10,8 +10,11 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.StringField; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.cache.CacheFile; @@ -32,14 +35,18 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.SortedSet; +import java.util.stream.Collectors; import static org.elasticsearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES; import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_ROLE; +import static org.elasticsearch.index.store.cache.TestUtils.assertCacheFileEquals; import static org.elasticsearch.index.store.cache.TestUtils.randomPopulateAndReads; import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING; import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.createCacheIndexWriter; import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.resolveCacheIndexFolder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; @@ -132,48 +139,44 @@ public void testCacheIndexWriter() throws Exception { } } + public void testRepopulateCache() throws Exception { + final CacheService cacheService = defaultCacheService(); + cacheService.setCacheSyncInterval(TimeValue.ZERO); + cacheService.start(); + + final List cacheFiles = generateRandomCacheFiles(cacheService); + cacheService.synchronizeCache(); + + if (cacheFiles.isEmpty() == false) { + final List removedCacheFiles = randomSubsetOf(cacheFiles); + for (CacheFile removedCacheFile : removedCacheFiles) { + if (randomBoolean()) { + // evict cache file from the cache + cacheService.removeFromCache(removedCacheFile.getCacheKey()); + } else { + IOUtils.rm(removedCacheFile.getFile()); + } + cacheFiles.remove(removedCacheFile); + } + } + cacheService.stop(); + + final CacheService newCacheService = defaultCacheService(); + newCacheService.start(); + for (CacheFile cacheFile : cacheFiles) { + CacheFile newCacheFile = newCacheService.get(cacheFile.getCacheKey(), cacheFile.getLength(), cacheFile.getFile().getParent()); + assertThat(newCacheFile, notNullValue()); + assertThat(newCacheFile, not(sameInstance(cacheFile))); + assertCacheFileEquals(newCacheFile, cacheFile); + } + newCacheService.stop(); + } + public void testCleanUp() throws Exception { - final List cacheFiles = new ArrayList<>(); + final List cacheFiles; try (CacheService cacheService = defaultCacheService()) { cacheService.start(); - - final byte[] buffer = new byte[1024]; - Arrays.fill(buffer, (byte) 0xff); - - for (int snapshots = 0; snapshots < between(1, 2); snapshots++) { - SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); - for (int indices = 0; indices < between(1, 2); indices++) { - IndexId indexId = new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); - for (int shards = 0; shards < between(1, 2); shards++) { - ShardId shardId = new ShardId(indexId.getName(), indexId.getId(), shards); - - final Path cacheDir = Files.createDirectories( - CacheService.resolveSnapshotCache(randomShardPath(shardId)).resolve(snapshotId.getUUID()) - ); - cacheFiles.add(cacheDir); - - for (int files = 0; files < between(1, 2); files++) { - final CacheKey cacheKey = new CacheKey(snapshotId, indexId, shardId, "file_" + files); - final CacheFile cacheFile = cacheService.get(cacheKey, randomLongBetween(0L, buffer.length), cacheDir); - - final CacheFile.EvictionListener listener = evictedCacheFile -> {}; - cacheFile.acquire(listener); - try { - randomPopulateAndReads(cacheFile, (channel, from, to) -> { - try { - channel.write(ByteBuffer.wrap(buffer, Math.toIntExact(from), Math.toIntExact(to))); - } catch (IOException e) { - throw new AssertionError(e); - } - }); - cacheFiles.add(cacheFile.getFile()); - } finally { - cacheFile.release(listener); - } - } - } - } - } + cacheFiles = generateRandomCacheFiles(cacheService).stream().map(CacheFile::getFile).collect(Collectors.toList()); if (randomBoolean()) { cacheService.synchronizeCache(); } @@ -187,4 +190,50 @@ public void testCleanUp() throws Exception { PersistentCache.cleanUp(nodeSettings, nodeEnvironment); assertTrue(cacheFiles.stream().noneMatch(Files::exists)); } + + /** + * Generates 1 or more cache files using the specified {@link CacheService}. + */ + private List generateRandomCacheFiles(CacheService cacheService) throws Exception { + final byte[] buffer = new byte[1024]; + Arrays.fill(buffer, (byte) 0xff); + + final List cacheFiles = new ArrayList<>(); + for (int snapshots = 0; snapshots < between(1, 2); snapshots++) { + SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); + for (int indices = 0; indices < between(1, 2); indices++) { + IndexId indexId = new IndexId(randomAlphaOfLength(5).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())); + for (int shards = 0; shards < between(1, 2); shards++) { + ShardId shardId = new ShardId(indexId.getName(), indexId.getId(), shards); + + final Path cacheDir = Files.createDirectories( + CacheService.resolveSnapshotCache(randomShardPath(shardId)).resolve(snapshotId.getUUID()) + ); + + for (int files = 0; files < between(1, 2); files++) { + final CacheKey cacheKey = new CacheKey(snapshotId, indexId, shardId, "file_" + files); + final CacheFile cacheFile = cacheService.get(cacheKey, randomLongBetween(0L, buffer.length), cacheDir); + + final CacheFile.EvictionListener listener = evictedCacheFile -> {}; + cacheFile.acquire(listener); + try { + SortedSet> ranges = randomPopulateAndReads(cacheFile, (channel, from, to) -> { + try { + channel.write(ByteBuffer.wrap(buffer, Math.toIntExact(from), Math.toIntExact(to))); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + if (ranges.isEmpty() == false) { + cacheFiles.add(cacheFile); + } + } finally { + cacheFile.release(listener); + } + } + } + } + } + return cacheFiles; + } }