Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/84838.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 84838
summary: '`CompoundProcessor` should also catch exceptions when executing a processor'
area: Ingest
type: bug
issues:
- 84781
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,6 @@ teardown:
---
"Test simulate with provided pipeline that does not exist":
- do:
catch: bad_request
ingest.simulate:
verbose: true
body: >
Expand All @@ -990,5 +989,6 @@ teardown:
}
]
}
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [____pipeline_doesnot_exist___]" }
- match: { docs.0.processor_results.0.status: "error" }
- match: { docs.0.processor_results.0.error.root_cause.0.type: "illegal_argument_exception" }
- match: { docs.0.processor_results.0.error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [____pipeline_doesnot_exist___]" }
Original file line number Diff line number Diff line change
Expand Up @@ -133,30 +133,47 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
final IngestMetric metric = processorWithMetric.v2();
final long startTimeInNanos = relativeTimeProvider.getAsLong();
metric.preIngest();
processor.execute(ingestDocument, (result, e) -> {
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
metric.postIngest(ingestTimeInNanos);
try {
processor.execute(ingestDocument, (result, e) -> {
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
metric.postIngest(ingestTimeInNanos);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be calling this in case of failure as well?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

specifically metric.postIngest(ingestTimeInNanos);

\me shakes fist at IDEA review tools picking four lines for the comment

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I think metric.postIngest(ingestTimeInNanos); should be called as well when catch exception at line 151.

(in case an error comes in via this handler (meaning e is not null) then this is already invoked)


if (e != null) {
metric.ingestFailed();
if (ignoreFailure) {
innerExecute(currentProcessor + 1, ingestDocument, handler);
if (e != null) {
executeOnFailure(currentProcessor, ingestDocument, handler, processor, metric, e);
} else {
IngestProcessorException compoundProcessorException = newCompoundProcessorException(e, processor, ingestDocument);
if (onFailureProcessors.isEmpty()) {
handler.accept(null, compoundProcessorException);
if (result != null) {
innerExecute(currentProcessor + 1, result, handler);
} else {
executeOnFailureAsync(0, ingestDocument, compoundProcessorException, handler);
handler.accept(null, null);
}
}
});
} catch (Exception e) {
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
metric.postIngest(ingestTimeInNanos);
executeOnFailure(currentProcessor, ingestDocument, handler, processor, metric, e);
}
}

private void executeOnFailure(
int currentProcessor,
IngestDocument ingestDocument,
BiConsumer<IngestDocument, Exception> handler,
Processor processor,
IngestMetric metric,
Exception e
) {
metric.ingestFailed();
if (ignoreFailure) {
innerExecute(currentProcessor + 1, ingestDocument, handler);
} else {
IngestProcessorException compoundProcessorException = newCompoundProcessorException(e, processor, ingestDocument);
if (onFailureProcessors.isEmpty()) {
handler.accept(null, compoundProcessorException);
} else {
if (result != null) {
innerExecute(currentProcessor + 1, result, handler);
} else {
handler.accept(null, null);
}
executeOnFailureAsync(0, ingestDocument, compoundProcessorException, handler);
}
});
}
}

void executeOnFailureAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,23 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
Pipeline pipelineToCall = pipelineProcessor.getPipeline(ingestDocument);
if (pipelineToCall == null) {
throw new IllegalArgumentException(
IllegalArgumentException e = new IllegalArgumentException(
"Pipeline processor configured for non-existent pipeline ["
+ pipelineProcessor.getPipelineToCallName(ingestDocument)
+ ']'
);
// Add error as processor result, otherwise this gets lost in SimulateExecutionService#execute(...) and
// an empty response gets returned by the ingest simulate api.
processorResultList.add(
new SimulateProcessorResult(
pipelineProcessor.getType(),
pipelineProcessor.getTag(),
pipelineProcessor.getDescription(),
e,
conditionalWithResult
)
);
throw e;
}
ingestDocumentCopy.executePipeline(pipelineToCall, (result, e) -> {
// special handling for pipeline cycle errors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,39 @@ public void testTemplating() throws Exception {
assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true));
}

public void testFailureAfterEnrich() throws Exception {
List<String> keys = createSourceMatchIndex(1, 1);
String policyName = "my-policy";
EnrichPolicy enrichPolicy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
Collections.singletonList(SOURCE_INDEX_NAME),
MATCH_FIELD,
Arrays.asList(DECORATE_FIELDS)
);
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();

// A pipeline with a foreach that uses a non existing field that is specified after enrich has run:
String pipelineName = "my-pipeline";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\""
+ policyName
+ "\", \"field\": \"email\", \"target_field\": \"users\"}},"
+ "{ \"foreach\": {\"field\":\"users\", \"processor\":{\"append\":{\"field\":\"matched2\",\"value\":\"{{_ingest._value}}\"}}}}"
+ "]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();

for (int i = 0; i < 5; i++) {
IndexRequest indexRequest = new IndexRequest("my-index").id("1")
.setPipeline(pipelineName)
.source(mapOf(MATCH_FIELD, "non_existing"));
Exception e = expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet());
assertThat(e.getMessage(), equalTo("field [users] not present as part of path [users]"));
}
}

private List<String> createSourceMatchIndex(int numKeys, int numDocsPerKey) {
Set<String> keys = new HashSet<>();
for (int id = 0; id < numKeys; id++) {
Expand Down