diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index d9dba2cc10073..d58a48e70c9ad 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.grok.Grok; import org.elasticsearch.grok.ThreadWatchdog; +import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.IngestPlugin; diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index 355ba2d42104a..c7c5df1e06f99 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -110,4 +110,4 @@ teardown: pipeline: "outer" body: {} - match: { error.root_cause.0.type: "exception" } -- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Recursive invocation of pipeline [inner] detected." } +- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: inner" } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml index 8b3ed313314bb..fca80ab8fac1d 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -605,3 +605,150 @@ teardown: - length: { docs.0.processor_results.1: 2 } - match: { docs.0.processor_results.1.tag: "rename-1" } - match: { docs.0.processor_results.1.doc._source.new_status: 200 } + +--- +"Test verbose simulate with Pipeline Processor with Circular Pipelines": +- do: + ingest.put_pipeline: + id: "outer" + body: > + { + "description" : "outer pipeline", + "processors" : [ + { + "pipeline" : { + "pipeline": "inner" + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.put_pipeline: + id: "inner" + body: > + { + "description" : "inner pipeline", + "processors" : [ + { + "pipeline" : { + "pipeline": "outer" + } + } + ] + } +- match: { acknowledged: true } + +- do: + catch: /illegal_state_exception/ + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "processors" : [ + { + "pipeline" : { + "pipeline": "outer" + } + } + ] + } + , + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "field1": "123.42 400 " + } + } + ] + } +- match: { error.root_cause.0.type: "illegal_state_exception" } +- match: { error.root_cause.0.reason: "Cycle detected for pipeline: inner" } + +--- +"Test verbose simulate with Pipeline Processor with Multiple Pipelines": +- do: + ingest.put_pipeline: + id: "pipeline1" + body: > + { + "processors": [ + { + "set": { + "field": "pipeline1", + "value": true + } + }, + { + "pipeline": { + "pipeline": "pipeline2" + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.put_pipeline: + id: "pipeline2" + body: > + { + "processors": [ + { + "set": { + "field": "pipeline2", + "value": true + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "processors": [ + { + "set": { + "field": "pipeline0", + "value": true + } + }, + { + "pipeline": { + "pipeline": "pipeline1" + } + } + ] + }, + "docs": [ + { + "_index": "index", + "_type": "type", + "_id": "id", + "_source": { + "field1": "123.42 400 " + } + } + ] + } +- length: { docs: 1 } +- length: { docs.0.processor_results: 3 } +- match: { docs.0.processor_results.0.doc._source.pipeline0: true } +- is_false: docs.0.processor_results.0.doc._source.pipeline1 +- is_false: docs.0.processor_results.0.doc._source.pipeline2 +- match: { docs.0.processor_results.1.doc._source.pipeline0: true } +- match: { docs.0.processor_results.1.doc._source.pipeline1: true } +- is_false: docs.0.processor_results.1.doc._source.pipeline2 +- match: { docs.0.processor_results.2.doc._source.pipeline0: true } +- match: { docs.0.processor_results.2.doc._source.pipeline1: true } +- match: { docs.0.processor_results.2.doc._source.pipeline2: true } + diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 430da9955bafa..c081707f4dbda 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -24,12 +24,16 @@ import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.CompoundProcessor; +import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; +import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; +import java.util.Set; -import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate; +import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; class SimulateExecutionService { @@ -42,11 +46,15 @@ class SimulateExecutionService { } SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) { + // Prevent cycles in pipeline decoration + final Set pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); if (verbose) { List processorResultList = new ArrayList<>(); - CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList, pipelinesSeen); try { - verbosePipelineProcessor.execute(ingestDocument); + Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), + verbosePipelineProcessor); + ingestDocument.executePipeline(verbosePipeline); return new SimulateDocumentVerboseResult(processorResultList); } catch (Exception e) { return new SimulateDocumentVerboseResult(processorResultList); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 5ea1b3ac56696..1ad2d0f747221 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -649,10 +649,14 @@ private static Object deepCopy(Object value) { * @throws Exception On exception in pipeline execution */ public IngestDocument executePipeline(Pipeline pipeline) throws Exception { - if (this.executedPipelines.add(pipeline) == false) { - throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected."); + try { + if (this.executedPipelines.add(pipeline) == false) { + throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()); + } + return pipeline.execute(this); + } finally { + executedPipelines.remove(pipeline); } - return pipeline.execute(this); } @Override diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java b/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java new file mode 100644 index 0000000000000..4e809aa5b444e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java @@ -0,0 +1,95 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.MeanMetric; + +/** + *

Metrics to measure ingest actions. + *

This counts measure documents and timings for a given scope. + * The scope is determined by the calling code. For example you can use this class to count all documents across all pipeline, + * or you can use this class to count documents for a given pipeline or a specific processor. + * This class does not make assumptions about it's given scope. + */ +class IngestMetric { + + /** + * The time it takes to complete the measured item. + */ + private final MeanMetric ingestTime = new MeanMetric(); + /** + * The current count of things being measure. Should most likely ever be 0 or 1. + * Useful when aggregating multiple metrics to see how many things are in flight. + */ + private final CounterMetric ingestCurrent = new CounterMetric(); + /** + * The ever increasing count of things being measured + */ + private final CounterMetric ingestCount = new CounterMetric(); + /** + * The only increasing count of failures + */ + private final CounterMetric ingestFailed = new CounterMetric(); + + /** + * Call this prior to the ingest action. + */ + void preIngest() { + ingestCurrent.inc(); + } + + /** + * Call this after the performing the ingest action, even if the action failed. + * @param ingestTimeInMillis The time it took to perform the action. + */ + void postIngest(long ingestTimeInMillis) { + ingestCurrent.dec(); + ingestTime.inc(ingestTimeInMillis); + ingestCount.inc(); + } + + /** + * Call this if the ingest action failed. + */ + void ingestFailed() { + ingestFailed.inc(); + } + + /** + *

Add two sets of metrics together. + *

Note - this method does not add the current count values. + * The current count value is ephemeral and requires a increase/decrease operation pairs to keep the value correct. + * + * @param metrics The metric to add. + */ + void add(IngestMetric metrics) { + ingestCount.inc(metrics.ingestCount.count()); + ingestTime.inc(metrics.ingestTime.sum()); + ingestFailed.inc(metrics.ingestFailed.count()); + } + + /** + * Creates a serializable representation for these metrics. + */ + IngestStats.Stats createStats() { + return new IngestStats.Stats(ingestCount.count(), ingestTime.sum(), ingestCurrent.count(), ingestFailed.count()); + } +} diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 3cba98a45016a..5bc24a367da33 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -23,16 +23,16 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; + import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; @@ -50,8 +50,6 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.metrics.CounterMetric; -import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -80,8 +78,7 @@ public class IngestService implements ClusterStateApplier { // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around. private volatile Map pipelines = new HashMap<>(); private final ThreadPool threadPool; - private final StatsHolder totalStats = new StatsHolder(); - private volatile Map statsHolderPerPipeline = Collections.emptyMap(); + private final IngestMetric totalMetrics = new IngestMetric(); public IngestService(ClusterService clusterService, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, @@ -258,10 +255,16 @@ Map pipelines() { @Override public void applyClusterState(final ClusterChangedEvent event) { ClusterState state = event.state(); + Map originalPipelines = pipelines; innerUpdatePipelines(event.previousState(), state); - IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); - if (ingestMetadata != null) { - updatePipelineStats(ingestMetadata); + //pipelines changed, so add the old metrics to the new metrics + if (originalPipelines != pipelines) { + pipelines.forEach((id, pipeline) -> { + Pipeline originalPipeline = originalPipelines.get(id); + if (originalPipeline != null) { + pipeline.getMetrics().add(originalPipeline.getMetrics()); + } + }); } } @@ -326,6 +329,7 @@ void validatePipeline(Map ingestInfos, PutPipelineReq public void executeBulkRequest(Iterable> actionRequests, BiConsumer itemFailureHandler, Consumer completionHandler, Consumer itemDroppedHandler) { + threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { @Override @@ -368,37 +372,11 @@ protected void doRun() { } public IngestStats stats() { - Map statsHolderPerPipeline = this.statsHolderPerPipeline; - Map statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size()); - for (Map.Entry entry : statsHolderPerPipeline.entrySet()) { - statsPerPipeline.put(entry.getKey(), entry.getValue().createStats()); - } + Map statsPerPipeline = + pipelines.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().getMetrics().createStats())); - return new IngestStats(totalStats.createStats(), statsPerPipeline); - } - - void updatePipelineStats(IngestMetadata ingestMetadata) { - boolean changed = false; - Map newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline); - Iterator iterator = newStatsPerPipeline.keySet().iterator(); - while (iterator.hasNext()) { - String pipeline = iterator.next(); - if (ingestMetadata.getPipelines().containsKey(pipeline) == false) { - iterator.remove(); - changed = true; - } - } - for (String pipeline : ingestMetadata.getPipelines().keySet()) { - if (newStatsPerPipeline.containsKey(pipeline) == false) { - newStatsPerPipeline.put(pipeline, new StatsHolder()); - changed = true; - } - } - - if (changed) { - statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline); - } + return new IngestStats(totalMetrics.createStats(), statsPerPipeline); } private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer itemDroppedHandler) throws Exception { @@ -409,10 +387,8 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer long startTimeInNanos = System.nanoTime(); // the pipeline specific stat holder may not exist and that is fine: // (e.g. the pipeline may have been removed while we're ingesting a document - Optional pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId())); try { - totalStats.preIngest(); - pipelineStats.ifPresent(StatsHolder::preIngest); + totalMetrics.preIngest(); String index = indexRequest.index(); String type = indexRequest.type(); String id = indexRequest.id(); @@ -438,13 +414,11 @@ private void innerExecute(IndexRequest indexRequest, Pipeline pipeline, Consumer indexRequest.source(ingestDocument.getSourceAndMetadata()); } } catch (Exception e) { - totalStats.ingestFailed(); - pipelineStats.ifPresent(StatsHolder::ingestFailed); + totalMetrics.ingestFailed(); throw e; } finally { long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); - totalStats.postIngest(ingestTimeInMillis); - pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis)); + totalMetrics.postIngest(ingestTimeInMillis); } } @@ -481,27 +455,4 @@ private void innerUpdatePipelines(ClusterState previousState, ClusterState state ExceptionsHelper.rethrowAndSuppress(exceptions); } - private static class StatsHolder { - - private final MeanMetric ingestMetric = new MeanMetric(); - private final CounterMetric ingestCurrent = new CounterMetric(); - private final CounterMetric ingestFailed = new CounterMetric(); - - void preIngest() { - ingestCurrent.inc(); - } - - void postIngest(long ingestTimeInMillis) { - ingestCurrent.dec(); - ingestMetric.inc(ingestTimeInMillis); - } - - void ingestFailed() { - ingestFailed.inc(); - } - - IngestStats.Stats createStats() { - return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count()); - } - } } diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 1d345ea5f7884..8d5f6d6ff7c54 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -22,10 +22,12 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Nullable; +import java.time.Clock; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; + import org.elasticsearch.script.ScriptService; /** @@ -44,12 +46,21 @@ public final class Pipeline { @Nullable private final Integer version; private final CompoundProcessor compoundProcessor; + private final IngestMetric metrics; + private final Clock clock; public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) { + this(id, description, version, compoundProcessor, Clock.systemUTC()); + } + + //package private for testing + Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor, Clock clock) { this.id = id; this.description = description; this.compoundProcessor = compoundProcessor; this.version = version; + this.metrics = new IngestMetric(); + this.clock = clock; } public static Pipeline create(String id, Map config, @@ -78,7 +89,17 @@ public static Pipeline create(String id, Map config, * Modifies the data of a document to be indexed based on the processor this pipeline holds */ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - return compoundProcessor.execute(ingestDocument); + long startTimeInMillis = clock.millis(); + try { + metrics.preIngest(); + return compoundProcessor.execute(ingestDocument); + } catch (Exception e) { + metrics.ingestFailed(); + throw e; + } finally { + long ingestTimeInMillis = clock.millis() - startTimeInMillis; + metrics.postIngest(ingestTimeInMillis); + } } /** @@ -135,4 +156,11 @@ public List getOnFailureProcessors() { public List flattenAllProcessors() { return compoundProcessor.flattenProcessors(); } + + /** + * The metrics associated with this pipeline. + */ + public IngestMetric getMetrics() { + return metrics; + } } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java similarity index 87% rename from modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java rename to server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index 1958a3e5232b8..918ff6b8aefee 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -17,15 +17,9 @@ * under the License. */ -package org.elasticsearch.ingest.common; +package org.elasticsearch.ingest; import java.util.Map; -import org.elasticsearch.ingest.AbstractProcessor; -import org.elasticsearch.ingest.ConfigurationUtils; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.Processor; public class PipelineProcessor extends AbstractProcessor { @@ -50,6 +44,10 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { return ingestDocument.executePipeline(pipeline); } + Pipeline getPipeline(){ + return ingestService.getPipeline(pipelineName); + } + @Override public String getType() { return TYPE; diff --git a/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java similarity index 65% rename from server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java rename to server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 04c0fe7ca49dc..41a984be5adad 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -17,14 +17,13 @@ * under the License. */ -package org.elasticsearch.action.ingest; +package org.elasticsearch.ingest; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.Processor; +import org.elasticsearch.action.ingest.SimulateProcessorResult; import java.util.ArrayList; import java.util.List; +import java.util.Set; /** * Processor to be used within Simulate API to keep track of processors executed in pipeline. @@ -35,7 +34,7 @@ public final class TrackingResultProcessor implements Processor { private final List processorResultList; private final boolean ignoreFailure; - public TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List processorResultList) { + TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List processorResultList) { this.ignoreFailure = ignoreFailure; this.processorResultList = processorResultList; this.actualProcessor = actualProcessor; @@ -67,19 +66,35 @@ public String getTag() { return actualProcessor.getTag(); } - public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList) { + public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList, + Set pipelinesSeen) { List processors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getProcessors()) { - if (processor instanceof CompoundProcessor) { - processors.add(decorate((CompoundProcessor) processor, processorResultList)); + if (processor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); + if (pipelinesSeen.add(pipelineProcessor) == false) { + throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); + } + processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen)); + pipelinesSeen.remove(pipelineProcessor); + } else if (processor instanceof CompoundProcessor) { + processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); } else { processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } } List onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getOnFailureProcessors()) { - if (processor instanceof CompoundProcessor) { - onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList)); + if (processor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); + if (pipelinesSeen.add(pipelineProcessor) == false) { + throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); + } + onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, + pipelinesSeen)); + pipelinesSeen.remove(pipelineProcessor); + } else if (processor instanceof CompoundProcessor) { + onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); } else { onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } diff --git a/server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java deleted file mode 100644 index 3572a04529b43..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/ingest/TrackingResultProcessorTests.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.ingest; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ingest.TestProcessor; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.test.ESTestCase; -import org.junit.Before; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; -import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; -import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD; -import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; - -public class TrackingResultProcessorTests extends ESTestCase { - - private IngestDocument ingestDocument; - private List resultList; - - @Before - public void init() { - ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); - resultList = new ArrayList<>(); - } - - public void testActualProcessor() throws Exception { - TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); - TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); - - SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); - - assertThat(actualProcessor.getInvokedCounter(), equalTo(1)); - assertThat(resultList.size(), equalTo(1)); - - assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); - assertThat(resultList.get(0).getFailure(), nullValue()); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); - } - - public void testActualCompoundProcessorWithoutOnFailure() throws Exception { - RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); - CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - - try { - trackingProcessor.execute(ingestDocument); - fail("processor should throw exception"); - } catch (ElasticsearchException e) { - assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage())); - } - - SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); - assertThat(testProcessor.getInvokedCounter(), equalTo(1)); - assertThat(resultList.size(), equalTo(1)); - assertThat(resultList.get(0).getIngestDocument(), nullValue()); - assertThat(resultList.get(0).getFailure(), equalTo(exception)); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFirstResult.getProcessorTag())); - } - - public void testActualCompoundProcessorWithOnFailure() throws Exception { - RuntimeException exception = new RuntimeException("fail"); - TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); - TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); - CompoundProcessor actualProcessor = new CompoundProcessor(false, - Arrays.asList(new CompoundProcessor(false, - Arrays.asList(failProcessor, onFailureProcessor), - Arrays.asList(onFailureProcessor, failProcessor))), - Arrays.asList(onFailureProcessor)); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - trackingProcessor.execute(ingestDocument); - - SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); - SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); - - assertThat(failProcessor.getInvokedCounter(), equalTo(2)); - assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2)); - assertThat(resultList.size(), equalTo(4)); - - assertThat(resultList.get(0).getIngestDocument(), nullValue()); - assertThat(resultList.get(0).getFailure(), equalTo(exception)); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); - - Map metadata = resultList.get(1).getIngestDocument().getIngestMetadata(); - assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); - assertThat(resultList.get(1).getFailure(), nullValue()); - assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); - - assertThat(resultList.get(2).getIngestDocument(), nullValue()); - assertThat(resultList.get(2).getFailure(), equalTo(exception)); - assertThat(resultList.get(2).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); - - metadata = resultList.get(3).getIngestDocument().getIngestMetadata(); - assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); - assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); - assertThat(resultList.get(3).getFailure(), nullValue()); - assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); - } - - public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { - RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); - CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), - Collections.emptyList()); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); - - trackingProcessor.execute(ingestDocument); - - SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); - assertThat(testProcessor.getInvokedCounter(), equalTo(1)); - assertThat(resultList.size(), equalTo(1)); - assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); - assertThat(resultList.get(0).getFailure(), sameInstance(exception)); - assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); - } -} diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 8d1302a2ada0e..afae36427ad17 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -19,16 +19,6 @@ package org.elasticsearch.ingest; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ExecutorService; -import java.util.function.BiConsumer; -import java.util.function.Consumer; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchParseException; @@ -59,13 +49,22 @@ import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; @@ -769,16 +768,14 @@ public void testStats() { previousClusterState = clusterState; clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - final Map configurationMap = new HashMap<>(); - configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON)); - configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON)); - ingestService.updatePipelineStats(new IngestMetadata(configurationMap)); + @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); final IndexRequest indexRequest = new IndexRequest("_index"); indexRequest.setPipeline("_id1"); + indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterFirstRequestStats = ingestService.stats(); assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); @@ -793,23 +790,21 @@ public void testStats() { assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L)); - } - // issue: https://github.com/elastic/elasticsearch/issues/18126 - public void testUpdatingStatsWhenRemovingPipelineWorks() { - IngestService ingestService = createWithProcessors(); - Map configurationMap = new HashMap<>(); - configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON)); - configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON)); - ingestService.updatePipelineStats(new IngestMetadata(configurationMap)); - assertThat(ingestService.stats().getStatsPerPipeline(), hasKey("_id1")); - assertThat(ingestService.stats().getStatsPerPipeline(), hasKey("_id2")); - - configurationMap = new HashMap<>(); - configurationMap.put("_id3", new PipelineConfiguration("_id3", new BytesArray("{}"), XContentType.JSON)); - ingestService.updatePipelineStats(new IngestMetadata(configurationMap)); - assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id1"))); - assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id2"))); + //update cluster state and ensure that new stats are added to old stats + putRequest = new PutPipelineRequest("_id1", + new BytesArray("{\"processors\": [{\"mock\" : {}}, {\"mock\" : {}}]}"), XContentType.JSON); + previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + indexRequest.setPipeline("_id1"); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); + final IngestStats afterThirdRequestStats = ingestService.stats(); + assertThat(afterThirdRequestStats.getStatsPerPipeline().size(), equalTo(2)); + assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(2L)); + assertThat(afterThirdRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); + assertThat(afterThirdRequestStats.getTotalStats().getIngestCount(), equalTo(3L)); + } private IngestDocument eqIndexTypeId(final Map source) { diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java similarity index 50% rename from modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java rename to server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index 3103fb0392e96..018ded346d4fc 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -16,21 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.ingest.common; +package org.elasticsearch.ingest; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.test.ESTestCase; + +import java.time.Clock; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.Processor; -import org.elasticsearch.ingest.RandomDocumentPicks; -import org.elasticsearch.test.ESTestCase; +import static org.hamcrest.CoreMatchers.equalTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -110,7 +108,100 @@ public void testThrowsOnRecursivePipelineInvocations() throws Exception { () -> factory.create(Collections.emptyMap(), null, outerConfig).execute(testIngestDocument) ); assertEquals( - "Recursive invocation of pipeline [inner] detected.", e.getRootCause().getMessage() + "Cycle detected for pipeline: inner", e.getRootCause().getMessage() ); } + + public void testAllowsRepeatedPipelineInvocations() throws Exception { + String innerPipelineId = "inner"; + IngestService ingestService = mock(IngestService.class); + IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + Map outerConfig = new HashMap<>(); + outerConfig.put("pipeline", innerPipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + Pipeline inner = new Pipeline( + innerPipelineId, null, null, new CompoundProcessor() + ); + when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); + Processor outerProc = factory.create(Collections.emptyMap(), null, outerConfig); + outerProc.execute(testIngestDocument); + outerProc.execute(testIngestDocument); + } + + public void testPipelineProcessorWithPipelineChain() throws Exception { + String pipeline1Id = "pipeline1"; + String pipeline2Id = "pipeline2"; + String pipeline3Id = "pipeline3"; + IngestService ingestService = mock(IngestService.class); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + Map pipeline1ProcessorConfig = new HashMap<>(); + pipeline1ProcessorConfig.put("pipeline", pipeline2Id); + PipelineProcessor pipeline1Processor = factory.create(Collections.emptyMap(), null, pipeline1ProcessorConfig); + + Map pipeline2ProcessorConfig = new HashMap<>(); + pipeline2ProcessorConfig.put("pipeline", pipeline3Id); + PipelineProcessor pipeline2Processor = factory.create(Collections.emptyMap(), null, pipeline2ProcessorConfig); + + Clock clock = mock(Clock.class); + when(clock.millis()).thenReturn(0L).thenReturn(0L); + Pipeline pipeline1 = new Pipeline( + pipeline1Id, null, null, new CompoundProcessor(pipeline1Processor), clock + ); + + String key1 = randomAlphaOfLength(10); + clock = mock(Clock.class); + when(clock.millis()).thenReturn(0L).thenReturn(3L); + Pipeline pipeline2 = new Pipeline( + pipeline2Id, null, null, new CompoundProcessor(true, + Arrays.asList( + new TestProcessor(ingestDocument -> { + ingestDocument.setFieldValue(key1, randomInt()); + }), + pipeline2Processor), + Collections.emptyList()), + clock + ); + clock = mock(Clock.class); + when(clock.millis()).thenReturn(0L).thenReturn(2L); + Pipeline pipeline3 = new Pipeline( + pipeline3Id, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { + throw new RuntimeException("error"); + })), clock + ); + when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1); + when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2); + when(ingestService.getPipeline(pipeline3Id)).thenReturn(pipeline3); + + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + //start the chain + ingestDocument.executePipeline(pipeline1); + assertNotNull(ingestDocument.getSourceAndMetadata().get(key1)); + + //check the stats + IngestStats.Stats pipeline1Stats = pipeline1.getMetrics().createStats(); + IngestStats.Stats pipeline2Stats = pipeline2.getMetrics().createStats(); + IngestStats.Stats pipeline3Stats = pipeline3.getMetrics().createStats(); + + //current + assertThat(pipeline1Stats.getIngestCurrent(), equalTo(0L)); + assertThat(pipeline2Stats.getIngestCurrent(), equalTo(0L)); + assertThat(pipeline3Stats.getIngestCurrent(), equalTo(0L)); + + //count + assertThat(pipeline1Stats.getIngestCount(), equalTo(1L)); + assertThat(pipeline2Stats.getIngestCount(), equalTo(1L)); + assertThat(pipeline3Stats.getIngestCount(), equalTo(1L)); + + //time + assertThat(pipeline1Stats.getIngestTimeInMillis(), equalTo(0L)); + assertThat(pipeline2Stats.getIngestTimeInMillis(), equalTo(3L)); + assertThat(pipeline3Stats.getIngestTimeInMillis(), equalTo(2L)); + + //failure + assertThat(pipeline1Stats.getIngestFailedCount(), equalTo(0L)); + assertThat(pipeline2Stats.getIngestFailedCount(), equalTo(0L)); + assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L)); + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java new file mode 100644 index 0000000000000..7a7f9b773727f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -0,0 +1,315 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ingest.SimulateProcessorResult; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; +import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; +import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD; +import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TrackingResultProcessorTests extends ESTestCase { + + private IngestDocument ingestDocument; + private List resultList; + private Set pipelinesSeen; + + @Before + public void init() { + ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); + resultList = new ArrayList<>(); + pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); + } + + public void testActualProcessor() throws Exception { + TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {}); + TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, resultList); + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + assertThat(actualProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(1)); + + assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(0).getFailure(), nullValue()); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithoutOnFailure() throws Exception { + RuntimeException exception = new RuntimeException("processor failed"); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + try { + trackingProcessor.execute(ingestDocument); + fail("processor should throw exception"); + } catch (ElasticsearchException e) { + assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage())); + } + + SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); + assertThat(testProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(1)); + assertThat(resultList.get(0).getIngestDocument(), nullValue()); + assertThat(resultList.get(0).getFailure(), equalTo(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFirstResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithOnFailure() throws Exception { + RuntimeException exception = new RuntimeException("fail"); + TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); + TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); + CompoundProcessor actualProcessor = new CompoundProcessor(false, + Arrays.asList(new CompoundProcessor(false, + Arrays.asList(failProcessor, onFailureProcessor), + Arrays.asList(onFailureProcessor, failProcessor))), + Arrays.asList(onFailureProcessor)); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); + SimulateProcessorResult expectedSuccessResult = new SimulateProcessorResult(onFailureProcessor.getTag(), ingestDocument); + + assertThat(failProcessor.getInvokedCounter(), equalTo(2)); + assertThat(onFailureProcessor.getInvokedCounter(), equalTo(2)); + assertThat(resultList.size(), equalTo(4)); + + assertThat(resultList.get(0).getIngestDocument(), nullValue()); + assertThat(resultList.get(0).getFailure(), equalTo(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); + + Map metadata = resultList.get(1).getIngestDocument().getIngestMetadata(); + assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); + + assertThat(resultList.get(2).getIngestDocument(), nullValue()); + assertThat(resultList.get(2).getFailure(), equalTo(exception)); + assertThat(resultList.get(2).getProcessorTag(), equalTo(expectedFailResult.getProcessorTag())); + + metadata = resultList.get(3).getIngestDocument().getIngestMetadata(); + assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test")); + assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail")); + assertThat(resultList.get(3).getFailure(), nullValue()); + assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag())); + } + + public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { + RuntimeException exception = new RuntimeException("processor failed"); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), + Collections.emptyList()); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument); + assertThat(testProcessor.getInvokedCounter(), equalTo(1)); + assertThat(resultList.size(), equalTo(1)); + assertThat(resultList.get(0).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(0).getFailure(), sameInstance(exception)); + assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); + } + + public void testActualPipelineProcessor() throws Exception { + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }), + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); }), + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService).getPipeline(pipelineId); + assertThat(resultList.size(), equalTo(3)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(1).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(2).getFailure(), nullValue()); + assertThat(resultList.get(2).getProcessorTag(), nullValue()); + } + + public void testActualPipelineProcessorWithHandledFailure() throws Exception { + RuntimeException exception = new RuntimeException("processor failed"); + + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); }), + new CompoundProcessor( + false, + Collections.singletonList(new TestProcessor(ingestDocument -> { throw exception; })), + Collections.singletonList(new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); })) + ), + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService).getPipeline(pipelineId); + assertThat(resultList.size(), equalTo(4)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + //failed processor + assertNull(resultList.get(1).getIngestDocument()); + assertThat(resultList.get(1).getFailure().getMessage(), equalTo(exception.getMessage())); + + assertTrue(resultList.get(2).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(2).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(2).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(3).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(3).getFailure(), nullValue()); + assertThat(resultList.get(3).getProcessorTag(), nullValue()); + } + + public void testActualPipelineProcessorWithCycle() throws Exception { + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor(pipelineProcessor) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + IllegalStateException exception = expectThrows(IllegalStateException.class, + () -> decorate(actualProcessor, resultList, pipelinesSeen)); + assertThat(exception.getMessage(), equalTo("Cycle detected for pipeline: pipeline1")); + } + + + public void testActualPipelineProcessorRepeatedInvocation() throws Exception { + String pipelineId = "pipeline1"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put("pipeline", pipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); + Pipeline pipeline = new Pipeline( + pipelineId, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); })) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + + trackingProcessor.execute(ingestDocument); + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService, times(2)).getPipeline(pipelineId); + assertThat(resultList.size(), equalTo(2)); + + assertThat(resultList.get(0).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument()))); + assertThat(resultList.get(0).getFailure(), nullValue()); + assertThat(resultList.get(0).getProcessorTag(), nullValue()); + + assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), nullValue()); + + //each invocation updates key1 with a random int + assertNotEquals(resultList.get(0).getIngestDocument().getSourceAndMetadata().get(key1), + resultList.get(1).getIngestDocument().getSourceAndMetadata().get(key1)); + } + +}