Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,88 +4,41 @@
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.searchablesnapshots;
package org.elasticsearch.xpack.searchablesnapshots.cache;

import org.apache.lucene.mockfile.FilterFileSystemProvider;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.apache.lucene.document.Document;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.io.PathUtilsForTesting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase;

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX;
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.resolveCacheIndexFolder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;

public class SearchableSnapshotsCacheClearingIntegTests extends BaseSearchableSnapshotsIntegTestCase {

private static DeleteBlockingFileSystemProvider deleteBlockingFileSystemProvider;

@BeforeClass
public static void installDeleteBlockingFileSystemProvider() {
FileSystem current = PathUtils.getDefaultFileSystem();
deleteBlockingFileSystemProvider = new DeleteBlockingFileSystemProvider(current);
PathUtilsForTesting.installMock(deleteBlockingFileSystemProvider.getFileSystem(null));
}

@AfterClass
public static void removeDeleteBlockingFileSystemProvider() {
PathUtilsForTesting.teardown();
}

void startBlockingDeletes() {
deleteBlockingFileSystemProvider.injectFailures.set(true);
}

void stopBlockingDeletes() {
deleteBlockingFileSystemProvider.injectFailures.set(false);
}

private static class DeleteBlockingFileSystemProvider extends FilterFileSystemProvider {

AtomicBoolean injectFailures = new AtomicBoolean();

DeleteBlockingFileSystemProvider(FileSystem inner) {
super("deleteblocking://", inner);
}

@Override
public boolean deleteIfExists(Path path) throws IOException {
if (injectFailures.get()) {
throw new IOException("blocked deletion of " + path);
} else {
return super.deleteIfExists(path);
}
}

}
public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
Expand All @@ -96,7 +49,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
}

public void testCacheDirectoriesRemovedOnStartup() throws Exception {
public void testCacheSurviveRestart() throws Exception {
final String fsRepoName = randomAlphaOfLength(10);
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
Expand All @@ -117,18 +70,13 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception {
final DiscoveryNodes discoveryNodes = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes();
final String dataNode = randomFrom(discoveryNodes.getDataNodes().values().toArray(DiscoveryNode.class)).getName();

final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest(
restoredIndexName,
mountSnapshot(
fsRepoName,
snapshotName,
indexName,
Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build(),
Strings.EMPTY_ARRAY,
true
restoredIndexName,
Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build()
);

final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get();
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
ensureGreen(restoredIndexName);

final Index restoredIndex = client().admin()
Expand All @@ -143,7 +91,9 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception {
.getIndex();

final IndexService indexService = internalCluster().getInstance(IndicesService.class, dataNode).indexService(restoredIndex);
final Path shardCachePath = CacheService.getShardCachePath(indexService.getShard(0).shardPath());
final ShardPath shardPath = indexService.getShard(0).shardPath();
final Path shardCachePath = CacheService.getShardCachePath(shardPath);

assertTrue(Files.isDirectory(shardCachePath));
final Set<Path> cacheFiles = new HashSet<>();
try (DirectoryStream<Path> snapshotCacheStream = Files.newDirectoryStream(shardCachePath)) {
Expand All @@ -159,25 +109,49 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception {
}
assertFalse("no cache files found", cacheFiles.isEmpty());

startBlockingDeletes();
CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode);
cacheService.synchronizeCache();

PersistentCache persistentCache = cacheService.getPersistentCache();
assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size()));

internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) {
assertTrue(Files.isDirectory(shardCachePath));
for (Path cacheFile : cacheFiles) {
assertTrue(cacheFile + " should not have been cleaned up yet", Files.isRegularFile(cacheFile));
try {
assertTrue(Files.isDirectory(shardCachePath));

final Path persistentCacheIndexDir = resolveCacheIndexFolder(shardPath.getRootDataPath());
assertTrue(Files.isDirectory(persistentCacheIndexDir));

final Map<String, Document> documents = PersistentCache.loadDocuments(persistentCacheIndexDir);
assertThat(documents.size(), equalTo(cacheFiles.size()));

for (Path cacheFile : cacheFiles) {
final String cacheFileName = cacheFile.getFileName().toString();
assertTrue(cacheFileName + " should exist on disk", Files.isRegularFile(cacheFile));
assertThat(cacheFileName + " should exist in persistent cache index", documents.get(cacheFileName), notNullValue());
}
} catch (IOException e) {
throw new AssertionError(e);
}
stopBlockingDeletes();
return Settings.EMPTY;
}
});

persistentCache = internalCluster().getInstance(CacheService.class, dataNode).getPersistentCache();
assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size()));
ensureGreen(restoredIndexName);

