From 7c89cb38a833224d8034390977da9941c4ee52e6 Mon Sep 17 00:00:00 2001 From: Armin Date: Sat, 21 Jul 2018 13:57:28 +0200 Subject: [PATCH 1/5] INGEST: Implement Drop Processor * Throw `DroppedDocumentException` to indicate document is not to be indexed * Catch and rethrow where necessary to prevent Exception being logged and avoid unnecessary wrapping and other operations on the exception * In simulate API: Don't add dropped documents to returned `docs` array * In index request: return `"noop"` type index response for dropped documents --- .../ingest/common/Processors.java | 6 ++++ .../ingest/common/processors_whitelist.txt | 1 + .../painless/PainlessScript.java | 4 +++ .../action/bulk/TransportBulkAction.java | 31 ++++++++++++++++--- .../ingest/SimulateExecutionService.java | 8 ++++- .../ingest/CompoundProcessor.java | 4 +++ .../ingest/DroppedDocumentException.java | 26 ++++++++++++++++ 7 files changed, 74 insertions(+), 6 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/ingest/DroppedDocumentException.java diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java index 8a0b152989241..7510b10c2e703 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java @@ -19,6 +19,8 @@ package org.elasticsearch.ingest.common; +import org.elasticsearch.ingest.DroppedDocumentException; + import java.util.Map; public final class Processors { @@ -46,4 +48,8 @@ public static void json(Map ctx, String field) { public static String urlDecode(String value) { return URLDecodeProcessor.apply(value); } + + public static void drop() { + throw new DroppedDocumentException(); + } } diff --git a/modules/ingest-common/src/main/resources/org/elasticsearch/ingest/common/processors_whitelist.txt b/modules/ingest-common/src/main/resources/org/elasticsearch/ingest/common/processors_whitelist.txt index 3d93b19f0660e..5bb5be28df8df 100644 --- a/modules/ingest-common/src/main/resources/org/elasticsearch/ingest/common/processors_whitelist.txt +++ b/modules/ingest-common/src/main/resources/org/elasticsearch/ingest/common/processors_whitelist.txt @@ -26,4 +26,5 @@ class org.elasticsearch.ingest.common.Processors { Object json(Object) void json(Map, String) String urlDecode(String) + void drop() } \ No newline at end of file diff --git a/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScript.java b/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScript.java index 6139e66160ee6..24b9f9af13082 100644 --- a/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScript.java +++ b/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScript.java @@ -19,6 +19,7 @@ package org.elasticsearch.painless; +import org.elasticsearch.ingest.DroppedDocumentException; import org.elasticsearch.script.ScriptException; import java.util.ArrayList; @@ -56,6 +57,9 @@ public interface PainlessScript { * @return The generated ScriptException. */ default ScriptException convertToScriptException(Throwable t, Map> extraMetadata) { + if (t instanceof DroppedDocumentException) { + throw (DroppedDocumentException) t; + } // create a script stack: this is just the script portion List scriptStack = new ArrayList<>(); for (StackTraceElement element : t.getStackTrace()) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 939b0b7024904..b826210303ff9 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -37,6 +38,7 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.update.TransportUpdateAction; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -57,6 +59,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexClosedException; +import org.elasticsearch.ingest.DroppedDocumentException; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; @@ -488,11 +491,16 @@ private long relativeTime() { void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener listener) { long ingestStartTimeInNanos = System.nanoTime(); BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); - ingestService.getPipelineExecutionService().executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> { - logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", - indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception); - bulkRequestModifier.markCurrentItemAsFailed(exception); - }, (exception) -> { + ingestService.getPipelineExecutionService().executeBulkRequest(() -> bulkRequestModifier, + (indexRequest, exception) -> { + if (ExceptionsHelper.unwrap(exception, DroppedDocumentException.class) != null) { + bulkRequestModifier.markCurrentItemAsDropped(); + } else { + logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", + indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception); + bulkRequestModifier.markCurrentItemAsFailed(exception); + } + }, (exception) -> { if (exception != null) { logger.error("failed to execute pipeline for a bulk request", exception); listener.onFailure(exception); @@ -571,6 +579,19 @@ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, } } + void markCurrentItemAsDropped() { + IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot); + failedSlots.set(currentSlot); + itemResponses.add( + new BulkItemResponse(currentSlot, indexRequest.opType(), + new UpdateResponse( + new ShardId(indexRequest.index(), IndexMetaData.INDEX_UUID_NA_VALUE, 0), + indexRequest.type(), indexRequest.id(), indexRequest.version(), DocWriteResponse.Result.NOOP + ) + ) + ); + } + void markCurrentItemAsFailed(Exception e) { IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot); // We hit a error during preprocessing a request, so we: diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index db7397ba1f86b..d891b333131f5 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.ingest.DroppedDocumentException; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.CompoundProcessor; @@ -55,6 +56,8 @@ SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestD try { pipeline.execute(ingestDocument); return new SimulateDocumentBaseResult(ingestDocument); + } catch (DroppedDocumentException e) { + return null; } catch (Exception e) { return new SimulateDocumentBaseResult(e); } @@ -67,7 +70,10 @@ public void execute(SimulatePipelineRequest.Parsed request, ActionListener responses = new ArrayList<>(); for (IngestDocument ingestDocument : request.getDocuments()) { - responses.add(executeDocument(request.getPipeline(), ingestDocument, request.isVerbose())); + SimulateDocumentResult response = executeDocument(request.getPipeline(), ingestDocument, request.isVerbose()); + if (response != null) { + responses.add(response); + } } listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses)); } diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 3ab7c078cd7ad..eec59612d0f6f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -21,6 +21,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import java.util.ArrayList; import java.util.Arrays; @@ -99,6 +100,9 @@ public void execute(IngestDocument ingestDocument) throws Exception { try { processor.execute(ingestDocument); } catch (Exception e) { + if (ExceptionsHelper.unwrap(e, DroppedDocumentException.class) != null) { + throw e; + } if (ignoreFailure) { continue; } diff --git a/server/src/main/java/org/elasticsearch/ingest/DroppedDocumentException.java b/server/src/main/java/org/elasticsearch/ingest/DroppedDocumentException.java new file mode 100644 index 0000000000000..657d3b51938d5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/ingest/DroppedDocumentException.java @@ -0,0 +1,26 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +/** + * Thrown by Ingest's {@code Processors.drop()} to indicate that a document should not be indexed. + */ +public final class DroppedDocumentException extends RuntimeException { +} From 8a9f37b0d349f71c5c5a0e9e9e343761bb201a93 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 31 Aug 2018 13:09:40 +0200 Subject: [PATCH 2/5] Adjust Processor API --- .../ingest/common/AbstractStringProcessor.java | 5 +++-- .../org/elasticsearch/ingest/common/AppendProcessor.java | 3 ++- .../org/elasticsearch/ingest/common/ConvertProcessor.java | 5 +++-- .../ingest/common/DateIndexNameProcessor.java | 3 ++- .../org/elasticsearch/ingest/common/DateProcessor.java | 3 ++- .../org/elasticsearch/ingest/common/DissectProcessor.java | 5 +++-- .../elasticsearch/ingest/common/DotExpanderProcessor.java | 3 ++- .../org/elasticsearch/ingest/common/FailProcessor.java | 2 +- .../org/elasticsearch/ingest/common/ForEachProcessor.java | 5 +++-- .../org/elasticsearch/ingest/common/GrokProcessor.java | 5 +++-- .../org/elasticsearch/ingest/common/JoinProcessor.java | 3 ++- .../org/elasticsearch/ingest/common/JsonProcessor.java | 3 ++- .../elasticsearch/ingest/common/KeyValueProcessor.java | 3 ++- .../elasticsearch/ingest/common/PipelineProcessor.java | 3 ++- .../org/elasticsearch/ingest/common/RemoveProcessor.java | 3 ++- .../org/elasticsearch/ingest/common/RenameProcessor.java | 5 +++-- .../org/elasticsearch/ingest/common/ScriptProcessor.java | 3 ++- .../org/elasticsearch/ingest/common/SetProcessor.java | 3 ++- .../org/elasticsearch/ingest/common/SortProcessor.java | 3 ++- .../org/elasticsearch/ingest/common/SplitProcessor.java | 5 +++-- .../ingest/common/ForEachProcessorTests.java | 3 ++- .../ingest/common/PipelineProcessorTests.java | 3 ++- .../ingest/attachment/AttachmentProcessor.java | 5 +++-- .../org/elasticsearch/ingest/geoip/GeoIpProcessor.java | 5 +++-- .../ingest/useragent/UserAgentProcessor.java | 5 +++-- .../action/ingest/TrackingResultProcessor.java | 3 ++- .../java/org/elasticsearch/ingest/CompoundProcessor.java | 7 ++++--- .../org/elasticsearch/ingest/ConditionalProcessor.java | 5 +++-- .../main/java/org/elasticsearch/ingest/IngestService.java | 2 +- .../src/main/java/org/elasticsearch/ingest/Processor.java | 2 +- .../elasticsearch/ingest/ConditionalProcessorTests.java | 3 ++- .../java/org/elasticsearch/ingest/IngestServiceTests.java | 8 +++++--- .../main/java/org/elasticsearch/ingest/TestProcessor.java | 3 ++- .../xpack/monitoring/test/MockIngestPlugin.java | 3 ++- .../xpack/security/ingest/SetSecurityUserProcessor.java | 3 ++- 35 files changed, 83 insertions(+), 50 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java index 23c98ca1e0c0e..792e5e4ebed2d 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AbstractStringProcessor.java @@ -57,16 +57,17 @@ String getTargetField() { } @Override - public final void execute(IngestDocument document) { + public final IngestDocument execute(IngestDocument document) { String val = document.getFieldValue(field, String.class, ignoreMissing); if (val == null && ignoreMissing) { - return; + return document; } else if (val == null) { throw new IllegalArgumentException("field [" + field + "] is null, cannot process it."); } document.setFieldValue(targetField, process(val)); + return document; } protected abstract T process(String value); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java index 0543ae8591f97..058d1bf22d81c 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/AppendProcessor.java @@ -56,8 +56,9 @@ public ValueSource getValue() { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { ingestDocument.appendFieldValue(field, value); + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java index 2e881b82b59de..aca48efe6c11c 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ConvertProcessor.java @@ -173,12 +173,12 @@ boolean isIgnoreMissing() { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { Object oldValue = document.getFieldValue(field, Object.class, ignoreMissing); Object newValue; if (oldValue == null && ignoreMissing) { - return; + return document; } else if (oldValue == null) { throw new IllegalArgumentException("Field [" + field + "] is null, cannot be converted to type [" + convertType + "]"); } @@ -194,6 +194,7 @@ public void execute(IngestDocument document) { newValue = convertType.convert(oldValue); } document.setFieldValue(targetField, newValue); + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java index 0d6253c88f9fa..4a88f15b6410d 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateIndexNameProcessor.java @@ -63,7 +63,7 @@ public final class DateIndexNameProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { // Date can be specified as a string or long: Object obj = ingestDocument.getFieldValue(field, Object.class); String date = null; @@ -101,6 +101,7 @@ public void execute(IngestDocument ingestDocument) throws Exception { .append('>'); String dynamicIndexName = builder.toString(); ingestDocument.setFieldValue(IngestDocument.MetaData.INDEX.getFieldName(), dynamicIndexName); + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java index 4a9654f8cd0fe..dd6e6006eeb6d 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DateProcessor.java @@ -74,7 +74,7 @@ private Locale newLocale(Map params) { } @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { Object obj = ingestDocument.getFieldValue(field, Object.class); String value = null; if (obj != null) { @@ -98,6 +98,7 @@ public void execute(IngestDocument ingestDocument) { } ingestDocument.setFieldValue(targetField, ISODateTimeFormat.dateTime().print(dateTime)); + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DissectProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DissectProcessor.java index 58f04ccdd431f..fa51d047e73e3 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DissectProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DissectProcessor.java @@ -47,14 +47,15 @@ public final class DissectProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { String input = ingestDocument.getFieldValue(field, String.class, ignoreMissing); if (input == null && ignoreMissing) { - return; + return ingestDocument; } else if (input == null) { throw new IllegalArgumentException("field [" + field + "] is null, cannot process it."); } dissectParser.parse(input).forEach(ingestDocument::setFieldValue); + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DotExpanderProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DotExpanderProcessor.java index bfc32311733da..0698f6ed0a6c9 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DotExpanderProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DotExpanderProcessor.java @@ -41,7 +41,7 @@ public final class DotExpanderProcessor extends AbstractProcessor { @Override @SuppressWarnings("unchecked") - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { String path; Map map; if (this.path != null) { @@ -75,6 +75,7 @@ public void execute(IngestDocument ingestDocument) throws Exception { Object value = map.remove(field); ingestDocument.setFieldValue(path, value); } + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java index b1f946c10a239..0b62fbf72c8e5 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FailProcessor.java @@ -48,7 +48,7 @@ public TemplateScript.Factory getMessage() { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { throw new FailProcessorException(document.renderTemplate(message)); } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 31c0ae8cc3dc8..cf4172aed1360 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -63,11 +63,11 @@ boolean isIgnoreMissing() { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { List values = ingestDocument.getFieldValue(field, List.class, ignoreMissing); if (values == null) { if (ignoreMissing) { - return; + return ingestDocument; } throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."); } @@ -81,6 +81,7 @@ public void execute(IngestDocument ingestDocument) throws Exception { } } ingestDocument.setFieldValue(field, newValues); + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java index 88cba512b86cb..19883053d2a09 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/GrokProcessor.java @@ -54,11 +54,11 @@ public final class GrokProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { String fieldValue = ingestDocument.getFieldValue(matchField, String.class, ignoreMissing); if (fieldValue == null && ignoreMissing) { - return; + return ingestDocument; } else if (fieldValue == null) { throw new IllegalArgumentException("field [" + matchField + "] is null, cannot process it."); } @@ -81,6 +81,7 @@ public void execute(IngestDocument ingestDocument) throws Exception { ingestDocument.setFieldValue(PATTERN_MATCH_KEY, "0"); } } + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JoinProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JoinProcessor.java index 57216a71e022c..f29a688886194 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JoinProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JoinProcessor.java @@ -60,7 +60,7 @@ String getTargetField() { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { List list = document.getFieldValue(field, List.class); if (list == null) { throw new IllegalArgumentException("field [" + field + "] is null, cannot join."); @@ -69,6 +69,7 @@ public void execute(IngestDocument document) { .map(Object::toString) .collect(Collectors.joining(separator)); document.setFieldValue(targetField, joined); + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JsonProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JsonProcessor.java index c0a9d37abdab7..90a648347cdfd 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JsonProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/JsonProcessor.java @@ -107,12 +107,13 @@ public static void apply(Map ctx, String fieldName) { } @Override - public void execute(IngestDocument document) throws Exception { + public IngestDocument execute(IngestDocument document) throws Exception { if (addToRoot) { apply(document.getSourceAndMetadata(), field); } else { document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class))); } + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java index 9cce3cedf3d02..69c7e9ff75187 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/KeyValueProcessor.java @@ -188,8 +188,9 @@ private static void append(IngestDocument document, String targetField, String v } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { execution.accept(document); + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java index 77ffdb919193f..d2608b355832e 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java @@ -42,12 +42,13 @@ private PipelineProcessor(String tag, String pipelineName, IngestService ingestS } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { Pipeline pipeline = ingestService.getPipeline(pipelineName); if (pipeline == null) { throw new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']'); } ingestDocument.executePipeline(pipeline); + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RemoveProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RemoveProcessor.java index 2b9eaa9a13d18..6002abb9e67a3 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RemoveProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RemoveProcessor.java @@ -52,7 +52,7 @@ public List getFields() { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { if (ignoreMissing) { fields.forEach(field -> { String path = document.renderTemplate(field); @@ -63,6 +63,7 @@ public void execute(IngestDocument document) { } else { fields.forEach(document::removeField); } + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RenameProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RenameProcessor.java index a35a164ddd3f1..2abd920048f73 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RenameProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RenameProcessor.java @@ -59,11 +59,11 @@ boolean isIgnoreMissing() { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { String path = document.renderTemplate(field); if (document.hasField(path, true) == false) { if (ignoreMissing) { - return; + return document; } else { throw new IllegalArgumentException("field [" + path + "] doesn't exist"); } @@ -86,6 +86,7 @@ public void execute(IngestDocument document) { document.setFieldValue(path, value); throw e; } + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ScriptProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ScriptProcessor.java index 169b2ab646a7d..12ef53cdcfcfa 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ScriptProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ScriptProcessor.java @@ -69,9 +69,10 @@ public final class ScriptProcessor extends AbstractProcessor { * @param document The Ingest document passed into the script context under the "ctx" object. */ @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { IngestScript.Factory factory = scriptService.compile(script, IngestScript.CONTEXT); factory.newInstance(script.getParams()).execute(document.getSourceAndMetadata()); + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SetProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SetProcessor.java index 7aefa28861830..0af51e5b895e4 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SetProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SetProcessor.java @@ -65,10 +65,11 @@ public ValueSource getValue() { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { if (overrideEnabled || document.hasField(field) == false || document.getFieldValue(field, Object.class) == null) { document.setFieldValue(field, value); } + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SortProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SortProcessor.java index 7ff266efe6b91..a29cc34652479 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SortProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SortProcessor.java @@ -94,7 +94,7 @@ String getTargetField() { @Override @SuppressWarnings("unchecked") - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { List> list = document.getFieldValue(field, List.class); if (list == null) { @@ -110,6 +110,7 @@ public void execute(IngestDocument document) { } document.setFieldValue(targetField, copy); + return document; } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SplitProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SplitProcessor.java index cdd90f937fd09..96a765b5ba7a3 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SplitProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/SplitProcessor.java @@ -68,11 +68,11 @@ String getTargetField() { } @Override - public void execute(IngestDocument document) { + public IngestDocument execute(IngestDocument document) { String oldVal = document.getFieldValue(field, String.class, ignoreMissing); if (oldVal == null && ignoreMissing) { - return; + return document; } else if (oldVal == null) { throw new IllegalArgumentException("field [" + field + "] is null, cannot split."); } @@ -81,6 +81,7 @@ public void execute(IngestDocument document) { List splitList = new ArrayList<>(strings.length); Collections.addAll(splitList, strings); document.setFieldValue(targetField, splitList); + return document; } @Override diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index ffc5bcd4ac930..282994d8eb354 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -154,9 +154,10 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { public void testRandom() throws Exception { Processor innerProcessor = new Processor() { @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { String existingValue = ingestDocument.getFieldValue("_ingest._value", String.class); ingestDocument.setFieldValue("_ingest._value", existingValue + "."); + return ingestDocument; } @Override diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java index 5baf3cf822d72..3103fb0392e96 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java @@ -45,8 +45,9 @@ public void testExecutesPipeline() throws Exception { pipelineId, null, null, new CompoundProcessor(new Processor() { @Override - public void execute(final IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { invoked.complete(ingestDocument); + return ingestDocument; } @Override diff --git a/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/AttachmentProcessor.java b/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/AttachmentProcessor.java index 9fb2debcb5481..c8a24ad3c8719 100644 --- a/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/AttachmentProcessor.java +++ b/plugins/ingest-attachment/src/main/java/org/elasticsearch/ingest/attachment/AttachmentProcessor.java @@ -73,13 +73,13 @@ boolean isIgnoreMissing() { } @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { Map additionalFields = new HashMap<>(); byte[] input = ingestDocument.getFieldValueAsBytes(field, ignoreMissing); if (input == null && ignoreMissing) { - return; + return ingestDocument; } else if (input == null) { throw new IllegalArgumentException("field [" + field + "] is null, cannot parse."); } @@ -164,6 +164,7 @@ public void execute(IngestDocument ingestDocument) { } ingestDocument.setFieldValue(targetField, additionalFields); + return ingestDocument; } @Override diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 366b6ffc1d241..b5dbf5a7f34de 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -81,11 +81,11 @@ boolean isIgnoreMissing() { } @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { String ip = ingestDocument.getFieldValue(field, String.class, ignoreMissing); if (ip == null && ignoreMissing) { - return; + return ingestDocument; } else if (ip == null) { throw new IllegalArgumentException("field [" + field + "] is null, cannot extract geoip information."); } @@ -120,6 +120,7 @@ public void execute(IngestDocument ingestDocument) { if (geoData.isEmpty() == false) { ingestDocument.setFieldValue(targetField, geoData); } + return ingestDocument; } @Override diff --git a/plugins/ingest-user-agent/src/main/java/org/elasticsearch/ingest/useragent/UserAgentProcessor.java b/plugins/ingest-user-agent/src/main/java/org/elasticsearch/ingest/useragent/UserAgentProcessor.java index 93f210c427b51..6e7f588f0bd8a 100644 --- a/plugins/ingest-user-agent/src/main/java/org/elasticsearch/ingest/useragent/UserAgentProcessor.java +++ b/plugins/ingest-user-agent/src/main/java/org/elasticsearch/ingest/useragent/UserAgentProcessor.java @@ -63,11 +63,11 @@ boolean isIgnoreMissing() { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { String userAgent = ingestDocument.getFieldValue(field, String.class, ignoreMissing); if (userAgent == null && ignoreMissing) { - return; + return ingestDocument; } else if (userAgent == null) { throw new IllegalArgumentException("field [" + field + "] is null, cannot parse user-agent."); } @@ -144,6 +144,7 @@ public void execute(IngestDocument ingestDocument) throws Exception { } ingestDocument.setFieldValue(targetField, uaDetails); + return ingestDocument; } /** To maintain compatibility with logstash-filter-useragent */ diff --git a/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java index abf617ffb1aac..04c0fe7ca49dc 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java @@ -42,7 +42,7 @@ public TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { try { actualProcessor.execute(ingestDocument); processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument))); @@ -54,6 +54,7 @@ public void execute(IngestDocument ingestDocument) throws Exception { } throw e; } + return ingestDocument; } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index eec59612d0f6f..6986ef2638e66 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -95,7 +95,7 @@ public String getTag() { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { for (Processor processor : processors) { try { processor.execute(ingestDocument); @@ -108,7 +108,7 @@ public void execute(IngestDocument ingestDocument) throws Exception { } ElasticsearchException compoundProcessorException = - newCompoundProcessorException(e, processor.getType(), processor.getTag()); + newCompoundProcessorException(e, processor.getType(), processor.getTag()); if (onFailureProcessors.isEmpty()) { throw compoundProcessorException; } else { @@ -117,6 +117,7 @@ public void execute(IngestDocument ingestDocument) throws Exception { } } } + return ingestDocument; } void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception { @@ -153,7 +154,7 @@ private void removeFailureMetadata(IngestDocument ingestDocument) { } private ElasticsearchException newCompoundProcessorException(Exception e, String processorType, String processorTag) { - if (e instanceof ElasticsearchException && ((ElasticsearchException)e).getHeader("processor_type") != null) { + if (e instanceof ElasticsearchException && ((ElasticsearchException) e).getHeader("processor_type") != null) { return (ElasticsearchException) e; } diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index d1eb651acae03..b6f6612344a39 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -51,12 +51,13 @@ public class ConditionalProcessor extends AbstractProcessor { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { IngestConditionalScript script = scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) { - processor.execute(ingestDocument); + return processor.execute(ingestDocument); } + return ingestDocument; } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index f0f5d76caaba8..ce8e9d1346dee 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -270,7 +270,7 @@ private static Pipeline substitutePipeline(String id, ElasticsearchParseExceptio String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]"; Processor failureProcessor = new AbstractProcessor(tag) { @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { throw new IllegalStateException(errorMessage); } diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 15a26d3749191..498ec3a77104f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -40,7 +40,7 @@ public interface Processor { /** * Introspect and potentially modify the incoming data. */ - void execute(IngestDocument ingestDocument) throws Exception; + IngestDocument execute(IngestDocument ingestDocument) throws Exception; /** * Gets the type of a processor diff --git a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java index 2cb13af7a2808..12b4078ddf8bb 100644 --- a/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/ConditionalProcessorTests.java @@ -65,8 +65,9 @@ public void testChecksCondition() throws Exception { scriptName, Collections.emptyMap()), scriptService, new Processor() { @Override - public void execute(final IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { ingestDocument.setFieldValue("foo", "bar"); + return ingestDocument; } @Override diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 83a5bef4de279..5c20dda009a31 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -424,7 +424,7 @@ public void testExecuteIndexPipelineExistsButFailedParsing() { IngestService ingestService = createWithProcessors(Collections.singletonMap( "mock", (factories, tag, config) -> new AbstractProcessor("mock") { @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { throw new IllegalStateException("error"); } @@ -827,8 +827,9 @@ private static IngestService createWithProcessors() { String value = (String) config.remove("value"); return new Processor() { @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { ingestDocument.setFieldValue(field, value); + return ingestDocument; } @Override @@ -846,8 +847,9 @@ public String getTag() { String field = (String) config.remove("field"); return new Processor() { @Override - public void execute(IngestDocument ingestDocument) { + public IngestDocument execute(IngestDocument ingestDocument) { ingestDocument.removeField(field); + return ingestDocument; } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/ingest/TestProcessor.java b/test/framework/src/main/java/org/elasticsearch/ingest/TestProcessor.java index 4e4c5a24c0cb6..a1feb3e1f73be 100644 --- a/test/framework/src/main/java/org/elasticsearch/ingest/TestProcessor.java +++ b/test/framework/src/main/java/org/elasticsearch/ingest/TestProcessor.java @@ -45,9 +45,10 @@ public TestProcessor(String tag, String type, Consumer ingestDoc } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { invokedCounter.incrementAndGet(); ingestDocumentConsumer.accept(ingestDocument); + return ingestDocument; } @Override diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MockIngestPlugin.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MockIngestPlugin.java index 818ab374d3495..b4521ad58b222 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MockIngestPlugin.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/test/MockIngestPlugin.java @@ -74,8 +74,9 @@ static class MockProcessor implements Processor { } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { // mock processor does nothing + return ingestDocument; } @Override diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/SetSecurityUserProcessor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/SetSecurityUserProcessor.java index 15ac88b4d9462..0c30af1879cc9 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/SetSecurityUserProcessor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ingest/SetSecurityUserProcessor.java @@ -43,7 +43,7 @@ public SetSecurityUserProcessor(String tag, ThreadContext threadContext, String } @Override - public void execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { Authentication authentication = Authentication.getAuthentication(threadContext); if (authentication == null) { throw new IllegalStateException("No user authenticated, only use this processor via authenticated user"); @@ -86,6 +86,7 @@ public void execute(IngestDocument ingestDocument) throws Exception { } } ingestDocument.setFieldValue(field, userObject); + return ingestDocument; } @Override From 70a455c950f999630cdcfc4ac6e06b5fc93d8e19 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 31 Aug 2018 13:42:42 +0200 Subject: [PATCH 3/5] Implement Drop Processor --- .../ingest/common/DropProcessor.java | 57 +++++++++++++++++++ .../ingest/common/ForEachProcessor.java | 4 +- .../ingest/common/IngestCommonPlugin.java | 1 + .../ingest/common/PipelineProcessor.java | 3 +- .../ingest/common/Processors.java | 5 -- .../painless/PainlessScript.java | 4 -- .../action/bulk/TransportBulkAction.java | 42 +++++++------- .../ingest/SimulateExecutionService.java | 3 - .../ingest/CompoundProcessor.java | 8 +-- .../ingest/DroppedDocumentException.java | 26 --------- .../elasticsearch/ingest/IngestDocument.java | 4 +- .../elasticsearch/ingest/IngestService.java | 35 ++++++------ .../org/elasticsearch/ingest/Pipeline.java | 4 +- .../bulk/TransportBulkActionIngestTests.java | 12 ++-- .../ingest/IngestServiceTests.java | 28 ++++----- 15 files changed, 127 insertions(+), 109 deletions(-) create mode 100644 modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DropProcessor.java delete mode 100644 server/src/main/java/org/elasticsearch/ingest/DroppedDocumentException.java diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DropProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DropProcessor.java new file mode 100644 index 0000000000000..a0eabe38979eb --- /dev/null +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/DropProcessor.java @@ -0,0 +1,57 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.common; + +import java.util.Map; +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; + +/** + * Drop processor only returns {@code null} for the execution result to indicate that any document + * executed by it should not be indexed. + */ +public final class DropProcessor extends AbstractProcessor { + + public static final String TYPE = "drop"; + + private DropProcessor(final String tag) { + super(tag); + } + + @Override + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { + return null; + } + + @Override + public String getType() { + return TYPE; + } + + public static final class Factory implements Processor.Factory { + + @Override + public Processor create(final Map processorFactories, final String tag, + final Map config) { + return new DropProcessor(tag); + } + } +} diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index cf4172aed1360..80090820a616c 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -75,7 +75,9 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { for (Object value : values) { Object previousValue = ingestDocument.getIngestMetadata().put("_value", value); try { - processor.execute(ingestDocument); + if (processor.execute(ingestDocument) == null) { + return null; + } } finally { newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue)); } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index 8b048282814ea..d9dba2cc10073 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -84,6 +84,7 @@ public Map getProcessors(Processor.Parameters paramet processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory()); processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)); processors.put(DissectProcessor.TYPE, new DissectProcessor.Factory()); + processors.put(DropProcessor.TYPE, new DropProcessor.Factory()); return Collections.unmodifiableMap(processors); } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java index d2608b355832e..1958a3e5232b8 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java @@ -47,8 +47,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { if (pipeline == null) { throw new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']'); } - ingestDocument.executePipeline(pipeline); - return ingestDocument; + return ingestDocument.executePipeline(pipeline); } @Override diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java index 7510b10c2e703..00209f5560090 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java @@ -19,8 +19,6 @@ package org.elasticsearch.ingest.common; -import org.elasticsearch.ingest.DroppedDocumentException; - import java.util.Map; public final class Processors { @@ -49,7 +47,4 @@ public static String urlDecode(String value) { return URLDecodeProcessor.apply(value); } - public static void drop() { - throw new DroppedDocumentException(); - } } diff --git a/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScript.java b/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScript.java index 24b9f9af13082..6139e66160ee6 100644 --- a/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScript.java +++ b/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessScript.java @@ -19,7 +19,6 @@ package org.elasticsearch.painless; -import org.elasticsearch.ingest.DroppedDocumentException; import org.elasticsearch.script.ScriptException; import java.util.ArrayList; @@ -57,9 +56,6 @@ public interface PainlessScript { * @return The generated ScriptException. */ default ScriptException convertToScriptException(Throwable t, Map> extraMetadata) { - if (t instanceof DroppedDocumentException) { - throw (DroppedDocumentException) t; - } // create a script stack: this is just the script portion List scriptStack = new ArrayList<>(); for (StackTraceElement element : t.getStackTrace()) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 317cf9d460210..a3d7d50f3e22a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -61,7 +61,6 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexClosedException; -import org.elasticsearch.ingest.DroppedDocumentException; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; @@ -526,31 +525,28 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); ingestService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> { - if (ExceptionsHelper.unwrap(exception, DroppedDocumentException.class) != null) { - bulkRequestModifier.markCurrentItemAsDropped(); - } else { - logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", - indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception); - bulkRequestModifier.markCurrentItemAsFailed(exception); - } + logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", + indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception); + bulkRequestModifier.markCurrentItemAsFailed(exception); }, (exception) -> { - if (exception != null) { - logger.error("failed to execute pipeline for a bulk request", exception); - listener.onFailure(exception); - } else { - long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos); - BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest(); - ActionListener actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener); - if (bulkRequest.requests().isEmpty()) { - // at this stage, the transport bulk action can't deal with a bulk request with no requests, - // so we stop and send an empty response back to the client. - // (this will happen if pre-processing all items in the bulk failed) - actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0)); + if (exception != null) { + logger.error("failed to execute pipeline for a bulk request", exception); + listener.onFailure(exception); } else { - doExecute(task, bulkRequest, actionListener); + long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos); + BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest(); + ActionListener actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener); + if (bulkRequest.requests().isEmpty()) { + // at this stage, the transport bulk action can't deal with a bulk request with no requests, + // so we stop and send an empty response back to the client. + // (this will happen if pre-processing all items in the bulk failed) + actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0)); + } else { + doExecute(task, bulkRequest, actionListener); + } } - } - }); + }, + indexRequest -> bulkRequestModifier.markCurrentItemAsDropped()); } static final class BulkRequestModifier implements Iterator> { diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index d891b333131f5..430da9955bafa 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -21,7 +21,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.ingest.DroppedDocumentException; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.CompoundProcessor; @@ -56,8 +55,6 @@ SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestD try { pipeline.execute(ingestDocument); return new SimulateDocumentBaseResult(ingestDocument); - } catch (DroppedDocumentException e) { - return null; } catch (Exception e) { return new SimulateDocumentBaseResult(e); } diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 6986ef2638e66..f576667f44109 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -21,7 +21,6 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; import java.util.ArrayList; import java.util.Arrays; @@ -98,11 +97,10 @@ public String getTag() { public IngestDocument execute(IngestDocument ingestDocument) throws Exception { for (Processor processor : processors) { try { - processor.execute(ingestDocument); - } catch (Exception e) { - if (ExceptionsHelper.unwrap(e, DroppedDocumentException.class) != null) { - throw e; + if (processor.execute(ingestDocument) == null) { + return null; } + } catch (Exception e) { if (ignoreFailure) { continue; } diff --git a/server/src/main/java/org/elasticsearch/ingest/DroppedDocumentException.java b/server/src/main/java/org/elasticsearch/ingest/DroppedDocumentException.java deleted file mode 100644 index 657d3b51938d5..0000000000000 --- a/server/src/main/java/org/elasticsearch/ingest/DroppedDocumentException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest; - -/** - * Thrown by Ingest's {@code Processors.drop()} to indicate that a document should not be indexed. - */ -public final class DroppedDocumentException extends RuntimeException { -} diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index e218168eeb7b5..5f122358d0c43 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -644,11 +644,11 @@ private static Object deepCopy(Object value) { * @param pipeline Pipeline to execute * @throws Exception On exception in pipeline execution */ - public void executePipeline(Pipeline pipeline) throws Exception { + public IngestDocument executePipeline(Pipeline pipeline) throws Exception { if (this.executedPipelines.add(pipeline) == false) { throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected."); } - pipeline.execute(this); + return pipeline.execute(this); } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index ce8e9d1346dee..5623cf30f3642 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -323,7 +323,8 @@ void validatePipeline(Map ingestInfos, PutPipelineReq } public void executeBulkRequest(Iterable> actionRequests, - BiConsumer itemFailureHandler, Consumer completionHandler) { + BiConsumer itemFailureHandler, Consumer completionHandler, + Consumer itemDroppedHandler) { threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { @Override @@ -351,7 +352,7 @@ protected void doRun() { if (pipeline == null) { throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); } - innerExecute(indexRequest, pipeline); + innerExecute(indexRequest, pipeline, itemDroppedHandler); //this shouldn't be needed here but we do it for consistency with index api // which requires it to prevent double execution indexRequest.setPipeline(NOOP_PIPELINE_NAME); @@ -399,7 +400,7 @@ void updatePipelineStats(IngestMetadata ingestMetadata) { } } - private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception { + private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer itemDroppedHandler) throws Exception { if (pipeline.getProcessors().isEmpty()) { return; } @@ -419,20 +420,22 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws E VersionType versionType = indexRequest.versionType(); Map sourceAsMap = indexRequest.sourceAsMap(); IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap); - pipeline.execute(ingestDocument); - - Map metadataMap = ingestDocument.extractMetadata(); - //it's fine to set all metadata fields all the time, as ingest document holds their starting values - //before ingestion, which might also get modified during ingestion. - indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX)); - indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE)); - indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID)); - indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING)); - indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue()); - if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) { - indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); + if (pipeline.execute(ingestDocument) == null) { + itemDroppedHandler.accept(indexRequest); + } else { + Map metadataMap = ingestDocument.extractMetadata(); + //it's fine to set all metadata fields all the time, as ingest document holds their starting values + //before ingestion, which might also get modified during ingestion. + indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX)); + indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE)); + indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID)); + indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING)); + indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue()); + if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) { + indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); + } + indexRequest.source(ingestDocument.getSourceAndMetadata()); } - indexRequest.source(ingestDocument.getSourceAndMetadata()); } catch (Exception e) { totalStats.ingestFailed(); pipelineStats.ifPresent(StatsHolder::ingestFailed); diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 0a8f9fbc0d894..9f13cb1280aac 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -77,8 +77,8 @@ public static Pipeline create(String id, Map config, /** * Modifies the data of a document to be indexed based on the processor this pipeline holds */ - public void execute(IngestDocument ingestDocument) throws Exception { - compoundProcessor.execute(ingestDocument); + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + return compoundProcessor.execute(ingestDocument); } /** diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 8b68d2b6bb9bd..7fdb12ff1356a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -259,7 +259,7 @@ public void testIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); @@ -293,7 +293,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); @@ -325,7 +325,7 @@ public void testIngestForward() throws Exception { action.execute(null, bulkRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -369,7 +369,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception { singleItemBulkWriteAction.execute(null, indexRequest, listener); // should not have executed ingest locally - verify(ingestService, never()).executeBulkRequest(any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(any(), any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -417,7 +417,7 @@ public void testUseDefaultPipeline() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); @@ -449,7 +449,7 @@ public void testCreateIndexBeforeRunPipeline() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 5c20dda009a31..e3f52f35b79cd 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -126,7 +126,7 @@ public void testExecuteIndexPipelineDoesNotExist() { @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); assertTrue(failure.get()); verify(completionHandler, times(1)).accept(null); @@ -453,7 +453,7 @@ public String getType() { @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); assertTrue(failure.get()); verify(completionHandler, times(1)).accept(null); @@ -481,7 +481,7 @@ public void testExecuteBulkPipelineDoesNotExist() { BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler); + ingestService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler, indexReq -> {}); verify(failureHandler, times(1)).accept( argThat(new CustomTypeSafeMatcher("failure handler was not called with the expected arguments") { @Override @@ -514,7 +514,7 @@ public void testExecuteSuccess() { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(null); } @@ -532,7 +532,7 @@ public void testExecuteEmptyPipeline() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(null); } @@ -560,14 +560,14 @@ public void testExecutePropagateAllMetaDataUpdates() throws Exception { ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName()); } } - return null; + return ingestDocument; }).when(processor).execute(any()); final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(processor).execute(any()); verify(failureHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(null); @@ -597,7 +597,7 @@ public void testExecuteFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); verify(completionHandler, times(1)).accept(null); @@ -624,7 +624,7 @@ public void testExecuteSuccessWithOnFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(failureHandler, never()).accept(eq(indexRequest), any(ElasticsearchException.class)); verify(completionHandler, times(1)).accept(null); } @@ -661,7 +661,7 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); verify(completionHandler, times(1)).accept(null); @@ -707,7 +707,7 @@ public void testBulkRequestExecutionWithFailures() throws Exception { BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler); + ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), argThat(new ArgumentMatcher() { @Override @@ -741,7 +741,7 @@ public void testBulkRequestExecution() { BiConsumer requestItemErrorHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") Consumer completionHandler = mock(Consumer.class); - ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler); + ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler, indexReq -> {}); verify(requestItemErrorHandler, never()).accept(any(), any()); verify(completionHandler, times(1)).accept(null); @@ -779,7 +779,7 @@ public void testStats() { final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1"); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterFirstRequestStats = ingestService.stats(); assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); @@ -787,7 +787,7 @@ public void testStats() { assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L)); indexRequest.setPipeline("_id2"); - ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterSecondRequestStats = ingestService.stats(); assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2)); assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); From c6039b9bc3c45c1a7fbbe77eb3eabfc963052af7 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 31 Aug 2018 13:59:01 +0200 Subject: [PATCH 4/5] Remove drop exclusion --- .../org/elasticsearch/ingest/common/processors_whitelist.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/ingest-common/src/main/resources/org/elasticsearch/ingest/common/processors_whitelist.txt b/modules/ingest-common/src/main/resources/org/elasticsearch/ingest/common/processors_whitelist.txt index 5bb5be28df8df..3d93b19f0660e 100644 --- a/modules/ingest-common/src/main/resources/org/elasticsearch/ingest/common/processors_whitelist.txt +++ b/modules/ingest-common/src/main/resources/org/elasticsearch/ingest/common/processors_whitelist.txt @@ -26,5 +26,4 @@ class org.elasticsearch.ingest.common.Processors { Object json(Object) void json(Map, String) String urlDecode(String) - void drop() } \ No newline at end of file From deb3c8ef9845bd8cc815cb28018068d82da69197 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 5 Sep 2018 07:29:19 +0200 Subject: [PATCH 5/5] CR: Chain calls to IngestDocument --- .../org/elasticsearch/ingest/common/ForEachProcessor.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 80090820a616c..ad93298c646e1 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -72,18 +72,20 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."); } List newValues = new ArrayList<>(values.size()); + IngestDocument document = ingestDocument; for (Object value : values) { Object previousValue = ingestDocument.getIngestMetadata().put("_value", value); try { - if (processor.execute(ingestDocument) == null) { + document = processor.execute(document); + if (document == null) { return null; } } finally { newValues.add(ingestDocument.getIngestMetadata().put("_value", previousValue)); } } - ingestDocument.setFieldValue(field, newValues); - return ingestDocument; + document.setFieldValue(field, newValues); + return document; } @Override