Operator/ingest #89735
Conversation
Move the logic out of TransportMasterNodeAction to support reserved state handlers beyond those that affect cluster state.
Extend reserved cluster state service to allow for handlers to perform async data load operations.
    }

    process(namespace, stateChunk, errorListener);
    // After we have parsed the json content, we give each handler an opportunity to augment the data
This refactoring was required in ReservedClusterStateService because the Ingest transform operations require NodeInfos, which is an async action. For this purpose, I introduced a preTransform step that runs with an async listener, allowing any handler to do additional fetching of data after we've parsed the JSON.
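A minimal sketch of what such a pre-transform hook could look like on a reserved state handler. The interface and method names here are illustrative stand-ins, not the actual handler API in this PR.

    import org.elasticsearch.action.ActionListener;

    // Illustrative handler interface; names are hypothetical, not the PR's actual types.
    interface ReservedStateHandlerSketch<T> {
        String name();

        // Runs after the JSON content is parsed. Handlers that need extra data
        // (e.g. node infos for ingest validation) can fetch it asynchronously and
        // pass the augmented content back through the listener.
        default void preTransform(T parsedContent, ActionListener<T> listener) {
            // Most handlers need nothing extra, so the default is a pass-through.
            listener.onResponse(parsedContent);
        }

        // Applies the (possibly augmented) content to the reserved state.
        void transform(T content) throws Exception;
    }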
Pinging @elastic/es-core-infra (Team:Core/Infra)

Hi @grcevski, I've created a changelog YAML for you.
    try {
        ingestService.validatePipelineRequest(pipeline, pipelines.nodesInfos);
    } catch (Exception e) {
        exceptions.add(e.getMessage());
Here we collect the validation errors and collapse them into a single exception, but I notice the SLM operator PR is piecemeal: the first exception halts execution. Should we add collection of multiple failures to that PR as well?
Ah yeah, sorry, I forgot about that. Collecting all errors is the better user experience; I'll fix that for SLM.
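For reference, the collect-then-fail pattern being discussed, sketched with hypothetical names: every pipeline is validated, the messages are accumulated, and a single exception reports them all at the end.

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of collecting all validation failures instead of stopping at the first.
    // Class, record, and method names are hypothetical.
    final class PipelineValidationSketch {

        record Pipeline(String id) {}

        interface Validator {
            void validate(Pipeline pipeline) throws Exception;
        }

        static void validateAll(List<Pipeline> pipelines, Validator validator) {
            List<String> errors = new ArrayList<>();
            for (Pipeline pipeline : pipelines) {
                try {
                    validator.validate(pipeline);
                } catch (Exception e) {
                    // Record the message and keep going, so the user sees every problem at once.
                    errors.add(e.getMessage());
                }
            }
            if (errors.isEmpty() == false) {
                // Collapse everything into one exception, as in the snippet above.
                throw new IllegalStateException("Error processing ingest pipelines: " + String.join(", ", errors));
            }
        }
    }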
dakrone left a comment
I left a few comments. I'm unsure about the preTransform piece: I understand we need the ingest infos, but I wonder if there is a better way to track them so that validation wouldn't require them. What do you think?
        validatePipeline(ingestInfos, request.getId(), config);
    }

    public boolean isNoOpPipelineUpdate(ClusterState state, PutPipelineRequest request, ActionListener<AcknowledgedResponse> listener) {
I don't think the listener is actually used in this method any more? It's awkward that the method both takes a listener and returns a boolean. I'd prefer to remove the listener and make the method static if possible.
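A sketch of what the listener-free, static variant suggested here could look like, assuming it lives inside IngestService where these types are already available. The metadata accessors and parsing helper are assumptions based on typical IngestMetadata usage, not lines taken from this PR.

    // Hypothetical listener-free, static version of the no-op check. The accessors
    // used here (IngestMetadata, PipelineConfiguration, XContentHelper) are assumed
    // from typical usage, not copied from this PR.
    static boolean isNoOpPipelineUpdate(ClusterState state, PutPipelineRequest request) {
        IngestMetadata ingestMetadata = state.metadata().custom(IngestMetadata.TYPE);
        if (ingestMetadata == null) {
            return false;
        }
        PipelineConfiguration existing = ingestMetadata.getPipelines().get(request.getId());
        if (existing == null) {
            return false;
        }
        // The update is a no-op when the submitted source parses to the same
        // configuration map as the pipeline already stored in cluster state.
        var newConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
        return newConfig.equals(existing.getConfigAsMap());
    }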
    if (stateChunk.state().isEmpty()) {
        errorListener.accept(null);
        return;
    }
(Excuse my ignorance of this code.)
How does stateChunk.state() being empty imply that there was a failure and the state couldn't be applied because of an incompatible version? Is there no other situation where the state could be empty?
    for (var handlerEntry : stateChunk.state().entrySet()) {
        handlers.get(handlerEntry.getKey()).preTransform(handlerEntry.getValue(), preTransformListener);
    }
Can you update the javadoc for the parent method to indicate that it runs the pretransforms as well?
    NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
    nodesInfoRequest.clear();
    nodesInfoRequest.addMetric(NodesInfoRequest.Metric.INGEST.metricName());
    nodeClient.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener<>() {
This makes me uneasy: what happens if the cluster is overloaded and this request times out? I think we then cancel all file-based changes, right? (Because the error will be thrown from the pre-processing, if I understand correctly.)
Should this timeout be changed or made configurable?
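If the call stays, one mitigation in the direction of this comment would be an explicit timeout on the request; timeout(TimeValue) is inherited from BaseNodesRequest, while wiring the value to a cluster setting is left as an assumption here.

    // Sketch: give the nodes-info call an explicit timeout so an overloaded cluster
    // fails the pre-transform quickly instead of hanging. The concrete value (or a
    // backing cluster setting) is hypothetical; timeout(TimeValue) comes from BaseNodesRequest.
    NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
    nodesInfoRequest.clear();
    nodesInfoRequest.addMetric(NodesInfoRequest.Metric.INGEST.metricName());
    nodesInfoRequest.timeout(TimeValue.timeValueSeconds(30));
    nodeClient.admin().cluster().nodesInfo(nodesInfoRequest, listener);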
Yeah, good point, let me go back and look at ways we can do without it; it complicates things quite a bit for such a small thing. I'll address the rest of the feedback based on what I manage to do here.
I pushed an update to see if all tests pass. This seems like a simpler and more efficient change, since we don't expect node configuration to change much alongside file-based settings: we keep the node infos cached in the file settings service and only refresh them when nodes have changed. I need to write extra tests for that logic to make sure we set the flag correctly when nodes are added and removed. I initially thought I only needed to track the adds, but removes are needed too, in case the last ingest-capable node is removed.
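A rough sketch of the caching approach described above, with hypothetical class, field, and method names (and an import layout assumed for 8.x): a cluster state listener marks the cache stale whenever nodes join or leave, and the cached response is reused otherwise.

    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
    import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
    import org.elasticsearch.client.internal.node.NodeClient;
    import org.elasticsearch.cluster.ClusterChangedEvent;
    import org.elasticsearch.cluster.ClusterStateListener;

    // Rough sketch of caching node infos and refreshing only on membership changes.
    // Names and structure are hypothetical, not the PR's actual FileSettingsService code.
    class CachedNodeInfosSketch implements ClusterStateListener {
        private final NodeClient client;
        private volatile NodesInfoResponse cached;
        private volatile boolean refreshRequired = true;

        CachedNodeInfosSketch(NodeClient client) {
            this.client = client;
        }

        @Override
        public void clusterChanged(ClusterChangedEvent event) {
            // Both additions and removals matter: a removal could take away the
            // last ingest-capable node, leaving the cached infos stale.
            if (event.nodesChanged()) {
                refreshRequired = true;
            }
        }

        void getNodeInfos(ActionListener<NodesInfoResponse> listener) {
            if (refreshRequired == false && cached != null) {
                listener.onResponse(cached);
                return;
            }
            // Reset the flag before the request so a concurrent node change marks it again.
            refreshRequired = false;
            NodesInfoRequest request = new NodesInfoRequest();
            request.clear();
            request.addMetric(NodesInfoRequest.Metric.INGEST.metricName());
            client.admin().cluster().nodesInfo(request, listener.map(response -> {
                cached = response;
                return response;
            }));
        }
    }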
I reworked how the node infos are fetched: it's now done on demand and only if nodes have joined or left the cluster. I also just pushed an update with new tests that ensure the nodeInfos are fetched correctly and at the right time.
dakrone left a comment
LGTM. I left one question, but as long as we have some sort of protection I don't think it's a blocker or anything.
    if (nodeInfosRefreshRequired || nodesInfoResponse == null) {
        var nodesInfoRequest = NodesInfoRequest.requestWithMetrics(NodesInfoRequest.Metric.INGEST);

        clusterAdminClient().nodesInfo(nodesInfoRequest, new ActionListener<>() {
Is processFileSettings protected by a lock or an AtomicBoolean or anything like that? What happens if the file on disk is replaced hundreds of times every second? Will we end up issuing many nodes-info requests, since we don't have any lock to ensure we only retrieve it once?
(I'm not super close to the internals of the file settings code, so it may be that this isn't an issue because we only invoke it once every X seconds; let me know if that's the case.)
This is a really good question. We have only one thread that processes the file change events, and the return value of processFileSettings is a latch, the CountDownLatch waitForCompletion. The caller of processFileSettings waits on this latch for all async actions to complete before it decides to pick up another event and check whether the file has changed since last time. This effectively makes the processing one at a time: until we have successfully written file state or hit an error, we don't try to process again.
The relevant caller code is here: https://github.com/elastic/elasticsearch/pull/89735/files#diff-5a2c0417a5b4bdbcc6873fc853db7a8c531b47536bd41b84e20bb6c7b21ad6d9R334
We also don't trust the events we get from the JDK watch service that the file has changed; we only use them as an indicator that we should check whether the file is different. If neither the OS file key nor the modified timestamp has changed, we skip the event. The reason we can't rely on the JDK watcher is that the series of events it sends differs vastly between OSs.
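A stripped-down, plain-Java sketch of the one-at-a-time behaviour described above; the loop, latch, and method names are simplified stand-ins for the linked FileSettingsService code, not copies of it.

    import java.util.concurrent.CountDownLatch;
    import java.util.function.Consumer;

    // Simplified stand-in for the single watcher thread: each "file may have changed"
    // event is verified, processed asynchronously, and the thread blocks on a latch
    // until that processing completes before it looks at the next event.
    class FileChangeProcessingSketch {

        // Called by the one watcher thread for every event from the JDK watch service.
        void onPossibleFileChange() throws InterruptedException {
            if (fileActuallyChanged() == false) {
                // Watch service events are only a hint; skip if the OS file key and
                // modification timestamp are unchanged.
                return;
            }
            CountDownLatch completion = processFileSettings();
            // Wait for all async actions (parsing, pre-transform, cluster state update)
            // to finish, so at most one settings file is ever in flight.
            completion.await();
        }

        private CountDownLatch processFileSettings() {
            CountDownLatch completion = new CountDownLatch(1);
            // Whatever path the async processing takes, success or error, it must
            // eventually count the latch down.
            startAsyncProcessing(errorOrNull -> completion.countDown());
            return completion;
        }

        private boolean fileActuallyChanged() {
            return true; // placeholder: compare file key and modified timestamp
        }

        private void startAsyncProcessing(Consumer<Exception> onCompletion) {
            onCompletion.accept(null); // placeholder for the real async pipeline
        }
    }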
@elasticsearchmachine run elasticsearch-ci/part-2

Thanks Lee!
This PR adds support for /_ingest/pipeline for file-based settings.
Relates to #89183