Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions docs/reference/searchable-snapshots/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,34 @@ node that does not have the <<data-frozen-node,`data_frozen`>> role, it will be
is set to `0b`. Additionally, nodes with a shared cache can only have a single
<<path-settings,data path>>.

{es} also uses a dedicated system index named `.snapshot-blob-cache` to speed
up the recoveries of {search-snap} shards. This index is used as an additional
caching layer on top of the partially or fully mounted data and contains the
minimal required data to start the {search-snap} shards. {es} automatically
deletes the documents that are no longer used in this index. This periodic
clean up can be tuned using the following settings:

`searchable_snapshots.blob_cache.periodic_cleanup.interval`::
(<<dynamic-cluster-setting,Dynamic>>)
The interval at which the periodic cleanup of the `.snapshot-blob-cache`
index is scheduled. Defaults to every hour (`1h`).

`searchable_snapshots.blob_cache.periodic_cleanup.retention_period`::
(<<dynamic-cluster-setting,Dynamic>>)
The retention period to keep obsolete documents in the `.snapshot-blob-cache`
index. Defaults to every hour (`1h`).

`searchable_snapshots.blob_cache.periodic_cleanup.batch_size`::
(<<dynamic-cluster-setting,Dynamic>>)
The number of documents that are searched for and bulk-deleted at once during
the periodic cleanup of the `.snapshot-blob-cache` index. Defaults to `100`.

`searchable_snapshots.blob_cache.periodic_cleanup.pit_keep_alive`::
(<<dynamic-cluster-setting,Dynamic>>)
The value used for the <point-in-time-keep-alive,point-in-time keep alive>>
requests executed during the periodic cleanup of the `.snapshot-blob-cache`
index. Defaults to `10m`.

[discrete]
[[searchable-snapshots-costs]]
=== Reduce costs with {search-snaps}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,11 @@ public List<Setting<?>> getSettings() {
FrozenCacheService.FROZEN_CACHE_RECOVERY_RANGE_SIZE_SETTING,
FrozenCacheService.SNAPSHOT_CACHE_MAX_FREQ_SETTING,
FrozenCacheService.SNAPSHOT_CACHE_DECAY_INTERVAL_SETTING,
FrozenCacheService.SNAPSHOT_CACHE_MIN_TIME_DELTA_SETTING
FrozenCacheService.SNAPSHOT_CACHE_MIN_TIME_DELTA_SETTING,
BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING,
BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING,
BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING,
BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD
);
}

