Skip to content

Commit 3c5e896

Browse files
authored
[7.x] Add periodic maintenance task to clean up unused blob store cache docs (#78610)
* Add periodic maintenance task to clean up unused blob store cache docs (#78438) In #77686 we added a service to clean up blob store cache docs after a searchable snapshot is no more used. We noticed some situations where some cache docs could still remain in the system index: when the system index is not available when the searchable snapshot index is deleted; when the system index is restored from a backup or when the searchable snapshot index was deleted on a version before #77686. This commit introduces a maintenance task that periodically scans and cleans up unused blob cache docs. This task is scheduled to run every hour on the data node that contain the blob store cache primary shard. The periodic task works by using a point in time context with search_after. * fix
1 parent 30f9a1f commit 3c5e896

File tree

9 files changed

+819
-117
lines changed

9 files changed

+819
-117
lines changed

docs/reference/searchable-snapshots/index.asciidoc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,34 @@ node that does not have the <<data-frozen-node,`data_frozen`>> role, it will be
200200
is set to `0b`. Additionally, nodes with a shared cache can only have a single
201201
<<path-settings,data path>>.
202202

203+
{es} also uses a dedicated system index named `.snapshot-blob-cache` to speed
204+
up the recoveries of {search-snap} shards. This index is used as an additional
205+
caching layer on top of the partially or fully mounted data and contains the
206+
minimal required data to start the {search-snap} shards. {es} automatically
207+
deletes the documents that are no longer used in this index. This periodic
208+
clean up can be tuned using the following settings:
209+
210+
`searchable_snapshots.blob_cache.periodic_cleanup.interval`::
211+
(<<dynamic-cluster-setting,Dynamic>>)
212+
The interval at which the periodic cleanup of the `.snapshot-blob-cache`
213+
index is scheduled. Defaults to every hour (`1h`).
214+
215+
`searchable_snapshots.blob_cache.periodic_cleanup.retention_period`::
216+
(<<dynamic-cluster-setting,Dynamic>>)
217+
The retention period to keep obsolete documents in the `.snapshot-blob-cache`
218+
index. Defaults to every hour (`1h`).
219+
220+
`searchable_snapshots.blob_cache.periodic_cleanup.batch_size`::
221+
(<<dynamic-cluster-setting,Dynamic>>)
222+
The number of documents that are searched for and bulk-deleted at once during
223+
the periodic cleanup of the `.snapshot-blob-cache` index. Defaults to `100`.
224+
225+
`searchable_snapshots.blob_cache.periodic_cleanup.pit_keep_alive`::
226+
(<<dynamic-cluster-setting,Dynamic>>)
227+
The value used for the <point-in-time-keep-alive,point-in-time keep alive>>
228+
requests executed during the periodic cleanup of the `.snapshot-blob-cache`
229+
index. Defaults to `10m`.
230+
203231
[discrete]
204232
[[searchable-snapshots-costs]]
205233
=== Reduce costs with {search-snaps}

x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java

Lines changed: 246 additions & 65 deletions
Large diffs are not rendered by default.

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,11 @@ public List<Setting<?>> getSettings() {
305305
FrozenCacheService.FROZEN_CACHE_RECOVERY_RANGE_SIZE_SETTING,
306306
FrozenCacheService.SNAPSHOT_CACHE_MAX_FREQ_SETTING,
307307
FrozenCacheService.SNAPSHOT_CACHE_DECAY_INTERVAL_SETTING,
308-
FrozenCacheService.SNAPSHOT_CACHE_MIN_TIME_DELTA_SETTING
308+
FrozenCacheService.SNAPSHOT_CACHE_MIN_TIME_DELTA_SETTING,
309+
BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING,
310+
BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_KEEP_ALIVE_SETTING,
311+
BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_BATCH_SIZE_SETTING,
312+
BlobStoreCacheMaintenanceService.SNAPSHOT_SNAPSHOT_CLEANUP_RETENTION_PERIOD
309313
);
310314
}
311315

@@ -336,11 +340,12 @@ public Collection<Object> createComponents(
336340
final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(
337341
clusterService,
338342
client,
339-
SNAPSHOT_BLOB_CACHE_INDEX,
340-
threadPool::absoluteTimeInMillis
343+
SNAPSHOT_BLOB_CACHE_INDEX
341344
);
342345
this.blobStoreCacheService.set(blobStoreCacheService);
343-
clusterService.addListener(new BlobStoreCacheMaintenanceService(threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX));
346+
clusterService.addListener(
347+
new BlobStoreCacheMaintenanceService(settings, clusterService, threadPool, client, SNAPSHOT_BLOB_CACHE_INDEX)
348+
);
344349
components.add(blobStoreCacheService);
345350
} else {
346351
PersistentCache.cleanUp(settings, nodeEnvironment);

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java

Lines changed: 503 additions & 32 deletions
Large diffs are not rendered by default.

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheService.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import java.util.concurrent.Semaphore;
5050
import java.util.concurrent.TimeUnit;
5151
import java.util.concurrent.atomic.AtomicBoolean;
52-
import java.util.function.Supplier;
5352

5453
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
5554
import static org.elasticsearch.xpack.core.ClientHelper.SEARCHABLE_SNAPSHOTS_ORIGIN;
@@ -72,17 +71,15 @@ public class BlobStoreCacheService extends AbstractLifecycleComponent {
7271

7372
private final ClusterService clusterService;
7473
private final Semaphore inFlightCacheFills;
75-
private final Supplier<Long> timeSupplier;
7674
private final AtomicBoolean closed;
7775
private final Client client;
7876
private final String index;
7977

80-
public BlobStoreCacheService(ClusterService clusterService, Client client, String index, Supplier<Long> timeSupplier) {
78+
public BlobStoreCacheService(ClusterService clusterService, Client client, String index) {
8179
this.client = new OriginSettingClient(client, SEARCHABLE_SNAPSHOTS_ORIGIN);
8280
this.inFlightCacheFills = new Semaphore(MAX_IN_FLIGHT_CACHE_FILLS);
8381
this.closed = new AtomicBoolean(false);
8482
this.clusterService = clusterService;
85-
this.timeSupplier = timeSupplier;
8683
this.index = index;
8784
}
8885

@@ -242,12 +239,13 @@ public final void putAsync(
242239
final String name,
243240
final ByteRange range,
244241
final BytesReference bytes,
242+
final long timeInEpochMillis,
245243
final ActionListener<Void> listener
246244
) {
247245
final String id = generateId(repository, snapshotId, indexId, shardId, name, range);
248246
try {
249247
final CachedBlob cachedBlob = new CachedBlob(
250-
Instant.ofEpochMilli(timeSupplier.get()),
248+
Instant.ofEpochMilli(timeInEpochMillis),
251249
Version.CURRENT,
252250
repository,
253251
name,

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/CachedBlob.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class CachedBlob implements ToXContent {
3131
public static final CachedBlob CACHE_MISS = new CachedBlob(null, null, null, "CACHE_MISS", null, BytesArray.EMPTY, 0L, 0L);
3232

3333
private static final String TYPE = "blob";
34+
public static final String CREATION_TIME_FIELD = "creation_time";
3435

3536
private final Instant creationTime;
3637
private final Version version;
@@ -80,7 +81,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
8081
builder.startObject();
8182
{
8283
builder.field("type", TYPE);
83-
builder.field("creation_time", creationTime.toEpochMilli());
84+
builder.field(CREATION_TIME_FIELD, creationTime.toEpochMilli());
8485
builder.field("version", version.id);
8586
builder.field("repository", repository);
8687
builder.startObject("blob");
@@ -117,9 +118,17 @@ public BytesReference bytes() {
117118
return bytes;
118119
}
119120

121+
public Version version() {
122+
return version;
123+
}
124+
125+
public Instant creationTime() {
126+
return creationTime;
127+
}
128+
120129
@SuppressWarnings("unchecked")
121130
public static CachedBlob fromSource(final Map<String, Object> source) {
122-
final Long creationTimeEpochMillis = (Long) source.get("creation_time");
131+
final Long creationTimeEpochMillis = (Long) source.get(CREATION_TIME_FIELD);
123132
if (creationTimeEpochMillis == null) {
124133
throw new IllegalStateException("cached blob document does not have the [creation_time] field");
125134
}

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectory.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -713,7 +713,17 @@ public CachedBlob getCachedBlob(String name, ByteRange range) {
713713
}
714714

715715
public void putCachedBlob(String name, ByteRange range, BytesReference content, ActionListener<Void> listener) {
716-
blobStoreCacheService.putAsync(repository, snapshotId, indexId, shardId, name, range, content, listener);
716+
blobStoreCacheService.putAsync(
717+
repository,
718+
snapshotId,
719+
indexId,
720+
shardId,
721+
name,
722+
range,
723+
content,
724+
threadPool.absoluteTimeInMillis(),
725+
listener
726+
);
717727
}
718728

719729
public FrozenCacheFile getFrozenCacheFile(String fileName, long length) {

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheServiceTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void testGetWhenServiceNotStarted() {
104104
return null;
105105
}).when(mockClient).execute(eq(GetAction.INSTANCE), any(GetRequest.class), any(ActionListener.class));
106106

107-
BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L);
107+
BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX);
108108
blobCacheService.start();
109109

110110
PlainActionFuture<CachedBlob> future = PlainActionFuture.newFuture();
@@ -137,17 +137,17 @@ public void testPutWhenServiceNotStarted() {
137137
return null;
138138
}).when(mockClient).execute(eq(IndexAction.INSTANCE), any(IndexRequest.class), any(ActionListener.class));
139139

140-
BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L);
140+
BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX);
141141
blobCacheService.start();
142142

143143
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
144-
blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, future);
144+
blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, 0L, future);
145145
assertThat(future.actionGet(), nullValue());
146146

147147
blobCacheService.stop();
148148

149149
future = PlainActionFuture.newFuture();
150-
blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, future);
150+
blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, 0L, future);
151151
IllegalStateException exception = expectThrows(IllegalStateException.class, future::actionGet);
152152
assertThat(exception.getMessage(), containsString("Blob cache service is closed"));
153153
}
@@ -176,7 +176,7 @@ public void testWaitForInFlightCacheFillsToComplete() throws Exception {
176176
return null;
177177
}).when(mockClient).execute(eq(IndexAction.INSTANCE), any(IndexRequest.class), any(ActionListener.class));
178178