for (Path cacheFile : cacheFiles) {
assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile));
}
cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile)));

assertAcked(client().admin().indices().prepareDelete(restoredIndexName));

assertBusy(() -> cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile))));
cacheService = internalCluster().getInstance(CacheService.class, dataNode);
cacheService.synchronizeCache();

persistentCache = cacheService.getPersistentCache();
assertThat(persistentCache.getNumDocs(), equalTo(0L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ FileChannel getChannel() {
return reference == null ? null : reference.fileChannel;
}

// Only used in tests
SortedSet<Tuple<Long, Long>> getCompletedRanges() {
return tracker.getCompletedRanges();
}

public void acquire(final EvictionListener listener) throws IOException {
assert listener != null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import java.nio.file.Path;

import static org.elasticsearch.index.store.SearchableSnapshotDirectory.unwrapDirectory;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
Expand All @@ -49,6 +50,13 @@ public SearchableSnapshotIndexEventListener(Settings settings, @Nullable CacheSe
this.cacheService = cacheService;
}

/**
* Called before a searchable snapshot {@link IndexShard} starts to recover. This event is used to trigger the loading of the shard
* snapshot information that contains the list of shard's Lucene files.
*
* @param indexShard the shard that is about to recover
* @param indexSettings the shard's index settings
*/
@Override
public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
Expand All @@ -57,7 +65,7 @@ public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexS
}

private static void ensureSnapshotIsLoaded(IndexShard indexShard) {
final SearchableSnapshotDirectory directory = SearchableSnapshotDirectory.unwrapDirectory(indexShard.store().directory());
final SearchableSnapshotDirectory directory = unwrapDirectory(indexShard.store().directory());
assert directory != null;
final StepListener<Void> preWarmListener = new StepListener<>();
final boolean success = directory.loadSnapshot(indexShard.recoveryState(), preWarmListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import org.elasticsearch.xpack.searchablesnapshots.action.TransportMountSearchableSnapshotAction;
import org.elasticsearch.xpack.searchablesnapshots.action.TransportSearchableSnapshotsStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.elasticsearch.xpack.searchablesnapshots.cache.NodeEnvironmentCacheCleaner;
import org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestClearSearchableSnapshotsCacheAction;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsStatsAction;
Expand Down Expand Up @@ -210,12 +210,7 @@ public Collection<Object> createComponents(
this.threadPool.set(threadPool);
this.failShardsListener.set(new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), clusterService.getRerouteService()));
if (DiscoveryNode.isDataNode(settings)) {
final CacheService cacheService = new CacheService(
settings,
clusterService,
threadPool,
new NodeEnvironmentCacheCleaner(nodeEnvironment)
);
final CacheService cacheService = new CacheService(settings, clusterService, threadPool, new PersistentCache(nodeEnvironment));
this.cacheService.set(cacheService);
components.add(cacheService);
final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(
Expand All @@ -226,6 +221,8 @@ public Collection<Object> createComponents(
);
this.blobStoreCacheService.set(blobStoreCacheService);
components.add(blobStoreCacheService);
} else {
PersistentCache.cleanUp(settings, nodeEnvironment);
}
return Collections.unmodifiableList(components);
}
Expand Down
Loading