Add periodic maintenance task to clean up unused blob store cache docs #78438
Conversation
if (Objects.equals(indexId, otherIndexId)) {
    return true;
}
if (Objects.equals(snapshotId, SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings))
Note: this method does not check whether the snapshot belongs to the same repository. Since the snapshot UUID and index UUID are unique IDs, I think the risk of collision is very limited, and in the event of a collision the cached docs are simply not deleted.
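For context, a minimal sketch of the kind of check being discussed, assuming the SNAPSHOT_SNAPSHOT_ID_SETTING and SNAPSHOT_INDEX_ID_SETTING index settings of the searchable snapshots plugin (class and method names are illustrative, not the production code):

```java
import java.util.Objects;

import org.elasticsearch.common.settings.Settings;

// Sketch only. Assumes static imports of SNAPSHOT_SNAPSHOT_ID_SETTING and
// SNAPSHOT_INDEX_ID_SETTING from the searchable snapshots plugin.
final class CachedBlobOwnershipSketch {

    // A cache entry is considered "in use" when both the snapshot UUID and the index UUID
    // match the settings of a mounted searchable snapshot index. The repository name is
    // intentionally not compared; a UUID collision would only cause cached docs to be
    // kept around, never to be deleted by mistake.
    static boolean belongsTo(Settings indexSettings, String snapshotId, String indexId) {
        return Objects.equals(snapshotId, SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings))
            && Objects.equals(indexId, SNAPSHOT_INDEX_ID_SETTING.get(indexSettings));
    }
}
```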
final Set<Tuple<String, String>> knownSnapshots = new HashSet<>();
final Set<Tuple<String, String>> missingSnapshots = new HashSet<>();
final Set<String> knownRepositories = state.metadata()
We keep the snapshots around in order to avoid re-iterating over all indices in the cluster state.
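Roughly, the memoization looks like this (illustrative names; the real task also tracks known repositories, and utility classes such as Tuple live in slightly different packages depending on the version):

```java
import java.util.HashSet;
import java.util.Set;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;

// Sketch only. Assumes static imports of the searchable snapshot index settings.
final class SnapshotLookupSketch {

    // (snapshot UUID, index UUID) pairs already resolved against the cluster state.
    private final Set<Tuple<String, String>> knownSnapshots = new HashSet<>();
    private final Set<Tuple<String, String>> missingSnapshots = new HashSet<>();

    // Scan the indices in the cluster state at most once per unknown snapshot,
    // instead of once per cache doc.
    boolean isInUse(ClusterState state, Tuple<String, String> snapshot) {
        if (knownSnapshots.contains(snapshot)) {
            return true;
        }
        if (missingSnapshots.contains(snapshot)) {
            return false;
        }
        boolean inUse = false;
        for (IndexMetadata indexMetadata : state.metadata()) {
            final Settings settings = indexMetadata.getSettings();
            if (snapshot.v1().equals(SNAPSHOT_SNAPSHOT_ID_SETTING.get(settings))
                && snapshot.v2().equals(SNAPSHOT_INDEX_ID_SETTING.get(settings))) {
                inUse = true;
                break;
            }
        }
        (inUse ? knownSnapshots : missingSnapshots).add(snapshot);
        return inUse;
    }
}
```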
// See {@link BlobStoreCacheService#generateId}
// doc id = {repository name}/{snapshot id}/{snapshot index id}/{shard id}/{file name}/@{file offset}
final String[] parts = Objects.requireNonNull(searchHit.getId()).split("/");
I noticed that the repository name is part of the document id :( Using our own maintenance task here could help to reindex the docs without the repository name, if we think the snapshot UUID + index UUID are unique enough to avoid collisions.
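To make the id layout concrete, here is a hedged sketch of building and splitting such an id (the String.join form is an assumption; only the layout comes from the comment in the diff above):

```java
import java.util.Objects;

final class BlobCacheDocIdSketch {

    // Same layout as described above:
    // {repository name}/{snapshot id}/{snapshot index id}/{shard id}/{file name}/@{file offset}
    static String generateId(String repository, String snapshotId, String indexId, String shardId, String fileName, long offset) {
        return String.join("/", repository, snapshotId, indexId, shardId, fileName, "@" + offset);
    }

    // After splitting, parts[0] is the repository name and parts[1]/parts[2] are the
    // snapshot UUID and index UUID used to decide whether the entry is still needed.
    static String[] parseId(String docId) {
        return Objects.requireNonNull(docId).split("/");
    }
}
```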
henningandersen left a comment
I went through the production code changes; I have one concern I would like to clarify before reviewing the rest.
if (periodicTask == null || periodicTask.isCancelled()) {
    schedulePeriodic = true;
    schedulePeriodicTask();
}
nit: since we have stop in its own method, I would prefer to also add a start method.
Sure, I pushed 71ff57a
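Presumably something along these lines (illustrative fragment; the actual change is in 71ff57a, and the field and method names are assumptions):

```java
// Mirror the existing stop method with a start method so the "is the task already
// scheduled?" check lives in one place instead of at every call site.
private synchronized void startPeriodicTask() {
    if (periodicTask == null || periodicTask.isCancelled()) {
        schedulePeriodicTask();
    }
}
```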
final Tuple<String, String> snapshot = Tuple.tuple(parts[1], parts[2]);
boolean isMissing = missingSnapshots.contains(snapshot);
boolean isKnown = knownSnapshots.contains(snapshot);
if (isMissing
Rather than maintain this extra state and loop through the indices for every snapshot we see, I wonder if we could simply find the set of snapshots to keep after opening the PIT? Since the PIT will not return docs indexed after it was opened, that should be relatively safe.
There is a race condition though, in that a searchable snapshot may be in the process of being mounted while this node does not yet know about it. That race also exists with the current structure.
I can think of a few options:
- Remember max seq-no from last round and only allow deleting entries with lower seq-no than that.
- Wait 10 sec after opening PIT before proceeding.
- Collect the snapshot ids to delete but wait a round before deleting, rechecking that they can still be deleted.
- Ignore the issue, put in a comment and accept that we rarely delete a bit too eagerly.
- Add a timestamp to each entry, allowing us to not delete entries newer than an hour or so.
Do you have other ideas/options?
Thanks for this feedback. I used to consider this cache a best-effort thing, so in my mind I accepted that we sometimes delete cache entries too early. Given that the task does not run very frequently and that the cluster state is "refreshed" for every batch of docs to scan, I found that acceptable.
But I do like your suggestion of using a timestamp. It fits well with the current behavior (assuming we list the snapshots to keep once the PIT is opened), and in fact I already added a creation_time field to the docs as I anticipated it would be useful in the future... for cleanup.
So I'd go with listing the snapshots to keep and using the existing timestamp.
Sounds great. I somehow missed the timestamp (it was not given as an argument to putAsync), but I do see it now. I agree with this path forward.
> I somehow missed the timestamp (was not given as args to putAsync)
I moved the creation time as an argument of putAsync in 6ac5a6e. This is more natural and it is useful in tests too.
I pushed 70b7666 to compute the existing searchable snapshots before the first bulk request is executed. I updated the test to check that recent cache entries are correctly ignored.
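In other words, the guard boils down to something like this (sketch; the retention and creation-time handling are illustrative, and TimeValue's package differs slightly across versions):

```java
import java.time.Instant;

import org.elasticsearch.core.TimeValue;

final class RetentionSketch {

    // Computed once per maintenance round: entries created after this instant are
    // considered too recent to delete, which protects docs written for snapshots
    // mounted after the point-in-time was opened.
    static Instant computeExpirationTime(long nowEpochMillis, TimeValue retention) {
        return Instant.ofEpochMilli(nowEpochMillis)
            .minus(retention.duration(), retention.timeUnit().toChronoUnit());
    }

    // A cache doc may only be deleted if its creation_time is older than the expiration instant.
    static boolean isOldEnoughToDelete(long creationTimeEpochMillis, Instant expirationTime) {
        return Instant.ofEpochMilli(creationTimeEpochMillis).isBefore(expirationTime);
    }
}
```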
Thanks Henning for your first comments. I updated the code; let me know if you have more feedback.
henningandersen left a comment
Thanks Tanguy, I left a number of smaller comments, otherwise this looks good.
/**
 * The interval at which the periodic cleanup of the blob store cache index is scheduled.
 */
public static final Setting<TimeValue> SNAPSHOT_SNAPSHOT_CLEANUP_INTERVAL_SETTING = Setting.timeSetting(
Let us document these settings as part of this PR.
I have no idea where the documentation for these settings should go. searchable-snapshots/index.asciidoc is more of a high-level explanation of the feature. Maybe in the Mount API since this is what triggers the creation of the system index?
I think we can put it in the high level description for now (also contains the cache size settings). Maybe @jrodewig can then figure out if a new settings page should be added as a follow-up?
Thanks @tlrx @henningandersen.
I agree that adding this setting to the high-level page is okay for now. That page currently includes docs for some of the cache-related settings, like xpack.searchable.snapshot.shared_cache.size.
I'll defer to @debadair on whether to create a separate page in the long term. She's the docs lead for things related to searchable snapshots and data tiers.
Thanks both! I pushed 8909ab8, suggestions welcome
if (searchResponse == null) {
    final SearchSourceBuilder searchSource = new SearchSourceBuilder();
    searchSource.trackScores(false);
Should we add searchSource.trackTotalHits(searchAfter==null)?
Makes sense yes, I pushed 2a84b4a
searchSource.pointInTimeBuilder(pointInTime);
pointInTime.setKeepAlive(keepAlive);
Perhaps swap the order of these two lines?
Sure, pushed in 2a84b4a
if (searchAfter != null) {
    searchSource.searchAfter(searchAfter);
}
Can we move this before creating the search request?
Sure, pushed in 2a84b4a
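Putting the three suggestions above together, the first search request could be assembled roughly like this (sketch with assumed variable names; the point-in-time id comes from opening the PIT earlier):

```java
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

final class SearchSourceSketch {

    // Track total hits only on the first page, set the PIT keep-alive before attaching it
    // to the source, and apply search_after before the request object is created.
    static SearchSourceBuilder buildSearchSource(String pitId, TimeValue keepAlive, Object[] searchAfter, int batchSize) {
        final SearchSourceBuilder searchSource = new SearchSourceBuilder();
        searchSource.size(batchSize);
        searchSource.trackScores(false);
        searchSource.trackTotalHits(searchAfter == null);

        final PointInTimeBuilder pointInTime = new PointInTimeBuilder(pitId);
        pointInTime.setKeepAlive(keepAlive);
        searchSource.pointInTimeBuilder(pointInTime);

        if (searchAfter != null) {
            searchSource.searchAfter(searchAfter);
        }
        return searchSource;
    }
}
```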
    .minus(retention.duration(), retention.timeUnit().toChronoUnit());

// compute the list of existing searchable snapshots once
Map<String, Set<String>> knownSnapshots = existingSnapshots;
I think we can make this a field to avoid collecting this for every search round?
I think it was already computed once. But I reworked this a bit in 15ca8cd where the expiration time, snapshots and repositories are all computed once.
final BulkRequest bulkRequest = new BulkRequest();

final TimeValue retention = periodicTaskRetention;
final Instant expirationTime = Instant.ofEpochMilli(threadPool.absoluteTimeInMillis())
We can also make this a field. At least we should take current time before taking the state (though this is best-effort and it thus really does not matter).
Makes sense. I made expirationTime a field and used it as a trigger to compute all the other ones (snapshots and repos) in 15ca8cd
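Conceptually, the lazy initialization described here could look like the following (fragment with assumed field and helper names, not the actual code in 15ca8cd):

```java
// The expiration time doubles as the "already initialized" marker: the first time a batch
// is processed, compute it along with the sets of existing snapshots and repositories,
// then reuse them for all subsequent batches of the same maintenance round.
private void ensureRoundStateInitialized(ClusterState state) {
    if (expirationTime == null) {
        expirationTime = computeExpirationTime();           // assumed helper, see sketch above
        existingSnapshots = listSearchableSnapshots(state);  // assumed helper
        existingRepositories = listRepositories(state);      // assumed helper
    }
}
```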
deleteRequest.setIfSeqNo(searchHit.getSeqNo());
deleteRequest.setIfPrimaryTerm(searchHit.getPrimaryTerm());
I wonder if this is necessary. Not that it does any harm, but we do not retry on failure and I think it would be correct without it. Obviously it will be sorted out in the next round should it fail.
I added this to avoid collisions in case of a primary shard failing over. But since we only delete and don't retry or log, it does not make much sense. I removed it in 1d0654f
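The simplified delete then becomes a plain delete-by-id (sketch; the system index name and helper are assumptions):

```java
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.search.SearchHit;

final class DeleteSketch {

    // Without retries or logging, a conditional delete adds no safety: if the delete fails
    // (for example after a primary failover), the doc is simply picked up again by the
    // next maintenance round.
    static void addDelete(BulkRequest bulkRequest, String systemIndexName, SearchHit searchHit) {
        bulkRequest.add(new DeleteRequest(systemIndexName).id(searchHit.getId()));
    }
}
```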
threadPool.generic().execute(maintenanceTask);
}

private void complete(PeriodicMaintenanceTask maintenanceTask, @Nullable Exception failure) {
Could this not be a method on PeriodicMaintenanceTask to avoid the maintenanceTask parameter here and the dereferences of it in the implementation?
Sure, I pushed b009c35.
assertAcked(client().admin().cluster().prepareDeleteRepository("repo"));
ensureClusterStateConsistency();

refreshSystemIndex(false);
Is this line necessary?
Looks like a leftover from debugging the test. It is not required, as the system index is deleted before this point. I removed it in f963685
    return; // state not fully recovered
}
final ShardRouting primary = systemIndexPrimaryShard(state);
if (primary == null || Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId()) == false) {
I wonder if we should also check that the shard is active here?
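The resolution of this thread is not shown above, but the suggested guard would presumably look something like this (illustrative sketch only):

```java
import java.util.Objects;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.ShardRouting;

final class PrimaryCheckSketch {

    // Only run the maintenance task on the node that holds an *active* primary of the
    // blob cache system index.
    static boolean shouldRunOnThisNode(ClusterState state, ShardRouting primary) {
        return primary != null
            && primary.active()
            && Objects.equals(state.nodes().getLocalNodeId(), primary.currentNodeId());
    }
}
```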
@henningandersen This is ready for another round of review; I'm still trying to find where to add the docs for the new settings.
henningandersen left a comment
LGTM, thanks for the docs and extra iteration.
maintenanceTask.searchAfter = null;
executeNext(maintenanceTask);
if (searchAfter == null) {
    PeriodicMaintenanceTask.this.total.compareAndSet(0L, response.getHits().getTotalHits().value);
Can we instead assert that total == 0 and then set the value?
Sure
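That is, something like the following fragment (assuming total is an AtomicLong, as the compareAndSet in the diff suggests):

```java
// On the first page only (searchAfter == null) the total hit count is available because
// trackTotalHits was requested; it must not have been set before.
assert total.get() == 0L : "total already set to " + total.get();
total.set(response.getHits().getTotalHits().value);
```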
if (searchResponse == null) {
    final SearchSourceBuilder searchSource = new SearchSourceBuilder();
    searchSource.fetchField(CachedBlob.CREATION_TIME_FIELD);
Can we use a new FieldAndFormat(CachedBlob.CREATION_TIME_FIELD, "epoch_millis") to avoid parsing the string to long in getCreationTime?
Yes, but it seems that the Fields API still returns a String that has to be parsed.
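A sketch of the resulting fetch-and-parse, assuming the fields API of that version hands back the formatted value as a String (hence the Long.parseLong):

```java
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FieldAndFormat;

final class CreationTimeFetchSketch {

    // Ask for the creation_time field pre-formatted as epoch milliseconds.
    static void addCreationTimeField(SearchSourceBuilder searchSource, String creationTimeField) {
        searchSource.fetchField(new FieldAndFormat(creationTimeField, "epoch_millis"));
    }

    // Even with the epoch_millis format, the fields API returns the value as a String,
    // so it still needs to be parsed into a long.
    static long getCreationTime(SearchHit searchHit, String creationTimeField) {
        final DocumentField field = searchHit.field(creationTimeField);
        return Long.parseLong((String) field.getValue());
    }
}
```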
Thanks Henning and James!
In #77686 we added a service to clean up blob store cache docs after a searchable snapshot is no longer used. We noticed some situations where cache docs could still remain in the system index: when the system index is not available at the time the searchable snapshot index is deleted, when the system index is restored from a backup, or when the searchable snapshot index was deleted on a version before #77686.
This pull request introduces a maintenance task that periodically scans and cleans up unused blob cache docs. The task is scheduled to run every hour on the data node that contains the blob store cache primary shard. The periodic task works by using a point in time context with search_after. I avoided using the Reindex and AbstractBulkByScrollRequest infrastructure as it requires a lot of plumbing, and I wanted this task to be easily modifiable in case we want to upgrade/reindex the system index ourselves in the future.
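For readers unfamiliar with the pattern, here is a condensed, hedged sketch of a point-in-time plus search_after scan of this kind (blocking calls for brevity; the index name, page size, and the per-hit decision are stand-ins, not the production code):

```java
import org.elasticsearch.action.search.ClosePointInTimeAction;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeAction;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;

final class PitScanSketch {

    // Open a PIT on the blob cache system index, page through all docs with search_after,
    // and close the PIT when done. The real task runs asynchronously and batches deletions
    // into bulk requests; this is only the skeleton of the scan.
    static void scan(Client client, String index, TimeValue keepAlive, int pageSize) {
        final String pitId = client.execute(
            OpenPointInTimeAction.INSTANCE,
            new OpenPointInTimeRequest(index).keepAlive(keepAlive)
        ).actionGet().getPointInTimeId();
        try {
            Object[] searchAfter = null;
            while (true) {
                final SearchSourceBuilder source = new SearchSourceBuilder();
                source.size(pageSize);
                source.sort(SortBuilders.fieldSort("_shard_doc")); // stable order for search_after
                final PointInTimeBuilder pointInTime = new PointInTimeBuilder(pitId);
                pointInTime.setKeepAlive(keepAlive);
                source.pointInTimeBuilder(pointInTime);
                if (searchAfter != null) {
                    source.searchAfter(searchAfter);
                }
                final SearchResponse response = client.search(new SearchRequest().source(source)).actionGet();
                final SearchHit[] hits = response.getHits().getHits();
                if (hits.length == 0) {
                    break; // no more docs under this point-in-time
                }
                for (SearchHit hit : hits) {
                    // decide here whether the cache doc is still referenced; delete it otherwise
                }
                searchAfter = hits[hits.length - 1].getSortValues();
            }
        } finally {
            client.execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet();
        }
    }
}
```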