Expand Down Expand Up @@ -336,11 +340,12 @@ public Collection<Object> createComponents(
final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(
clusterService,
client,
SNAPSHOT_BLOB_CACHE_INDEX,
threadPool::absoluteTimeInMillis
SNAPSHOT_BLOB_CACHE_INDEX
);
this.blobStoreCacheService.set(blobStoreCacheService);
clusterService.addListener(new BlobStoreCacheMaintenanceService(threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX));
clusterService.addListener(
new BlobStoreCacheMaintenanceService(settings, clusterService, threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX)
);
components.add(blobStoreCacheService);
} else {
PersistentCache.cleanUp(settings, nodeEnvironment);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN;
Expand All @@ -72,17 +71,15 @@ public class BlobStoreCacheService extends AbstractLifecycleComponent {

private final ClusterService clusterService;
private final Semaphore inFlightCacheFills;
private final Supplier<Long> timeSupplier;
private final AtomicBoolean closed;
private final Client client;
private final String index;

public BlobStoreCacheService(ClusterService clusterService, Client client, String index, Supplier<Long> timeSupplier) {
public BlobStoreCacheService(ClusterService clusterService, Client client, String index) {
this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN);
this.inFlightCacheFills = new Semaphore(MAX_IN_FLIGHT_CACHE_FILLS);
this.closed = new AtomicBoolean(false);
this.clusterService = clusterService;
this.timeSupplier = timeSupplier;
this.index = index;
}

Expand Down Expand Up @@ -242,12 +239,13 @@ public final void putAsync(
final String name,
final ByteRange range,
final BytesReference bytes,
final long timeInEpochMillis,
final ActionListener<Void> listener
) {
final String id = generateId(repository, snapshotId, indexId, shardId, name, range);
try {
final CachedBlob cachedBlob = new CachedBlob(
Instant.ofEpochMilli(timeSupplier.get()),
Instant.ofEpochMilli(timeInEpochMillis),
Version.CURRENT,
repository,
name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class CachedBlob implements ToXContent {
public static final CachedBlob CACHE_MISS = new CachedBlob(null, null, null, "CACHE_MISS", null, BytesArray.EMPTY, 0L, 0L);

private static final String TYPE = "blob";
public static final String CREATION_TIME_FIELD = "creation_time";

private final Instant creationTime;
private final Version version;
Expand Down Expand Up @@ -80,7 +81,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject();
{
builder.field("type", TYPE);
builder.field("creation_time", creationTime.toEpochMilli());
builder.field(CREATION_TIME_FIELD, creationTime.toEpochMilli());
builder.field("version", version.id);
builder.field("repository", repository);
builder.startObject("blob");
Expand Down Expand Up @@ -117,9 +118,17 @@ public BytesReference bytes() {
return bytes;
}

public Version version() {
return version;
}

public Instant creationTime() {
return creationTime;
}

@SuppressWarnings("unchecked")
public static CachedBlob fromSource(final Map<String, Object> source) {
final Long creationTimeEpochMillis = (Long) source.get("creation_time");
final Long creationTimeEpochMillis = (Long) source.get(CREATION_TIME_FIELD);
if (creationTimeEpochMillis == null) {
throw new IllegalStateException("cached blob document does not have the [creation_time] field");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,17 @@ public CachedBlob getCachedBlob(String name, ByteRange range) {
}

public void putCachedBlob(String name, ByteRange range, BytesReference content, ActionListener<Void> listener) {
blobStoreCacheService.putAsync(repository, snapshotId, indexId, shardId, name, range, content, listener);
blobStoreCacheService.putAsync(
repository,
snapshotId,
indexId,
shardId,
name,
range,
content,
threadPool.absoluteTimeInMillis(),
listener
);
}

public FrozenCacheFile getFrozenCacheFile(String fileName, long length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void testGetWhenServiceNotStarted() {
return null;
}).when(mockClient).execute(eq(GetAction.INSTANCE), any(GetRequest.class), any(ActionListener.class));

BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L);
BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX);
blobCacheService.start();

PlainActionFuture<CachedBlob> future = PlainActionFuture.newFuture();
Expand Down Expand Up @@ -137,17 +137,17 @@ public void testPutWhenServiceNotStarted() {
return null;
}).when(mockClient).execute(eq(IndexAction.INSTANCE), any(IndexRequest.class), any(ActionListener.class));

BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L);
BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX);
blobCacheService.start();

PlainActionFuture<Void> future = PlainActionFuture.newFuture();
blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, future);
blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, 0L, future);
assertThat(future.actionGet(), nullValue());

blobCacheService.stop();

future = PlainActionFuture.newFuture();
blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, future);
blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, 0L, future);
IllegalStateException exception = expectThrows(IllegalStateException.class, future::actionGet);
assertThat(exception.getMessage(), containsString("Blob cache service is closed"));
}
Expand Down Expand Up @@ -176,7 +176,7 @@ public void testWaitForInFlightCacheFillsToComplete() throws Exception {
return null;
}).when(mockClient).execute(eq(IndexAction.INSTANCE), any(IndexRequest.class), any(ActionListener.class));

final BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L);
final BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX);
blobCacheService.start();

assertThat(blobCacheService.getInFlightCacheFills(), equalTo(0));
Expand All @@ -186,7 +186,7 @@ public void testWaitForInFlightCacheFillsToComplete() throws Exception {
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
threadPool.generic()
.execute(
() -> blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, future)
() -> blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, 0L, future)
);
futures.add(future);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ private static Client mockClient() {
public static class NoopBlobStoreCacheService extends BlobStoreCacheService {

public NoopBlobStoreCacheService() {
super(null, mockClient(), SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L);
super(null, mockClient(), SNAPSHOT_BLOB_CACHE_INDEX);
}

@Override
Expand Down Expand Up @@ -353,7 +353,7 @@ public static class SimpleBlobStoreCacheService extends BlobStoreCacheService {
private final ConcurrentHashMap<String, BytesArray> blobs = new ConcurrentHashMap<>();

public SimpleBlobStoreCacheService() {
super(null, mockClient(), SNAPSHOT_BLOB_CACHE_INDEX, System::currentTimeMillis);
super(null, mockClient(), SNAPSHOT_BLOB_CACHE_INDEX);
}

@Override
Expand Down