Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
*/
package org.elasticsearch.ingest.common;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.MockScriptEngine;
Expand Down Expand Up @@ -109,15 +107,13 @@ public Settings onNodeStopped(String nodeName) {
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();

ElasticsearchException exception = expectThrows(ElasticsearchException.class,
IllegalStateException exception = expectThrows(IllegalStateException.class,
() -> client().prepareIndex("index").setId("2")
.setSource("x", 0)
.setPipeline(pipelineIdWithScript)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get());
assertThat(exception.getHeaderKeys(), equalTo(Sets.newHashSet("processor_type")));
assertThat(exception.getHeader("processor_type"), equalTo(Arrays.asList("unknown")));
assertThat(exception.getRootCause().getMessage(),
assertThat(exception.getMessage(),
equalTo("pipeline with id [" + pipelineIdWithScript + "] could not be loaded, caused by " +
"[org.elasticsearch.ElasticsearchParseException: Error updating pipeline with id [" + pipelineIdWithScript + "]; " +
"org.elasticsearch.ElasticsearchException: java.lang.IllegalArgumentException: cannot execute [inline] scripts; " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,5 @@ teardown:
id: 1
pipeline: "outer"
body: {}
- match: { error.root_cause.0.type: "exception" }
- match: { error.root_cause.0.type: "ingest_processor_exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" }
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ teardown:
]
}
- length: { docs: 2 }
- match: { docs.0.error.type: "exception" }
- match: { docs.0.error.type: "illegal_argument_exception" }
- match: { docs.1.doc._source.foo: "BAR" }
- length: { docs.1.doc._ingest: 1 }
- is_true: docs.1.doc._ingest.timestamp
Expand Down Expand Up @@ -653,8 +653,7 @@ teardown:
}
- length: { docs: 1 }
- length: { docs.0.processor_results: 1 }
- match: { docs.0.processor_results.0.error.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" }
- match: { docs.0.processor_results.0.error.caused_by.reason: "Cycle detected for pipeline: outer" }
- match: { docs.0.processor_results.0.error.reason: "Cycle detected for pipeline: outer" }

---
"Test verbose simulate with Pipeline Processor with Multiple Pipelines":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,12 @@ private enum ElasticsearchExceptionHandle {
org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException.class,
org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException::new,
156,
Version.V_7_5_0);
Version.V_7_5_0),
INGEST_PROCESSOR_EXCEPTION(
org.elasticsearch.ingest.IngestProcessorException.class,
org.elasticsearch.ingest.IngestProcessorException::new,
157,
Version.V_7_6_0);

final Class<? extends ElasticsearchException> exceptionClass;
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
if (ignoreFailure) {
innerExecute(currentProcessor + 1, ingestDocument, handler);
} else {
ElasticsearchException compoundProcessorException =
IngestProcessorException compoundProcessorException =
newCompoundProcessorException(e, processor.getType(), processor.getTag());
if (onFailureProcessors.isEmpty()) {
handler.accept(null, compoundProcessorException);
Expand Down Expand Up @@ -207,12 +207,12 @@ private void removeFailureMetadata(IngestDocument ingestDocument) {
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
}

private ElasticsearchException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
if (e instanceof ElasticsearchException && ((ElasticsearchException) e).getHeader("processor_type") != null) {
return (ElasticsearchException) e;
private IngestProcessorException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
if (e instanceof IngestProcessorException && ((IngestProcessorException) e).getHeader("processor_type") != null) {
return (IngestProcessorException) e;
}

ElasticsearchException exception = new ElasticsearchException(e);
IngestProcessorException exception = new IngestProcessorException(e);

if (processorType != null) {
exception.addHeader("processor_type", processorType);
Expand All @@ -223,4 +223,5 @@ private ElasticsearchException newCompoundProcessorException(Exception e, String

return exception;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchWrapperException;
import org.elasticsearch.common.io.stream.StreamInput;

import java.io.IOException;

/**
* A dedicated wrapper for exceptions encountered executing an ingest processor. The wrapper is needed as we currently only unwrap causes
* for instances of {@link ElasticsearchWrapperException}.
*/
public class IngestProcessorException extends ElasticsearchException implements ElasticsearchWrapperException {

IngestProcessorException(final Exception cause) {
super(cause);
}

public IngestProcessorException(final StreamInput in) throws IOException {
super(in);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.elasticsearch.indices.IndexTemplateMissingException;
import org.elasticsearch.indices.InvalidIndexTemplateException;
import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
import org.elasticsearch.ingest.IngestProcessorException;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
Expand Down Expand Up @@ -816,6 +817,7 @@ public void testIds() {
ids.put(154, RetentionLeaseNotFoundException.class);
ids.put(155, ShardNotInPrimaryModeException.class);
ids.put(156, RetentionLeaseInvalidRetainingSeqNoException.class);
ids.put(157, IngestProcessorException.class);

Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

package org.elasticsearch.action.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.DropProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestProcessorException;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.junit.After;
Expand Down Expand Up @@ -258,7 +258,7 @@ public void testExecuteItemWithFailure() throws Exception {
assertThat(simulateDocumentBaseResult.getIngestDocument(), nullValue());
assertThat(simulateDocumentBaseResult.getFailure(), instanceOf(RuntimeException.class));
Exception exception = simulateDocumentBaseResult.getFailure();
assertThat(exception, instanceOf(ElasticsearchException.class));
assertThat(exception, instanceOf(IngestProcessorException.class));
assertThat(exception.getMessage(), equalTo("java.lang.RuntimeException: processor failed"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
Expand Down Expand Up @@ -823,7 +822,7 @@ public void testExecuteSuccessWithOnFailure() throws Exception {
@SuppressWarnings("unchecked")
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
verify(failureHandler, never()).accept(eq(0), any(ElasticsearchException.class));
verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class));
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ingest.SimulateProcessorResult;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.MockScriptEngine;
Expand Down Expand Up @@ -86,7 +85,7 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception {

Exception[] holder = new Exception[1];
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
assertThat(((ElasticsearchException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage()));
assertThat(((IngestProcessorException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage()));

SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(1));
Expand Down Expand Up @@ -456,7 +455,7 @@ public void testActualPipelineProcessorWithCycle() throws Exception {

Exception[] holder = new Exception[1];
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
ElasticsearchException exception = (ElasticsearchException) holder[0];
IngestProcessorException exception = (IngestProcessorException) holder[0];
assertThat(exception.getCause(), instanceOf(IllegalStateException.class));
assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1"));
}
Expand Down