Skip to content

Conversation

@martijnvg
Copy link
Member

Forward porting #84838 to master branch.

Currently, CompoundProcessor does not catch Exception and
if a processor throws an error and a method higher in the
call stack doesn't catch the exception then pipeline execution
stalls and bulk requests may not complete.

Usually these exceptions are caught by IngestService#executePipelines(...) method,
but when a processor executes async (for example: enrich processor) and the thread
that executes enrich is no longer the original write thread then there is no logic
that deals with failing pipeline execution and cleaning resources up. This then leads
to memory leaks.

Closes #84781

Also change how 'pipeline doesn't exist' error is thrown in TrackingResultProcessor.

With the change to CompoundProcessor thrown exceptions are caught and delegated to handler.
SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since
it assumes that processorResultList contains the result (successful or not successful) of every
processor in the pipeline.

In case TrackingResultProcessor for PipelineProcessor couldn't find the mentioned pipeline then
it just throws an error without updating the processorResultList. This commit addresses that.

…ssor (elastic#84838)

Currently, CompoundProcessor does not catch Exception and
if a processor throws an error and a method higher in the
call stack doesn't catch the exception then pipeline execution
stalls and bulk requests may not complete.

Usually these exceptions are caught by IngestService#executePipelines(...) method,
but when a processor executes async (for example: enrich processor) and the thread
that executes enrich is no longer the original write thread then there is no logic
that deals with failing pipeline execution and cleaning resources up. This then leads
to memory leaks.

Closes elastic#84781

Also change how 'pipeline doesn't exist' error is thrown in TrackingResultProcessor.

With the change to CompoundProcessor thrown exceptions are caught and delegated to handler.
SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since
it assumes that processorResultList contains the result (successful or not successful) of every
processor in the pipeline.

In case TrackingResultProcessor for PipelineProcessor couldn't find the mentioned pipeline then
it just throws an error without updating the processorResultList. This commit addresses that.
@martijnvg martijnvg added >non-issue :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP auto-backport Automatically create backport pull requests when merged v8.2.0 v8.1.2 labels Mar 16, 2022
@elasticmachine elasticmachine added the Team:Data Management Meta label for data/management team label Mar 16, 2022
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

@martijnvg martijnvg added >bug :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP v8.2.0 and removed auto-backport Automatically create backport pull requests when merged v8.1.2 >non-issue :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP Team:Data Management Meta label for data/management team v8.2.0 labels Mar 16, 2022
@martijnvg martijnvg assigned martijnvg and unassigned martijnvg Mar 16, 2022
@martijnvg martijnvg added >non-issue and removed >bug labels Mar 16, 2022
@elasticmachine elasticmachine added the Team:Data Management Meta label for data/management team label Mar 16, 2022
@martijnvg martijnvg merged commit a0e5772 into elastic:master Mar 16, 2022
martijnvg added a commit that referenced this pull request Mar 16, 2022
Forward port #85000 (Revert enrich cache lookup optimisation ) and #84838 (CompoundProcessor should also catch exceptions when executing a processor) to 8.1 branch.

Revert enrich cache lookup optimisation (#85028)

Forwardporting #85000 to master branch.
This PR reverts the optimisation that was added via #77259.
This optimisation cleverly ensures no duplicate searches happen if multiple threads concurrently execute the same search. However there are issues with the implementation that cause issues like #84781. The optimisation make use of CompletableFuture and in this case we don't check whether the result has completed exceptionally. Which causes the callback not being invoked and this leads to bulk request not being completed and hanging around.

The ingest framework due to its asynchronous nature is already complex and adding CompletableFuture into the mix makes debugging these issues very time consuming. This is the main reason why we like to revert this commit.

* CompoundProcessor should also catch exceptions when executing a processor (#84838) (#85035)

Currently, CompoundProcessor does not catch Exception and
if a processor throws an error and a method higher in the
call stack doesn't catch the exception then pipeline execution
stalls and bulk requests may not complete.

Usually these exceptions are caught by IngestService#executePipelines(...) method,
but when a processor executes async (for example: enrich processor) and the thread
that executes enrich is no longer the original write thread then there is no logic
that deals with failing pipeline execution and cleaning resources up. This then leads
to memory leaks.

Closes #84781

Also change how 'pipeline doesn't exist' error is thrown in TrackingResultProcessor.

With the change to CompoundProcessor thrown exceptions are caught and delegated to handler.
SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since
it assumes that processorResultList contains the result (successful or not successful) of every
processor in the pipeline.

In case TrackingResultProcessor for PipelineProcessor couldn't find the mentioned pipeline then
it just throws an error without updating the processorResultList. This commit addresses that.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP >non-issue Team:Data Management Meta label for data/management team v8.2.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Ingest enrich processor then foreach processor == possible memory leak

2 participants