Add maintenance service to clean up unused docs in snapshot blob cache #77686
Conversation
Pinging @elastic/es-distributed (Team:Distributed)
henningandersen left a comment
I did an initial read of the main service. I wonder if there is a way we can make it more bullet-proof?
if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
    return; // state not fully recovered
}
if (event.indicesDeleted() == null || event.indicesDeleted().isEmpty()) {
I am not entirely sure this catches deletes that happen while the shard is red, i.e., all nodes containing the shard are offline?
I thought a bit more about this and I think you are right: it's possible that some index deletions are bypassed if the .snapshot-blob-cache index is unassigned for whatever reason.
A more solid solution would be to run this service on the master node and always try to delete cache entries whatever the status of the .snapshot-blob-cache index is. Then we could complement this by running a full cache clean up every day that would list the entries and remove the ones that do not belong to mounted indices. What do you think?
I think we could still keep it on the 0'th primary shard node. But running a cleanup like you describe once per day could help catch any misses. I think that should work out fine too.
Thanks Henning. I'll update this PR accordingly and I'll add a daily task in a follow up PR.
I opened #78438 for the full clean up.
@Override
public void onFailure(Exception e) {
    logger.warn(
I suppose we can expect this to happen rarely, i.e., we would still be leaking entries in the cache.
Yes, we should be more explicit about leaked entries in the system index when logging.
At the same time I think we should complement this on-index-deletion clean up with another clean up that runs every day (I think we have a maintenance service like this) to remove unused cache entries.
That makes sense to me. Also, we can then move the logging here to debug level, since we are going to clean it up anyway.
final Set<MaintenanceTask> tasks = new HashSet<>();
for (Index deletedIndex : event.indicesDeleted()) {
Could we add this to the task spawned on the generic pool? It seems like we would always spawn at least one task here. And then perhaps just spawn one task responsible for the cleaning, rather than potentially many tasks? Just to take more of the burden off the applier thread.
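For illustration, a rough sketch of that shape, assuming the service keeps a reference to the ThreadPool; cleanUpCache is a hypothetical helper standing in for the delete-by-query logic, not the PR's actual code:

// Sketch only: collect the deleted indices on the applier thread, then hand
// the whole batch to a single task running on the generic thread pool.
final List<Index> deletedIndices = List.copyOf(event.indicesDeleted());
if (deletedIndices.isEmpty() == false) {
    threadPool.generic().execute(new AbstractRunnable() {
        @Override
        protected void doRun() {
            cleanUpCache(deletedIndices, event.previousState()); // hypothetical helper
        }

        @Override
        public void onFailure(Exception e) {
            logger.warn("snapshot blob cache maintenance failed", e);
        }
    });
}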
Thanks for your feedback @henningandersen. I've updated the PR according to your comments. I also improved the integration tests, and I'll follow up with the daily full clean-up maintenance task once this is merged. Let me know what you think 🙏🏻
henningandersen left a comment
My only concern here is the level of parallelism we may spawn, otherwise this looks good.
final ClusterState state = event.state();
for (Index deletedIndex : event.indicesDeleted()) {
    final IndexMetadata indexMetadata = event.previousState().metadata().index(deletedIndex);
    if (indexMetadata != null) {
I think we expect the metadata to exist in the previous state? I think the if is fine, but could be accompanied by assert indexMetadata != null?
Sure
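i.e. something along these lines (illustrative sketch, the assertion message is made up):

final IndexMetadata indexMetadata = event.previousState().metadata().index(deletedIndex);
assert indexMetadata != null : "no metadata found in previous state for deleted index " + deletedIndex;
if (indexMetadata != null) {
    // schedule the clean-up of the cached blobs for this index
}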
final DeleteByQueryRequest request = new DeleteByQueryRequest(systemIndexName);
request.setQuery(buildDeleteByQuery(indexMetadata.getNumberOfShards(), snapshotId.getUUID(), indexId.getId()));
clientWithOrigin.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() {
I worry a bit that we risk scenarios where we fire off 1000 delete-by-query requests in this loop. I wonder if we can hold off spawning the next request until the previous one has either failed or succeeded?
I do agree. I reworked this to queue the delete-by-query requests and execute them in sequence.
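Roughly the shape of that sequential execution, as an illustrative sketch (the queue field, the cleanUpInProgress flag and the method names are placeholders, not necessarily the PR's exact code):

// Illustrative sketch: at most one delete-by-query in flight at a time; the
// listener of the current request triggers the next one from the queue.
private final Queue<DeleteByQueryRequest> queue = new ConcurrentLinkedQueue<>();
private final AtomicBoolean cleanUpInProgress = new AtomicBoolean();

void enqueueCleanUp(DeleteByQueryRequest request) {
    queue.add(request);
    maybeExecuteNextCleanUp();
}

private void maybeExecuteNextCleanUp() {
    if (cleanUpInProgress.compareAndSet(false, true) == false) {
        return; // a request is already running and will drain the queue when it completes
    }
    final DeleteByQueryRequest next = queue.poll();
    if (next == null) {
        cleanUpInProgress.set(false);
        if (queue.isEmpty() == false) {
            maybeExecuteNextCleanUp(); // an entry may have been enqueued concurrently
        }
        return;
    }
    clientWithOrigin.execute(DeleteByQueryAction.INSTANCE, next, ActionListener.wrap(response -> {
        cleanUpInProgress.set(false);
        maybeExecuteNextCleanUp();
    }, e -> {
        logger.warn("failed to clean up snapshot blob cache entries", e);
        cleanUpInProgress.set(false);
        maybeExecuteNextCleanUp();
    }));
}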
@Override
public void onResponse(BulkByScrollResponse response) {
    logger.debug(
        "snapshot blob cache maintenance task deleted [{}] documents for snapshot [{}] of index {}",
nit: should we also mention the deleted mounted index name in this message (and in the failure message)?
++
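i.e. roughly (deletedIndex being the deleted mounted index from the surrounding loop, as a sketch):

logger.debug(
    "snapshot blob cache maintenance task deleted [{}] documents for snapshot [{}] of deleted index [{}]",
    response.getDeleted(),
    snapshotId,
    deletedIndex
);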
}
final DeleteByQueryRequest request = new DeleteByQueryRequest(systemIndexName);
request.setQuery(buildDeleteByQuery(indexMetadata.getNumberOfShards(), snapshotId.getUUID(), indexId.getId()));
Do we need to ask delete-by-query to refresh to ensure it can see the newest entries, at least on the first request?
For simplicity I added the refresh to the first delete-by-query.
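For reference, that boils down to something like the following sketch, where firstRequest is a placeholder for "is this the first delete-by-query of the batch":

final DeleteByQueryRequest request = new DeleteByQueryRequest(systemIndexName);
request.setQuery(buildDeleteByQuery(indexMetadata.getNumberOfShards(), snapshotId.getUUID(), indexId.getId()));
// only the first request of the batch asks delete-by-query to refresh
request.setRefresh(firstRequest);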
arteam left a comment
I've left a couple of small suggestions
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    return CollectionUtils.appendToCopy(super.nodePlugins(), WaitForSnapshotBlobCacheShardsActivePlugin.class);
    final List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
Suggestion: create plugins in declarative style without a variable:

Stream.concat(super.nodePlugins().stream(),
        Stream.of(WaitForSnapshotBlobCacheShardsActivePlugin.class, ReindexPlugin.class))
    .collect(Collectors.toList())
Using concatenation of streams here seems a bit overkill and less readable to me 😁 Anyway, if I'm the only one who prefers the old-fashioned way I'll turn this into streams; otherwise I prefer to keep it like this.
if (indexService.index().getName().equals(restoredIndex)) {
    for (IndexShard indexShard : indexService) {
        try {
            final SearchableSnapshotDirectory directory = unwrapDirectory(indexShard.store().directory());
It seems that the directory variable can be inlined and we can just do unwrapDirectory(indexShard.store().directory()).clearStats()
Sure, I pushed dc218de
try {
    final SearchableSnapshotDirectory directory = unwrapDirectory(indexShard.store().directory());
    directory.clearStats();
} catch (AlreadyClosedException ace) {
Nit: I believe a common convention is to declare the exception name as ignore instead of adding an explicit comment.
For example, IntelliJ wouldn't highlight the exception as unused in this case.
Ok, see dc218de
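i.e. combining both suggestions, something like this sketch:

try {
    unwrapDirectory(indexShard.store().directory()).clearStats();
} catch (AlreadyClosedException ignore) {
    // nothing to clear, the directory is already closed
}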
protected Collection<Class<? extends Plugin>> nodePlugins() {
    final List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
    plugins.add(ReindexPlugin.class);
    return plugins;
Same comment as for SearchableSnapshotsBlobStoreCacheIntegTests: Consider using Stream.concat(super.nodePlugins().stream(), Stream.of(ReindexPlugin.class)).collect(Collectors.toList()) here
}
});
final Set<String> remainingIndices = new HashSet<>(mountedIndices.keySet());
Consider performing the filtering with the Streams API:

mountedIndices.keySet().stream()
    .filter(Predicate.not(indicesToDelete::contains))
    .collect(Collectors.toSet())
No preference here for me, but this streaming version looks readable to me :) => e0343a5
}
static QueryBuilder buildDeleteByQuery(int numberOfShards, String snapshotUuid, String indexUuid) {
    final Set<String> paths = new HashSet<>(numberOfShards);
Two comments:
- HashSet accepts the capacity, not the number of expected elements, in its constructor, so the set would actually get re-sized at runtime.
- Consider using the Streams API here to avoid manually sizing the set:

IntStream.range(0, numberOfShards)
    .mapToObj(shard -> String.join("/", snapshotUuid, indexUuid, String.valueOf(shard)))
    .collect(Collectors.toSet())
Thanks for catching this, looks like an ArrayList got changed into a HashSet 😬 I pushed d816419
Thanks @henningandersen and @arteam, I've updated the pull request according to your comments. Let me know if you have more of them, thanks!
henningandersen left a comment
LGTM.
arteam left a comment
LGTM!
Thanks both!
Referenced by the backport #78263 (Add maintenance service to clean up unused docs in snapshot blob cache (#77686)): today we use the system index .snapshot-blob-cache to store parts of blobs and avoid fetching them again from the snapshot repository when recovering a searchable snapshot shard. This index is never cleaned up, though, and because it is a system index users won't be able to clean it up manually in the future. The commit adds a BlobStoreCacheMaintenanceService which detects the deletion of searchable snapshot indices and triggers the deletion of the associated documents in .snapshot-blob-cache.

Referenced by the follow-up #78438 (Add periodic maintenance task to clean up unused blob store cache docs, backported in #78610): in #77686 we added a service to clean up blob store cache docs after a searchable snapshot is no longer used. We noticed some situations where cache docs could still remain in the system index: when the system index is not available at the time the searchable snapshot index is deleted, when the system index is restored from a backup, or when the searchable snapshot index was deleted on a version before #77686. That commit introduces a maintenance task that periodically scans and cleans up unused blob cache docs. The task is scheduled to run every hour on the data node that contains the blob store cache primary shard and works by using a point-in-time context with search_after.
Today we use the system index .snapshot-blob-cache to store parts of blobs and to avoid fetching them again from the snapshot repository when recovering a searchable snapshot shard. This index is never cleaned up, though, and because it's a system index users won't be able to clean it up manually in the future.

This pull request adds a BlobStoreCacheMaintenanceService which detects the deletion of searchable snapshot indices and triggers the deletion of the associated documents in .snapshot-blob-cache.
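For context, a hedged sketch of how such a clean-up query could be built from the snapshot and index UUIDs; the field name "blob.path" and the path format are assumptions for illustration, not necessarily the exact implementation:

// Assumed sketch: cache documents are matched by a path of the form
// "<snapshotUuid>/<indexUuid>/<shardId>" through a terms query on a field
// assumed here to be called "blob.path".
static QueryBuilder buildDeleteByQuery(int numberOfShards, String snapshotUuid, String indexUuid) {
    final Set<String> paths = IntStream.range(0, numberOfShards)
        .mapToObj(shard -> String.join("/", snapshotUuid, indexUuid, String.valueOf(shard)))
        .collect(Collectors.toSet());
    return QueryBuilders.termsQuery("blob.path", paths);
}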