Skip to content

Commit 2bcdcb1

Browse files
committed
Introduce dedicated ingest processor exception (#48810)
Today we wrap exceptions that occur while executing an ingest processor in an ElasticsearchException. Today, in ExceptionsHelper#unwrapCause we only unwrap causes for exceptions that implement ElasticsearchWrapperException, which the top-level ElasticsearchException does not. Ultimately, this means that any exception that occurs during processor execution does not have its cause unwrapped, and so its status is blanket treated as a 500. This means that while executing a bulk request with an ingest pipeline, document-level failures that occur during a processor will cause the status for that document to be treated as 500. Since that does not give the client any indication that they made a mistake, it means some clients will enter infinite retries, thinking that there is some server-side problem that merely needs to clear. This commit addresses this by introducing a dedicated ingest processor exception, so that its causes can be unwrapped. While we could consider a broader change to unwrap causes for more than just ElasticsearchWrapperExceptions, that is a broad change with unclear implications. Since the problem of reporting 500s on client errors is a user-facing bug, we take the conservative approach for now, and we can revisit the unwrapping in a future change.
1 parent cac9fe4 commit 2bcdcb1

File tree

10 files changed

+70
-27
lines changed

10 files changed

+70
-27
lines changed

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818
*/
1919
package org.elasticsearch.ingest.common;
2020

21-
import org.elasticsearch.ElasticsearchException;
2221
import org.elasticsearch.action.support.WriteRequest;
2322
import org.elasticsearch.common.bytes.BytesArray;
2423
import org.elasticsearch.common.bytes.BytesReference;
2524
import org.elasticsearch.common.settings.Settings;
26-
import org.elasticsearch.common.util.set.Sets;
2725
import org.elasticsearch.common.xcontent.XContentType;
2826
import org.elasticsearch.plugins.Plugin;
2927
import org.elasticsearch.script.MockScriptEngine;
@@ -99,7 +97,7 @@ public Settings onNodeStopped(String nodeName) {
9997
}
10098

10199
});
102-
100+
103101
checkPipelineExists.accept(pipelineIdWithoutScript);
104102
checkPipelineExists.accept(pipelineIdWithScript);
105103

