Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/plugin/searchable-snapshots/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies {
compileOnly project(path: xpackModule('core'))
implementation project(path: 'preallocate')
internalClusterTestImplementation(testArtifact(project(xpackModule('core'))))
internalClusterTestImplementation(project(path: ':modules:reindex'))
}

addQaCheckDependencies()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.searchablesnapshots.cache.blob;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
Expand All @@ -26,13 +27,16 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.snapshots.SearchableSnapshotsSettings;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.InternalTestCluster;
Expand All @@ -59,6 +63,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_BLOB_CACHE_INDEX;
import static org.elasticsearch.xpack.searchablesnapshots.cache.shared.SharedBytes.pageAligned;
import static org.elasticsearch.xpack.searchablesnapshots.store.SearchableSnapshotDirectory.unwrapDirectory;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

Expand Down Expand Up @@ -93,7 +98,10 @@ public static void tearDownCacheSettings() {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), WaitForSnapshotBlobCacheShardsActivePlugin.class);
final List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: create plugins in declarative style without a variable

Stream.concat(super.nodePlugins().stream(),
            Stream.of(WaitForSnapshotBlobCacheShardsActivePlugin.class, ReindexPlugin.class))
            .collect(Collectors.toList())

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using concatenation of streams here seems a bit overkill and less readable to me 😁 Anyway if I'm the only one to prefer the old fashion way I'll turn this into streams, otherwise I prefer to keep it like this.

plugins.add(WaitForSnapshotBlobCacheShardsActivePlugin.class);
plugins.add(ReindexPlugin.class);
return plugins;
}

@Override
Expand Down Expand Up @@ -159,7 +167,7 @@ public void testBlobStoreCache() throws Exception {
storage1,
blobCacheMaxLength.getStringRep()
);
final String restoredIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String restoredIndex = "restored-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
mountSnapshot(
repositoryName,
snapshot.getName(),
Expand All @@ -174,17 +182,9 @@ public void testBlobStoreCache() throws Exception {
);
ensureGreen(restoredIndex);

// wait for all async cache fills to complete
assertBusy(() -> {
for (final SearchableSnapshotShardStats shardStats : client().execute(
SearchableSnapshotsStatsAction.INSTANCE,
new SearchableSnapshotsStatsRequest()
).actionGet().getStats()) {
for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) {
assertThat(Strings.toString(indexInputStats), indexInputStats.getCurrentIndexCacheFills(), equalTo(0L));
}
}
});
assertRecoveryStats(restoredIndex, false);
assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
waitForBlobCacheFillsToComplete();

for (final SearchableSnapshotShardStats shardStats : client().execute(
SearchableSnapshotsStatsAction.INSTANCE,
Expand All @@ -209,11 +209,14 @@ public void testBlobStoreCache() throws Exception {
equalTo("data_content,data_hot")
);

refreshSystemIndex();

final long numberOfCachedBlobs = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.get()
.getHits()
.getTotalHits().value;

IndexingStats indexingStats = systemClient().admin()
.indices()
.prepareStats(SNAPSHOT_BLOB_CACHE_INDEX)
Expand All @@ -227,19 +230,24 @@ public void testBlobStoreCache() throws Exception {

logger.info("--> verifying number of documents in index [{}]", restoredIndex);
assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);
assertAcked(client().admin().indices().prepareDelete(restoredIndex));

assertBusy(() -> {
refreshSystemIndex();
assertThat(
systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get().getHits().getTotalHits().value,
greaterThan(0L)
);
});
for (IndicesService indicesService : internalCluster().getDataNodeInstances(IndicesService.class)) {
for (IndexService indexService : indicesService) {
if (indexService.index().getName().equals(restoredIndex)) {
for (IndexShard indexShard : indexService) {
try {
unwrapDirectory(indexShard.store().directory()).clearStats();
} catch (AlreadyClosedException ignore) {
// ok to ignore these
}
}
}
}
}

final Storage storage2 = randomFrom(Storage.values());
logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage2);
final String restoredAgainIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String restoredAgainIndex = "restored-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
mountSnapshot(
repositoryName,
snapshot.getName(),
Expand All @@ -254,6 +262,10 @@ public void testBlobStoreCache() throws Exception {
);
ensureGreen(restoredAgainIndex);

assertRecoveryStats(restoredAgainIndex, false);
assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
waitForBlobCacheFillsToComplete();

logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex);
checkNoBlobStoreAccess();

Expand Down Expand Up @@ -289,6 +301,10 @@ public Settings onNodeStopped(String nodeName) throws Exception {
});
ensureGreen(restoredAgainIndex);

assertRecoveryStats(restoredAgainIndex, false);
assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
waitForBlobCacheFillsToComplete();

logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex);
checkNoBlobStoreAccess();

Expand All @@ -311,8 +327,18 @@ public Settings onNodeStopped(String nodeName) throws Exception {
logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex);
assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);

// TODO also test when the index is frozen
// TODO also test when prewarming is enabled
logger.info("--> deleting indices, maintenance service should clean up snapshot blob cache index");
assertAcked(client().admin().indices().prepareDelete("restored-*"));
assertBusy(() -> {
refreshSystemIndex();
assertHitCount(
systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.setSize(0)
.get(),
0L
);
});
}

private void checkNoBlobStoreAccess() {
Expand Down
Loading