diff --git a/docs/changelog/84838.yaml b/docs/changelog/84838.yaml new file mode 100644 index 0000000000000..98a6a5ec22345 --- /dev/null +++ b/docs/changelog/84838.yaml @@ -0,0 +1,6 @@ +pr: 84838 +summary: '`CompoundProcessor` should also catch exceptions when executing a processor' +area: Ingest +type: bug +issues: + - 84781 diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml index 2224d56165fd3..5a0cf79688039 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -969,7 +969,6 @@ teardown: --- "Test simulate with provided pipeline that does not exist": - do: - catch: bad_request ingest.simulate: verbose: true body: > @@ -990,5 +989,6 @@ teardown: } ] } - - match: { error.root_cause.0.type: "illegal_argument_exception" } - - match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [____pipeline_doesnot_exist___]" } + - match: { docs.0.processor_results.0.status: "error" } + - match: { docs.0.processor_results.0.error.root_cause.0.type: "illegal_argument_exception" } + - match: { docs.0.processor_results.0.error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [____pipeline_doesnot_exist___]" } diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 5e3410043ce03..c3aa709e2b36a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -133,30 +133,47 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume final IngestMetric metric = processorWithMetric.v2(); final long startTimeInNanos = relativeTimeProvider.getAsLong(); metric.preIngest(); - processor.execute(ingestDocument, (result, e) -> { - long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; - metric.postIngest(ingestTimeInNanos); + try { + processor.execute(ingestDocument, (result, e) -> { + long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; + metric.postIngest(ingestTimeInNanos); - if (e != null) { - metric.ingestFailed(); - if (ignoreFailure) { - innerExecute(currentProcessor + 1, ingestDocument, handler); + if (e != null) { + executeOnFailure(currentProcessor, ingestDocument, handler, processor, metric, e); } else { - IngestProcessorException compoundProcessorException = newCompoundProcessorException(e, processor, ingestDocument); - if (onFailureProcessors.isEmpty()) { - handler.accept(null, compoundProcessorException); + if (result != null) { + innerExecute(currentProcessor + 1, result, handler); } else { - executeOnFailureAsync(0, ingestDocument, compoundProcessorException, handler); + handler.accept(null, null); } } + }); + } catch (Exception e) { + long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; + metric.postIngest(ingestTimeInNanos); + executeOnFailure(currentProcessor, ingestDocument, handler, processor, metric, e); + } + } + + private void executeOnFailure( + int currentProcessor, + IngestDocument ingestDocument, + BiConsumer handler, + Processor processor, + IngestMetric metric, + Exception e + ) { + metric.ingestFailed(); + if (ignoreFailure) { + innerExecute(currentProcessor + 1, ingestDocument, handler); + } else { + IngestProcessorException compoundProcessorException = newCompoundProcessorException(e, processor, ingestDocument); + if (onFailureProcessors.isEmpty()) { + handler.accept(null, compoundProcessorException); } else { - if (result != null) { - innerExecute(currentProcessor + 1, result, handler); - } else { - handler.accept(null, null); - } + executeOnFailureAsync(0, ingestDocument, compoundProcessorException, handler); } - }); + } } void executeOnFailureAsync( diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index b2e3f01d0fc74..54c8decc3da44 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -70,11 +70,23 @@ public void execute(IngestDocument ingestDocument, BiConsumer { // special handling for pipeline cycle errors diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java index db806a3044ad0..6ce4624995c37 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java @@ -360,6 +360,39 @@ public void testTemplating() throws Exception { assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true)); } + public void testFailureAfterEnrich() throws Exception { + List keys = createSourceMatchIndex(1, 1); + String policyName = "my-policy"; + EnrichPolicy enrichPolicy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + Collections.singletonList(SOURCE_INDEX_NAME), + MATCH_FIELD, + Arrays.asList(DECORATE_FIELDS) + ); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet(); + + // A pipeline with a foreach that uses a non existing field that is specified after enrich has run: + String pipelineName = "my-pipeline"; + String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + + policyName + + "\", \"field\": \"email\", \"target_field\": \"users\"}}," + + "{ \"foreach\": {\"field\":\"users\", \"processor\":{\"append\":{\"field\":\"matched2\",\"value\":\"{{_ingest._value}}\"}}}}" + + "]}"; + PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).actionGet(); + + for (int i = 0; i < 5; i++) { + IndexRequest indexRequest = new IndexRequest("my-index").id("1") + .setPipeline(pipelineName) + .source(mapOf(MATCH_FIELD, "non_existing")); + Exception e = expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet()); + assertThat(e.getMessage(), equalTo("field [users] not present as part of path [users]")); + } + } + private List createSourceMatchIndex(int numKeys, int numDocsPerKey) { Set keys = new HashSet<>(); for (int id = 0; id < numKeys; id++) {