Skip to content

Conversation

@martijnvg
Copy link
Member

@martijnvg martijnvg commented Mar 9, 2022

Currently, CompoundProcessor does not catch Exception and
if a processor throws an error and a method higher in the
call stack doesn't catch the exception then pipeline execution
stalls and bulk requests may not complete.

Usually these exceptions are caught by IngestService#executePipelines(...) method,
but when a processor executes async (for example: enrich processor) and the thread
that executes enrich is no longer the original write thread then there is no logic
that deals with failing pipeline execution and cleaning resources up. This then leads
to memory leaks.

Closes #84781

Note that this PR will be forward ported to 8.1 and master branches.

…ssor.

Currently, CompoundProcessor does not catch Exception and
if a processor throws an error and a method higher in the
call stack doesn't catch the exception then pipeline execution
stalls and bulk requests may not complete.

Usually these exceptions are caught by IngestService#executePipelines(...) method,
but when a processor executes async (for example: enrich processor) and the thread
that executes enrich is no longer the original write thread then there is no logic
that deals with failing pipeline execution and cleaning resources up. This then leads
to memory leaks.

Closes elastic#84781
@martijnvg martijnvg changed the title CompoundProcessor should also catch exceptions when executing a proce… CompoundProcessor should also catch exceptions when executing a processor Mar 9, 2022
…Processor.

With the change to CompoundProcessor thrown exceptions are caught and delegated to handler.
SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since
it assumes that processorResultList contains the result (successful or not successful) of every
processor in the pipeline.

In case TrackingResultProcessor for PipelineProcessor couldn't find the mentioned pipeline then
it just throws an error without updating the processorResultList. This commit addresses that.
@martijnvg martijnvg marked this pull request as ready for review March 15, 2022 16:33
@martijnvg martijnvg requested a review from jbaiera March 15, 2022 16:33
@martijnvg martijnvg added the :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP label Mar 15, 2022
@elasticmachine elasticmachine added the Team:Data Management Meta label for data/management team label Mar 15, 2022
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

@martijnvg martijnvg added v8.2.0 v8.1.1 >bug and removed Team:Data Management Meta label for data/management team labels Mar 15, 2022
@elasticsearchmachine
Copy link
Collaborator

Hi @martijnvg, I've created a changelog YAML for you.

Copy link
Member

@jbaiera jbaiera left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM pending question about metrics accounting

try {
processor.execute(ingestDocument, (result, e) -> {
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
metric.postIngest(ingestTimeInNanos);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be calling this in case of failure as well?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

specifically metric.postIngest(ingestTimeInNanos);

\me shakes fist at IDEA review tools picking four lines for the comment

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I think metric.postIngest(ingestTimeInNanos); should be called as well when catch exception at line 151.

(in case an error comes in via this handler (meaning e is not null) then this is already invoked)

@martijnvg martijnvg merged commit eaeb20f into elastic:7.17 Mar 16, 2022
martijnvg added a commit to martijnvg/elasticsearch that referenced this pull request Mar 16, 2022
…ssor (elastic#84838)

Currently, CompoundProcessor does not catch Exception and
if a processor throws an error and a method higher in the
call stack doesn't catch the exception then pipeline execution
stalls and bulk requests may not complete.

Usually these exceptions are caught by IngestService#executePipelines(...) method,
but when a processor executes async (for example: enrich processor) and the thread
that executes enrich is no longer the original write thread then there is no logic
that deals with failing pipeline execution and cleaning resources up. This then leads
to memory leaks.

Closes elastic#84781

Also change how 'pipeline doesn't exist' error is thrown in TrackingResultProcessor.

With the change to CompoundProcessor thrown exceptions are caught and delegated to handler.
SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since
it assumes that processorResultList contains the result (successful or not successful) of every
processor in the pipeline.

In case TrackingResultProcessor for PipelineProcessor couldn't find the mentioned pipeline then
it just throws an error without updating the processorResultList. This commit addresses that.
martijnvg added a commit that referenced this pull request Mar 16, 2022
…ssor (#84838) (#85035)

Currently, CompoundProcessor does not catch Exception and
if a processor throws an error and a method higher in the
call stack doesn't catch the exception then pipeline execution
stalls and bulk requests may not complete.

Usually these exceptions are caught by IngestService#executePipelines(...) method,
but when a processor executes async (for example: enrich processor) and the thread
that executes enrich is no longer the original write thread then there is no logic
that deals with failing pipeline execution and cleaning resources up. This then leads
to memory leaks.

Closes #84781

Also change how 'pipeline doesn't exist' error is thrown in TrackingResultProcessor.

With the change to CompoundProcessor thrown exceptions are caught and delegated to handler.
SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since
it assumes that processorResultList contains the result (successful or not successful) of every
processor in the pipeline.

In case TrackingResultProcessor for PipelineProcessor couldn't find the mentioned pipeline then
it just throws an error without updating the processorResultList. This commit addresses that.
martijnvg added a commit that referenced this pull request Mar 16, 2022
Forward port #85000 (Revert enrich cache lookup optimisation ) and #84838 (CompoundProcessor should also catch exceptions when executing a processor) to 8.1 branch.

Revert enrich cache lookup optimisation (#85028)

Forwardporting #85000 to master branch.
This PR reverts the optimisation that was added via #77259.
This optimisation cleverly ensures no duplicate searches happen if multiple threads concurrently execute the same search. However there are issues with the implementation that cause issues like #84781. The optimisation make use of CompletableFuture and in this case we don't check whether the result has completed exceptionally. Which causes the callback not being invoked and this leads to bulk request not being completed and hanging around.

The ingest framework due to its asynchronous nature is already complex and adding CompletableFuture into the mix makes debugging these issues very time consuming. This is the main reason why we like to revert this commit.

* CompoundProcessor should also catch exceptions when executing a processor (#84838) (#85035)

Currently, CompoundProcessor does not catch Exception and
if a processor throws an error and a method higher in the
call stack doesn't catch the exception then pipeline execution
stalls and bulk requests may not complete.

Usually these exceptions are caught by IngestService#executePipelines(...) method,
but when a processor executes async (for example: enrich processor) and the thread
that executes enrich is no longer the original write thread then there is no logic
that deals with failing pipeline execution and cleaning resources up. This then leads
to memory leaks.

Closes #84781

Also change how 'pipeline doesn't exist' error is thrown in TrackingResultProcessor.

With the change to CompoundProcessor thrown exceptions are caught and delegated to handler.
SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since
it assumes that processorResultList contains the result (successful or not successful) of every
processor in the pipeline.

In case TrackingResultProcessor for PipelineProcessor couldn't find the mentioned pipeline then
it just throws an error without updating the processorResultList. This commit addresses that.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

>bug :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP v7.17.2 v8.1.1 v8.2.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants