From 018625bc88511de62b93a6f0ccee9ecdd8c780e1 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Nov 2019 10:33:55 -0400 Subject: [PATCH 01/10] Introduce dedicated ingest processor exception Today we wrap exceptions that occur while executing an ingest processor in an ElasticsearchException. Today, in ExceptionsHelper#unwrapCause we only unwrap causes for exceptions that implement ElasticsearchWrapperException, which the top-level ElasticsearchException does not. Ultimately, this means that any exception that occurs during processor execution does not have its cause unwrapped, and so its status is blanket treated as a 500. This means that while executing a bulk request with an ingest pipeline, document-level failures that occur during a processor will cause the status for that document to be treated as 500. Since that does not give the client any indication that they made a mistake, it means some clients will enter infinite retries, thinking that there is some server-side problem that merely needs to clear. This commit addresses this by introducing a dedicated ingest processor exception, so that its causes can be unwrapped. While we could consider a broader change to unwrap causes for more than just ElasticsearchWrapperExceptions, that is a broad change with unclear implications. Since the problem of reporting 500s on client errors is a user-facing bug, we take the conservative approach for now, and we can revisit the unwrapping in a future change. --- .../elasticsearch/ingest/CompoundProcessor.java | 11 ++++++----- .../ingest/IngestProcessorException.java | 16 ++++++++++++++++ 2 files changed, 22 insertions(+), 5 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/ingest/IngestProcessorException.java diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index cf75ead37354d..504795d8d39a3 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -143,7 +143,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume if (ignoreFailure) { innerExecute(currentProcessor + 1, ingestDocument, handler); } else { - ElasticsearchException compoundProcessorException = + IngestProcessorException compoundProcessorException = newCompoundProcessorException(e, processor.getType(), processor.getTag()); if (onFailureProcessors.isEmpty()) { handler.accept(null, compoundProcessorException); @@ -207,12 +207,12 @@ private void removeFailureMetadata(IngestDocument ingestDocument) { ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD); } - private ElasticsearchException newCompoundProcessorException(Exception e, String processorType, String processorTag) { - if (e instanceof ElasticsearchException && ((ElasticsearchException) e).getHeader("processor_type") != null) { - return (ElasticsearchException) e; + private IngestProcessorException newCompoundProcessorException(Exception e, String processorType, String processorTag) { + if (e instanceof IngestProcessorException && ((IngestProcessorException) e).getHeader("processor_type") != null) { + return (IngestProcessorException) e; } - ElasticsearchException exception = new ElasticsearchException(new IllegalArgumentException(e)); + IngestProcessorException exception = new IngestProcessorException(e); if (processorType != null) { exception.addHeader("processor_type", processorType); @@ -223,4 +223,5 @@ private ElasticsearchException newCompoundProcessorException(Exception e, String return exception; } + } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestProcessorException.java b/server/src/main/java/org/elasticsearch/ingest/IngestProcessorException.java new file mode 100644 index 0000000000000..d738d0f83838e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/ingest/IngestProcessorException.java @@ -0,0 +1,16 @@ +package org.elasticsearch.ingest; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchWrapperException; + +/** + * A dedicated wrapper for exceptions encountered executing an ingest processor. The wrapper is needed as we currently only unwrap causes + * for instances of {@link ElasticsearchWrapperException}. + */ +class IngestProcessorException extends ElasticsearchException implements ElasticsearchWrapperException { + + IngestProcessorException(final Exception cause) { + super(cause); + } + +} From 8303300df375241a2f5a00f10d375c72d1ac38fa Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Nov 2019 12:36:15 -0400 Subject: [PATCH 02/10] Add missing license header --- .../ingest/IngestProcessorException.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestProcessorException.java b/server/src/main/java/org/elasticsearch/ingest/IngestProcessorException.java index d738d0f83838e..314c5b08d67ed 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestProcessorException.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestProcessorException.java @@ -1,3 +1,22 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchException; From 6dffebfecdfa816b30f40bc17fe97924746ab59e Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Nov 2019 12:53:59 -0400 Subject: [PATCH 03/10] Add serialization --- .../java/org/elasticsearch/ElasticsearchException.java | 5 +++++ .../elasticsearch/ingest/IngestProcessorException.java | 9 ++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index 821686e8894fc..d56978f7000d6 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -1036,6 +1036,11 @@ private enum ElasticsearchExceptionHandle { org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException.class, org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException::new, 156, + Version.V_7_5_0), + INGEST_PROCESSOR_EXCEPTION( + org.elasticsearch.ingest.IngestProcessorException.class, + org.elasticsearch.ingest.IngestProcessorException::new, + 157, Version.V_7_5_0); final Class exceptionClass; diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestProcessorException.java b/server/src/main/java/org/elasticsearch/ingest/IngestProcessorException.java index 314c5b08d67ed..cd9e3625c163e 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestProcessorException.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestProcessorException.java @@ -21,15 +21,22 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchWrapperException; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; /** * A dedicated wrapper for exceptions encountered executing an ingest processor. The wrapper is needed as we currently only unwrap causes * for instances of {@link ElasticsearchWrapperException}. */ -class IngestProcessorException extends ElasticsearchException implements ElasticsearchWrapperException { +public class IngestProcessorException extends ElasticsearchException implements ElasticsearchWrapperException { IngestProcessorException(final Exception cause) { super(cause); } + public IngestProcessorException(final StreamInput in) throws IOException { + super(in); + } + } From f3388ede2c43cae452006e4f5b61efc10f13d465 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Nov 2019 15:26:35 -0400 Subject: [PATCH 04/10] Adjust exceptions --- .../action/ingest/SimulateExecutionServiceTests.java | 10 +++++----- .../org/elasticsearch/ingest/IngestServiceTests.java | 3 +-- .../ingest/TrackingResultProcessorTests.java | 5 ++--- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java index d78d259db9701..2ced9d1e23dd2 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java @@ -19,14 +19,14 @@ package org.elasticsearch.action.ingest; -import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.DropProcessor; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.IngestProcessorException; +import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.TestProcessor; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.junit.After; @@ -258,7 +258,7 @@ public void testExecuteItemWithFailure() throws Exception { assertThat(simulateDocumentBaseResult.getIngestDocument(), nullValue()); assertThat(simulateDocumentBaseResult.getFailure(), instanceOf(RuntimeException.class)); Exception exception = simulateDocumentBaseResult.getFailure(); - assertThat(exception, instanceOf(ElasticsearchException.class)); + assertThat(exception, instanceOf(IngestProcessorException.class)); assertThat(exception.getMessage(), equalTo("java.lang.RuntimeException: processor failed")); } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index b3db2300bb1f4..e27c1416f703e 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -23,7 +23,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; @@ -823,7 +822,7 @@ public void testExecuteSuccessWithOnFailure() throws Exception { @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); - verify(failureHandler, never()).accept(eq(0), any(ElasticsearchException.class)); + verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class)); verify(completionHandler, times(1)).accept(Thread.currentThread(), null); } diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index 5a840b9e4bb48..cc9e44e387baf 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.ingest; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ingest.SimulateProcessorResult; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.script.MockScriptEngine; @@ -86,7 +85,7 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception { Exception[] holder = new Exception[1]; trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); - assertThat(((ElasticsearchException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage())); + assertThat(((IngestProcessorException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage())); SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(1)); @@ -456,7 +455,7 @@ public void testActualPipelineProcessorWithCycle() throws Exception { Exception[] holder = new Exception[1]; trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e); - ElasticsearchException exception = (ElasticsearchException) holder[0]; + IngestProcessorException exception = (IngestProcessorException) holder[0]; assertThat(exception.getCause(), instanceOf(IllegalStateException.class)); assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1")); } From 3ac04ff8d1945554cb04bab129d35fd20f29f91a Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Nov 2019 15:27:50 -0400 Subject: [PATCH 05/10] Fix indentation --- .../src/main/java/org/elasticsearch/ElasticsearchException.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index d56978f7000d6..abaceff4f4e73 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -1040,7 +1040,7 @@ private enum ElasticsearchExceptionHandle { INGEST_PROCESSOR_EXCEPTION( org.elasticsearch.ingest.IngestProcessorException.class, org.elasticsearch.ingest.IngestProcessorException::new, - 157, + 157, Version.V_7_5_0); final Class exceptionClass; From 37975a0eaec345ae881f2580ee8a9e8d019d2cbc Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Nov 2019 15:57:02 -0400 Subject: [PATCH 06/10] Fix tests --- .../rest-api-spec/test/ingest/210_pipeline_processor.yml | 2 +- .../test/resources/rest-api-spec/test/ingest/90_simulate.yml | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index e375d195bfbc9..5df08b7cf90d0 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -106,5 +106,5 @@ teardown: id: 1 pipeline: "outer" body: {} -- match: { error.root_cause.0.type: "exception" } +- match: { error.root_cause.0.type: "ingest_processor_exception" } - match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml index 8b0f72c72fa21..456a2ba15dd4c 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -339,7 +339,7 @@ teardown: ] } - length: { docs: 2 } - - match: { docs.0.error.type: "exception" } + - match: { docs.0.error.type: "illegal_argument_exception" } - match: { docs.1.doc._source.foo: "BAR" } - length: { docs.1.doc._ingest: 1 } - is_true: docs.1.doc._ingest.timestamp @@ -653,8 +653,7 @@ teardown: } - length: { docs: 1 } - length: { docs.0.processor_results: 1 } -- match: { docs.0.processor_results.0.error.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" } -- match: { docs.0.processor_results.0.error.caused_by.reason: "Cycle detected for pipeline: outer" } +- match: { docs.0.processor_results.0.error.reason: "Cycle detected for pipeline: outer" } --- "Test verbose simulate with Pipeline Processor with Multiple Pipelines": From 8ed7266f11d2e102d42b5f7cf97eb528d0a096ff Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Nov 2019 17:23:48 -0400 Subject: [PATCH 07/10] Add exception to test --- .../java/org/elasticsearch/ExceptionSerializationTests.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 648dd142eeca3..060d228f70a93 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -71,6 +71,7 @@ import org.elasticsearch.indices.IndexTemplateMissingException; import org.elasticsearch.indices.InvalidIndexTemplateException; import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException; +import org.elasticsearch.ingest.IngestProcessorException; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException; @@ -816,6 +817,7 @@ public void testIds() { ids.put(154, RetentionLeaseNotFoundException.class); ids.put(155, ShardNotInPrimaryModeException.class); ids.put(156, RetentionLeaseInvalidRetainingSeqNoException.class); + ids.put(157, IngestProcessorException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { From 594f037f4291a46cc27b024d9537b9a47e7832e9 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Nov 2019 17:58:35 -0400 Subject: [PATCH 08/10] Fix test --- .../org/elasticsearch/ingest/common/IngestRestartIT.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java index e360e0bb3152a..17b9c6e423ffd 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java @@ -109,15 +109,13 @@ public Settings onNodeStopped(String nodeName) { .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .get(); - ElasticsearchException exception = expectThrows(ElasticsearchException.class, + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> client().prepareIndex("index").setId("2") .setSource("x", 0) .setPipeline(pipelineIdWithScript) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .get()); - assertThat(exception.getHeaderKeys(), equalTo(Sets.newHashSet("processor_type"))); - assertThat(exception.getHeader("processor_type"), equalTo(Arrays.asList("unknown"))); - assertThat(exception.getRootCause().getMessage(), + assertThat(exception.getMessage(), equalTo("pipeline with id [" + pipelineIdWithScript + "] could not be loaded, caused by " + "[org.elasticsearch.ElasticsearchParseException: Error updating pipeline with id [" + pipelineIdWithScript + "]; " + "org.elasticsearch.ElasticsearchException: java.lang.IllegalArgumentException: cannot execute [inline] scripts; " + From fda73a15e765a76defaca4bec3f54a097edc4838 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 1 Nov 2019 18:20:03 -0400 Subject: [PATCH 09/10] Fix imports --- .../java/org/elasticsearch/ingest/common/IngestRestartIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java index 17b9c6e423ffd..4eab1b7c86c9b 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java @@ -18,12 +18,10 @@ */ package org.elasticsearch.ingest.common; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.MockScriptEngine; From a54c3c26ba9819b2fad4810f605bcae0e4f77d26 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 14 Nov 2019 08:52:06 -0500 Subject: [PATCH 10/10] Update version --- .../src/main/java/org/elasticsearch/ElasticsearchException.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index abaceff4f4e73..8b96cb87a0257 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -1041,7 +1041,7 @@ private enum ElasticsearchExceptionHandle { org.elasticsearch.ingest.IngestProcessorException.class, org.elasticsearch.ingest.IngestProcessorException::new, 157, - Version.V_7_5_0); + Version.V_7_6_0); final Class exceptionClass; final CheckedFunction constructor;