From f8eda4a11477dfec38e78465f8610a31f0eb21d4 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 9 Mar 2022 20:43:13 +0100 Subject: [PATCH 1/5] CompoundProcessor should also catch exceptions when executing a processor. Currently, CompoundProcessor does not catch Exception and if a processor throws an error and a method higher in the call stack doesn't catch the exception then pipeline execution stalls and bulk requests may not complete. Usually these exceptions are caught by IngestService#executePipelines(...) method, but when a processor executes async (for example: enrich processor) and the thread that executes enrich is no longer the original write thread then there is no logic that deals with failing pipeline execution and cleaning resources up. This then leads to memory leaks. Closes #84781 --- .../ingest/CompoundProcessor.java | 49 ++++++++++++------- .../xpack/enrich/BasicEnrichTests.java | 33 +++++++++++++ 2 files changed, 65 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 5e3410043ce03..b3024f4b1f4dc 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -133,30 +133,45 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume final IngestMetric metric = processorWithMetric.v2(); final long startTimeInNanos = relativeTimeProvider.getAsLong(); metric.preIngest(); - processor.execute(ingestDocument, (result, e) -> { - long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; - metric.postIngest(ingestTimeInNanos); + try { + processor.execute(ingestDocument, (result, e) -> { + long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; + metric.postIngest(ingestTimeInNanos); - if (e != null) { - metric.ingestFailed(); - if (ignoreFailure) { - innerExecute(currentProcessor + 1, ingestDocument, handler); + if (e != null) { + executeOnFailure(currentProcessor, ingestDocument, handler, processor, metric, e); } else { - IngestProcessorException compoundProcessorException = newCompoundProcessorException(e, processor, ingestDocument); - if (onFailureProcessors.isEmpty()) { - handler.accept(null, compoundProcessorException); + if (result != null) { + innerExecute(currentProcessor + 1, result, handler); } else { - executeOnFailureAsync(0, ingestDocument, compoundProcessorException, handler); + handler.accept(null, null); } } + }); + } catch (Exception e) { + executeOnFailure(currentProcessor, ingestDocument, handler, processor, metric, e); + } + } + + private void executeOnFailure( + int currentProcessor, + IngestDocument ingestDocument, + BiConsumer handler, + Processor processor, + IngestMetric metric, + Exception e + ) { + metric.ingestFailed(); + if (ignoreFailure) { + innerExecute(currentProcessor + 1, ingestDocument, handler); + } else { + IngestProcessorException compoundProcessorException = newCompoundProcessorException(e, processor, ingestDocument); + if (onFailureProcessors.isEmpty()) { + handler.accept(null, compoundProcessorException); } else { - if (result != null) { - innerExecute(currentProcessor + 1, result, handler); - } else { - handler.accept(null, null); - } + executeOnFailureAsync(0, ingestDocument, compoundProcessorException, handler); } - }); + } } void executeOnFailureAsync( diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java index db806a3044ad0..6ce4624995c37 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java @@ -360,6 +360,39 @@ public void testTemplating() throws Exception { assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true)); } + public void testFailureAfterEnrich() throws Exception { + List keys = createSourceMatchIndex(1, 1); + String policyName = "my-policy"; + EnrichPolicy enrichPolicy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + Collections.singletonList(SOURCE_INDEX_NAME), + MATCH_FIELD, + Arrays.asList(DECORATE_FIELDS) + ); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet(); + + // A pipeline with a foreach that uses a non existing field that is specified after enrich has run: + String pipelineName = "my-pipeline"; + String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + + policyName + + "\", \"field\": \"email\", \"target_field\": \"users\"}}," + + "{ \"foreach\": {\"field\":\"users\", \"processor\":{\"append\":{\"field\":\"matched2\",\"value\":\"{{_ingest._value}}\"}}}}" + + "]}"; + PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).actionGet(); + + for (int i = 0; i < 5; i++) { + IndexRequest indexRequest = new IndexRequest("my-index").id("1") + .setPipeline(pipelineName) + .source(mapOf(MATCH_FIELD, "non_existing")); + Exception e = expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet()); + assertThat(e.getMessage(), equalTo("field [users] not present as part of path [users]")); + } + } + private List createSourceMatchIndex(int numKeys, int numDocsPerKey) { Set keys = new HashSet<>(); for (int id = 0; id < numKeys; id++) { From 5190375160333d21eb09485c795dcb637200e2a6 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 14 Mar 2022 12:14:11 +0100 Subject: [PATCH 2/5] Change how 'pipeline doesn't exist' error is thrown in TrackingResultProcessor. With the change to CompoundProcessor thrown exceptions are caught and delegated to handler. SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since it assumes that processorResultList contains the result (successful or not successful) of every processor in the pipeline. In case TrackingResultProcessor for PipelineProcessor couldn't find the mentioned pipeline then it just throws an error without updating the processorResultList. This commit addresses that. --- .../rest-api-spec/test/ingest/90_simulate.yml | 6 +++--- .../ingest/TrackingResultProcessor.java | 12 +++++++++++- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml index 2224d56165fd3..5a0cf79688039 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -969,7 +969,6 @@ teardown: --- "Test simulate with provided pipeline that does not exist": - do: - catch: bad_request ingest.simulate: verbose: true body: > @@ -990,5 +989,6 @@ teardown: } ] } - - match: { error.root_cause.0.type: "illegal_argument_exception" } - - match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [____pipeline_doesnot_exist___]" } + - match: { docs.0.processor_results.0.status: "error" } + - match: { docs.0.processor_results.0.error.root_cause.0.type: "illegal_argument_exception" } + - match: { docs.0.processor_results.0.error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [____pipeline_doesnot_exist___]" } diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index b2e3f01d0fc74..4da50209b3c00 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -70,11 +70,21 @@ public void execute(IngestDocument ingestDocument, BiConsumer { // special handling for pipeline cycle errors From 8ca62b836cbc0603379296d5d70902a4114ecbdb Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 14 Mar 2022 13:08:29 +0100 Subject: [PATCH 3/5] added comment --- .../java/org/elasticsearch/ingest/TrackingResultProcessor.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 4da50209b3c00..54c8decc3da44 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -75,6 +75,8 @@ public void execute(IngestDocument ingestDocument, BiConsumer Date: Tue, 15 Mar 2022 17:34:54 +0100 Subject: [PATCH 4/5] Update docs/changelog/84838.yaml --- docs/changelog/84838.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/84838.yaml diff --git a/docs/changelog/84838.yaml b/docs/changelog/84838.yaml new file mode 100644 index 0000000000000..98a6a5ec22345 --- /dev/null +++ b/docs/changelog/84838.yaml @@ -0,0 +1,6 @@ +pr: 84838 +summary: '`CompoundProcessor` should also catch exceptions when executing a processor' +area: Ingest +type: bug +issues: + - 84781 From e32248853d808b95b9fca486cd5bfbc6df0f7716 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 16 Mar 2022 13:24:21 +0100 Subject: [PATCH 5/5] Also invoke postIngest(...) method when catching an exception --- .../main/java/org/elasticsearch/ingest/CompoundProcessor.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index b3024f4b1f4dc..c3aa709e2b36a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -149,6 +149,8 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume } }); } catch (Exception e) { + long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; + metric.postIngest(ingestTimeInNanos); executeOnFailure(currentProcessor, ingestDocument, handler, processor, metric, e); } }