179-
final BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L);
179+
final BlobStoreCacheService blobCacheService = new BlobStoreCacheService(null, mockClient, SNAPSHOT_BLOB_CACHE_INDEX);
180180
blobCacheService.start();
181181

182182
assertThat(blobCacheService.getInFlightCacheFills(), equalTo(0));
@@ -186,7 +186,7 @@ public void testWaitForInFlightCacheFillsToComplete() throws Exception {
186186
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
187187
threadPool.generic()
188188
.execute(
189-
() -> blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, future)
189+
() -> blobCacheService.putAsync(repository, snapshotId, indexId, shardId, fileName, range, BytesArray.EMPTY, 0L, future)
190190
);
191191
futures.add(future);
192192
}

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ private static Client mockClient() {
324324
public static class NoopBlobStoreCacheService extends BlobStoreCacheService {
325325

326326
public NoopBlobStoreCacheService() {
327-
super(null, mockClient(), SNAPSHOT_BLOB_CACHE_INDEX, () -> 0L);
327+
super(null, mockClient(), SNAPSHOT_BLOB_CACHE_INDEX);
328328
}
329329

330330
@Override
@@ -353,7 +353,7 @@ public static class SimpleBlobStoreCacheService extends BlobStoreCacheService {
353353
private final ConcurrentHashMap<String, BytesArray> blobs = new ConcurrentHashMap<>();
354354

355355
public SimpleBlobStoreCacheService() {
356-
super(null, mockClient(), SNAPSHOT_BLOB_CACHE_INDEX, System::currentTimeMillis);
356+
super(null, mockClient(), SNAPSHOT_BLOB_CACHE_INDEX);
357357
}
358358

359359
@Override

0 commit comments

Comments
 (0)