@@ -109,15 +107,13 @@ public Settings onNodeStopped(String nodeName) {
109107
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
110108
.get();
111109

112-
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
110+
IllegalStateException exception = expectThrows(IllegalStateException.class,
113111
() -> client().prepareIndex("index", "doc", "2")
114112
.setSource("x", 0)
115113
.setPipeline(pipelineIdWithScript)
116114
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
117115
.get());
118-
assertThat(exception.getHeaderKeys(), equalTo(Sets.newHashSet("processor_type")));
119-
assertThat(exception.getHeader("processor_type"), equalTo(Arrays.asList("unknown")));
120-
assertThat(exception.getRootCause().getMessage(),
116+
assertThat(exception.getMessage(),
121117
equalTo("pipeline with id [" + pipelineIdWithScript + "] could not be loaded, caused by " +
122118
"[ElasticsearchParseException[Error updating pipeline with id [" + pipelineIdWithScript + "]]; " +
123119
"nested: ElasticsearchException[java.lang.IllegalArgumentException: cannot execute [inline] scripts]; " +

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,5 +106,5 @@ teardown:
106106
id: 1
107107
pipeline: "outer"
108108
body: {}
109-
- match: { error.root_cause.0.type: "exception" }
109+
- match: { error.root_cause.0.type: "ingest_processor_exception" }
110110
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" }

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ teardown:
348348
]
349349
}
350350
- length: { docs: 2 }
351-
- match: { docs.0.error.type: "exception" }
351+
- match: { docs.0.error.type: "illegal_argument_exception" }
352352
- match: { docs.1.doc._source.foo: "BAR" }
353353
- length: { docs.1.doc._ingest: 1 }
354354
- is_true: docs.1.doc._ingest.timestamp
@@ -668,8 +668,7 @@ teardown:
668668
}
669669
- length: { docs: 1 }
670670
- length: { docs.0.processor_results: 1 }
671-
- match: { docs.0.processor_results.0.error.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" }
672-
- match: { docs.0.processor_results.0.error.caused_by.reason: "Cycle detected for pipeline: outer" }
671+
- match: { docs.0.processor_results.0.error.reason: "Cycle detected for pipeline: outer" }
673672

674673
---
675674
"Test verbose simulate with Pipeline Processor with Multiple Pipelines":

server/src/main/java/org/elasticsearch/ElasticsearchException.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1036,7 +1036,12 @@ private enum ElasticsearchExceptionHandle {
10361036
org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException.class,
10371037
org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException::new,
10381038
156,
1039-
Version.V_7_5_0);
1039+
Version.V_7_5_0),
1040+
INGEST_PROCESSOR_EXCEPTION(
1041+
org.elasticsearch.ingest.IngestProcessorException.class,
1042+
org.elasticsearch.ingest.IngestProcessorException::new,
1043+
157,
1044+
Version.V_7_6_0);
10401045

10411046
final Class<? extends ElasticsearchException> exceptionClass;
10421047
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;

server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
143143
if (ignoreFailure) {
144144
innerExecute(currentProcessor + 1, ingestDocument, handler);
145145
} else {
146-
ElasticsearchException compoundProcessorException =
146+
IngestProcessorException compoundProcessorException =
147147
newCompoundProcessorException(e, processor.getType(), processor.getTag());
148148
if (onFailureProcessors.isEmpty()) {
149149
handler.accept(null, compoundProcessorException);
@@ -207,12 +207,12 @@ private void removeFailureMetadata(IngestDocument ingestDocument) {
207207
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
208208
}
209209

210-
private ElasticsearchException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
211-
if (e instanceof ElasticsearchException && ((ElasticsearchException) e).getHeader("processor_type") != null) {
212-
return (ElasticsearchException) e;
210+
private IngestProcessorException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
211+
if (e instanceof IngestProcessorException && ((IngestProcessorException) e).getHeader("processor_type") != null) {
212+
return (IngestProcessorException) e;
213213
}
214214

215-
ElasticsearchException exception = new ElasticsearchException(e);
215+
IngestProcessorException exception = new IngestProcessorException(e);
216216

217217
if (processorType != null) {
218218
exception.addHeader("processor_type", processorType);
@@ -223,4 +223,5 @@ private ElasticsearchException newCompoundProcessorException(Exception e, String
223223

224224
return exception;
225225
}
226+
226227
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.ingest;
21+
22+
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.ElasticsearchWrapperException;
24+
import org.elasticsearch.common.io.stream.StreamInput;
25+
26+
import java.io.IOException;
27+
28+
/**
29+
* A dedicated wrapper for exceptions encountered executing an ingest processor. The wrapper is needed as we currently only unwrap causes
30+
* for instances of {@link ElasticsearchWrapperException}.
31+
*/
32+
public class IngestProcessorException extends ElasticsearchException implements ElasticsearchWrapperException {
33+
34+
IngestProcessorException(final Exception cause) {
35+
super(cause);
36+
}
37+
38+
public IngestProcessorException(final StreamInput in) throws IOException {
39+
super(in);
40+
}
41+
42+
}

server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.elasticsearch.indices.IndexTemplateMissingException;
7272
import org.elasticsearch.indices.InvalidIndexTemplateException;
7373
import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
74+
import org.elasticsearch.ingest.IngestProcessorException;
7475
import org.elasticsearch.repositories.RepositoryException;
7576
import org.elasticsearch.rest.RestStatus;
7677
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
@@ -817,6 +818,7 @@ public void testIds() {
817818
ids.put(154, RetentionLeaseNotFoundException.class);
818819
ids.put(155, ShardNotInPrimaryModeException.class);
819820
ids.put(156, RetentionLeaseInvalidRetainingSeqNoException.class);
821+
ids.put(157, IngestProcessorException.class);
820822

821823
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
822824
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@
1919

2020
package org.elasticsearch.action.ingest;
2121

22-
import org.elasticsearch.ElasticsearchException;
22+
import org.elasticsearch.ingest.CompoundProcessor;
2323
import org.elasticsearch.ingest.DropProcessor;
24+
import org.elasticsearch.ingest.IngestDocument;
25+
import org.elasticsearch.ingest.IngestProcessorException;
26+
import org.elasticsearch.ingest.Pipeline;
2427
import org.elasticsearch.ingest.Processor;
2528
import org.elasticsearch.ingest.RandomDocumentPicks;
2629
import org.elasticsearch.ingest.TestProcessor;
27-
import org.elasticsearch.ingest.CompoundProcessor;
28-
import org.elasticsearch.ingest.IngestDocument;
29-
import org.elasticsearch.ingest.Pipeline;
3030
import org.elasticsearch.test.ESTestCase;
3131
import org.elasticsearch.threadpool.TestThreadPool;
3232
import org.elasticsearch.threadpool.TestThreadPool;
@@ -259,7 +259,7 @@ public void testExecuteItemWithFailure() throws Exception {
259259
assertThat(simulateDocumentBaseResult.getIngestDocument(), nullValue());
260260
assertThat(simulateDocumentBaseResult.getFailure(), instanceOf(RuntimeException.class));
261261
Exception exception = simulateDocumentBaseResult.getFailure();
262-
assertThat(exception, instanceOf(ElasticsearchException.class));
262+
assertThat(exception, instanceOf(IngestProcessorException.class));
263263
assertThat(exception.getMessage(), equalTo("java.lang.RuntimeException: processor failed"));
264264
}
265265

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.logging.log4j.LogManager;
2424
import org.apache.logging.log4j.Logger;
2525
import org.apache.lucene.util.SetOnce;
26-
import org.elasticsearch.ElasticsearchException;
2726
import org.elasticsearch.ElasticsearchParseException;
2827
import org.elasticsearch.ResourceNotFoundException;
2928
import org.elasticsearch.Version;
@@ -843,7 +842,7 @@ public void testExecuteSuccessWithOnFailure() throws Exception {
843842
@SuppressWarnings("unchecked")
844843
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
845844
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
846-
verify(failureHandler, never()).accept(eq(0), any(ElasticsearchException.class));
845+
verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class));
847846
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
848847
}
849848

server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.ingest;
2121

22-
import org.elasticsearch.ElasticsearchException;
2322
import org.elasticsearch.action.ingest.SimulateProcessorResult;
2423
import org.elasticsearch.common.settings.Settings;
2524
import org.elasticsearch.script.MockScriptEngine;
@@ -86,7 +85,7 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception {
8685

8786
Exception[] holder = new Exception[1];
8887
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
89-
assertThat(((ElasticsearchException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage()));
88+
assertThat(((IngestProcessorException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage()));
9089

9190
SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
9291
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
@@ -456,7 +455,7 @@ public void testActualPipelineProcessorWithCycle() throws Exception {
456455

457456
Exception[] holder = new Exception[1];
458457
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
459-
ElasticsearchException exception = (ElasticsearchException) holder[0];
458+
IngestProcessorException exception = (IngestProcessorException) holder[0];
460459
assertThat(exception.getCause(), instanceOf(IllegalStateException.class));
461460
assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1"));
462461
}

0 commit comments

Comments
 (0)