diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 8f1ad6ede6d3d..5f62f31eab5ec 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -8,15 +8,12 @@ package org.elasticsearch.ingest; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.core.Tuple; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -31,8 +28,6 @@ public class CompoundProcessor implements Processor { public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag"; public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline"; - private static final Logger logger = LogManager.getLogger(CompoundProcessor.class); - private final boolean ignoreFailure; private final List processors; private final List onFailureProcessors; @@ -198,43 +193,24 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC final IngestMetric finalMetric = processorsWithMetrics.get(currentProcessor).v2(); final Processor finalProcessor = processorsWithMetrics.get(currentProcessor).v1(); final IngestDocument finalIngestDocument = ingestDocument; - /* - * Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce - * that in all processors and all the code that they call. If the listener is called more than once it causes problems - * such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener - * is only executed once. - */ - final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false); finalMetric.preIngest(); - final AtomicBoolean postIngestHasBeenCalled = new AtomicBoolean(false); try { finalProcessor.execute(ingestDocument, (result, e) -> { - if (listenerHasBeenCalled.getAndSet(true)) { - logger.warn("A listener was unexpectedly called more than once", new RuntimeException(e)); - assert false : "A listener was unexpectedly called more than once"; + long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; + finalMetric.postIngest(ingestTimeInNanos); + if (e != null) { + executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e); } else { - long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; - finalMetric.postIngest(ingestTimeInNanos); - postIngestHasBeenCalled.set(true); - if (e != null) { - executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e); + if (result != null) { + innerExecute(nextProcessor, result, handler); } else { - if (result != null) { - innerExecute(nextProcessor, result, handler); - } else { - handler.accept(null, null); - } + handler.accept(null, null); } } }); } catch (Exception e) { long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; - if (postIngestHasBeenCalled.get()) { - logger.warn("Preventing postIngest from being called more than once", e); - assert false : "Attempt to call postIngest more than once"; - } else { - finalMetric.postIngest(ingestTimeInNanos); - } + finalMetric.postIngest(ingestTimeInNanos); executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index 597307d433d6f..528bb402a59e8 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -8,8 +8,6 @@ package org.elasticsearch.ingest; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.script.DynamicMap; @@ -28,7 +26,6 @@ import java.util.ListIterator; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -48,8 +45,6 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP return value; }); - private static final Logger logger = LogManager.getLogger(ConditionalProcessor.class); - static final String TYPE = "conditional"; private final Script condition; @@ -125,27 +120,15 @@ public void execute(IngestDocument ingestDocument, BiConsumer { - if (listenerHasBeenCalled.getAndSet(true)) { - logger.warn("A listener was unexpectedly called more than once", new RuntimeException(e)); - assert false : "A listener was unexpectedly called more than once"; + long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; + metric.postIngest(ingestTimeInNanos); + if (e != null) { + metric.ingestFailed(); + handler.accept(null, e); } else { - long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; - metric.postIngest(ingestTimeInNanos); - if (e != null) { - metric.ingestFailed(); - handler.accept(null, e); - } else { - handler.accept(result, null); - } + handler.accept(result, null); } }); } else { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 3362aae1b513a..123731f542583 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -72,7 +72,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -905,78 +904,63 @@ private void innerExecute( VersionType versionType = indexRequest.versionType(); Map sourceAsMap = indexRequest.sourceAsMap(); IngestDocument ingestDocument = new IngestDocument(index, id, version, routing, versionType, sourceAsMap); - /* - * Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce - * that in all processors and all of the code that they call. If the listener is called more than once it causes problems - * such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener - * is only executed once. - */ - final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false); ingestDocument.executePipeline(pipeline, (result, e) -> { - if (listenerHasBeenCalled.getAndSet(true)) { - logger.warn("A listener was unexpectedly called more than once", new RuntimeException(e)); - assert false : "A listener was unexpectedly called more than once"; + if (e != null) { + handler.accept(e); + } else if (result == null) { + itemDroppedHandler.accept(slot); + handler.accept(null); } else { - if (e != null) { - handler.accept(e); - } else if (result == null) { - itemDroppedHandler.accept(slot); - handler.accept(null); - } else { - org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata(); - - // it's fine to set all metadata fields all the time, as ingest document holds their starting values - // before ingestion, which might also get modified during ingestion. - indexRequest.index(metadata.getIndex()); - indexRequest.id(metadata.getId()); - indexRequest.routing(metadata.getRouting()); - indexRequest.version(metadata.getVersion()); - if (metadata.getVersionType() != null) { - indexRequest.versionType(VersionType.fromString(metadata.getVersionType())); - } - Number number; - if ((number = metadata.getIfSeqNo()) != null) { - indexRequest.setIfSeqNo(number.longValue()); - } - if ((number = metadata.getIfPrimaryTerm()) != null) { - indexRequest.setIfPrimaryTerm(number.longValue()); - } - try { - boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck(); - indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences); - } catch (IllegalArgumentException ex) { - // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing. - // In that case, we catch and wrap the exception, so we can include which pipeline failed. - handler.accept( - new IllegalArgumentException( - "Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]", - ex - ) - ); - return; - } catch (Exception ex) { - // If anything goes wrong here, we want to know, and cannot proceed with normal execution. For example, - // *rarely*, a ConcurrentModificationException could be thrown if a pipeline leaks a reference to a shared mutable - // collection, and another indexing thread modifies the shared reference while we're trying to ensure it has - // no self references. - handler.accept( - new RuntimeException( - "Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]", - ex - ) - ); - return; - } - Map map; - if ((map = metadata.getDynamicTemplates()) != null) { - Map mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates()); - mergedDynamicTemplates.putAll(map); - indexRequest.setDynamicTemplates(mergedDynamicTemplates); - } - postIngest(ingestDocument, indexRequest); - - handler.accept(null); + org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata(); + + // it's fine to set all metadata fields all the time, as ingest document holds their starting values + // before ingestion, which might also get modified during ingestion. + indexRequest.index(metadata.getIndex()); + indexRequest.id(metadata.getId()); + indexRequest.routing(metadata.getRouting()); + indexRequest.version(metadata.getVersion()); + if (metadata.getVersionType() != null) { + indexRequest.versionType(VersionType.fromString(metadata.getVersionType())); + } + Number number; + if ((number = metadata.getIfSeqNo()) != null) { + indexRequest.setIfSeqNo(number.longValue()); + } + if ((number = metadata.getIfPrimaryTerm()) != null) { + indexRequest.setIfPrimaryTerm(number.longValue()); + } + try { + boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck(); + indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences); + } catch (IllegalArgumentException ex) { + // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing. + // In that case, we catch and wrap the exception, so we can include which pipeline failed. + handler.accept( + new IllegalArgumentException( + "Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]", + ex + ) + ); + return; + } catch (Exception ex) { + // If anything goes wrong here, we want to know, and cannot proceed with normal execution. For example, + // *rarely*, a ConcurrentModificationException could be thrown if a pipeline leaks a reference to a shared mutable + // collection, and another indexing thread modifies the shared reference while we're trying to ensure it has + // no self references. + handler.accept( + new RuntimeException("Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]", ex) + ); + return; } + Map map; + if ((map = metadata.getDynamicTemplates()) != null) { + Map mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates()); + mergedDynamicTemplates.putAll(map); + indexRequest.setDynamicTemplates(mergedDynamicTemplates); + } + postIngest(ingestDocument, indexRequest); + + handler.accept(null); } }); } diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index a18119bdffba7..f4d60ecf5ce34 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -8,8 +8,6 @@ package org.elasticsearch.ingest; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.core.Nullable; import org.elasticsearch.script.ScriptService; @@ -17,7 +15,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.LongSupplier; @@ -32,8 +29,6 @@ public final class Pipeline { public static final String ON_FAILURE_KEY = "on_failure"; public static final String META_KEY = "_meta"; - private static final Logger logger = LogManager.getLogger(Pipeline.class); - private final String id; @Nullable private final String description; @@ -119,20 +114,14 @@ public void execute(IngestDocument ingestDocument, BiConsumer { - if (listenerHasBeenCalled.getAndSet(true)) { - logger.warn("A listener was unexpectedly called more than once", new RuntimeException(e)); - assert false : "A listener was unexpectedly called more than once"; - } else { - long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; - metrics.postIngest(ingestTimeInNanos); - if (e != null) { - metrics.ingestFailed(); - } - handler.accept(result, e); + long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; + metrics.postIngest(ingestTimeInNanos); + if (e != null) { + metrics.ingestFailed(); } + handler.accept(result, e); }); }