Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@

package org.elasticsearch.ingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.core.Tuple;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
Expand All @@ -31,8 +28,6 @@ public class CompoundProcessor implements Processor {
public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline";

private static final Logger logger = LogManager.getLogger(CompoundProcessor.class);

private final boolean ignoreFailure;
private final List<Processor> processors;
private final List<Processor> onFailureProcessors;
Expand Down Expand Up @@ -198,43 +193,24 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC
final IngestMetric finalMetric = processorsWithMetrics.get(currentProcessor).v2();
final Processor finalProcessor = processorsWithMetrics.get(currentProcessor).v1();
final IngestDocument finalIngestDocument = ingestDocument;
/*
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
* that in all processors and all the code that they call. If the listener is called more than once it causes problems
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
* is only executed once.
*/
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
finalMetric.preIngest();
final AtomicBoolean postIngestHasBeenCalled = new AtomicBoolean(false);
try {
finalProcessor.execute(ingestDocument, (result, e) -> {
if (listenerHasBeenCalled.getAndSet(true)) {
logger.warn("A listener was unexpectedly called more than once", new RuntimeException(e));
assert false : "A listener was unexpectedly called more than once";
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
finalMetric.postIngest(ingestTimeInNanos);
if (e != null) {
executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e);
} else {
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
finalMetric.postIngest(ingestTimeInNanos);
postIngestHasBeenCalled.set(true);
if (e != null) {
executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e);
if (result != null) {
innerExecute(nextProcessor, result, handler);
} else {
if (result != null) {
innerExecute(nextProcessor, result, handler);
} else {
handler.accept(null, null);
}
handler.accept(null, null);
}
}
});
} catch (Exception e) {
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
if (postIngestHasBeenCalled.get()) {
logger.warn("Preventing postIngest from being called more than once", e);
assert false : "Attempt to call postIngest more than once";
} else {
finalMetric.postIngest(ingestTimeInNanos);
}
finalMetric.postIngest(ingestTimeInNanos);
executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.elasticsearch.ingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.script.DynamicMap;
Expand All @@ -28,7 +26,6 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
Expand All @@ -48,8 +45,6 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
return value;
});

private static final Logger logger = LogManager.getLogger(ConditionalProcessor.class);

static final String TYPE = "conditional";

private final Script condition;
Expand Down Expand Up @@ -125,27 +120,15 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex

if (matches) {
final long startTimeInNanos = relativeTimeProvider.getAsLong();
/*
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
* is only executed once.
*/
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
metric.preIngest();
processor.execute(ingestDocument, (result, e) -> {
if (listenerHasBeenCalled.getAndSet(true)) {
logger.warn("A listener was unexpectedly called more than once", new RuntimeException(e));
assert false : "A listener was unexpectedly called more than once";
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
metric.postIngest(ingestTimeInNanos);
if (e != null) {
metric.ingestFailed();
handler.accept(null, e);
} else {
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
metric.postIngest(ingestTimeInNanos);
if (e != null) {
metric.ingestFailed();
handler.accept(null, e);
} else {
handler.accept(result, null);
}
handler.accept(result, null);
}
});
} else {
Expand Down
124 changes: 54 additions & 70 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand Down Expand Up @@ -905,78 +904,63 @@ private void innerExecute(
VersionType versionType = indexRequest.versionType();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, id, version, routing, versionType, sourceAsMap);
/*
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
* is only executed once.
*/
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
ingestDocument.executePipeline(pipeline, (result, e) -> {
if (listenerHasBeenCalled.getAndSet(true)) {
logger.warn("A listener was unexpectedly called more than once", new RuntimeException(e));
assert false : "A listener was unexpectedly called more than once";
if (e != null) {
handler.accept(e);
} else if (result == null) {
itemDroppedHandler.accept(slot);
handler.accept(null);
} else {
if (e != null) {
handler.accept(e);
} else if (result == null) {
itemDroppedHandler.accept(slot);
handler.accept(null);
} else {
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();

// it's fine to set all metadata fields all the time, as ingest document holds their starting values
// before ingestion, which might also get modified during ingestion.
indexRequest.index(metadata.getIndex());
indexRequest.id(metadata.getId());
indexRequest.routing(metadata.getRouting());
indexRequest.version(metadata.getVersion());
if (metadata.getVersionType() != null) {
indexRequest.versionType(VersionType.fromString(metadata.getVersionType()));
}
Number number;
if ((number = metadata.getIfSeqNo()) != null) {
indexRequest.setIfSeqNo(number.longValue());
}
if ((number = metadata.getIfPrimaryTerm()) != null) {
indexRequest.setIfPrimaryTerm(number.longValue());
}
try {
boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck();
indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
} catch (IllegalArgumentException ex) {
// An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing.
// In that case, we catch and wrap the exception, so we can include which pipeline failed.
handler.accept(
new IllegalArgumentException(
"Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
ex
)
);
return;
} catch (Exception ex) {
// If anything goes wrong here, we want to know, and cannot proceed with normal execution. For example,
// *rarely*, a ConcurrentModificationException could be thrown if a pipeline leaks a reference to a shared mutable
// collection, and another indexing thread modifies the shared reference while we're trying to ensure it has
// no self references.
handler.accept(
new RuntimeException(
"Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
ex
)
);
return;
}
Map<String, String> map;
if ((map = metadata.getDynamicTemplates()) != null) {
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
mergedDynamicTemplates.putAll(map);
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
}
postIngest(ingestDocument, indexRequest);

handler.accept(null);
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();

// it's fine to set all metadata fields all the time, as ingest document holds their starting values
// before ingestion, which might also get modified during ingestion.
indexRequest.index(metadata.getIndex());
indexRequest.id(metadata.getId());
indexRequest.routing(metadata.getRouting());
indexRequest.version(metadata.getVersion());
if (metadata.getVersionType() != null) {
indexRequest.versionType(VersionType.fromString(metadata.getVersionType()));
}
Number number;
if ((number = metadata.getIfSeqNo()) != null) {
indexRequest.setIfSeqNo(number.longValue());
}
if ((number = metadata.getIfPrimaryTerm()) != null) {
indexRequest.setIfPrimaryTerm(number.longValue());
}
try {
boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck();
indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
} catch (IllegalArgumentException ex) {
// An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing.
// In that case, we catch and wrap the exception, so we can include which pipeline failed.
handler.accept(
new IllegalArgumentException(
"Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
ex
)
);
return;
} catch (Exception ex) {
// If anything goes wrong here, we want to know, and cannot proceed with normal execution. For example,
// *rarely*, a ConcurrentModificationException could be thrown if a pipeline leaks a reference to a shared mutable
// collection, and another indexing thread modifies the shared reference while we're trying to ensure it has
// no self references.
handler.accept(
new RuntimeException("Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]", ex)
);
return;
}
Map<String, String> map;
if ((map = metadata.getDynamicTemplates()) != null) {
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
mergedDynamicTemplates.putAll(map);
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
}
postIngest(ingestDocument, indexRequest);

handler.accept(null);
}
});
}
Expand Down
21 changes: 5 additions & 16 deletions server/src/main/java/org/elasticsearch/ingest/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@

package org.elasticsearch.ingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.script.ScriptService;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

Expand All @@ -32,8 +29,6 @@ public final class Pipeline {
public static final String ON_FAILURE_KEY = "on_failure";
public static final String META_KEY = "_meta";

private static final Logger logger = LogManager.getLogger(Pipeline.class);

private final String id;
@Nullable
private final String description;
Expand Down Expand Up @@ -119,20 +114,14 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
* is only executed once.
*/
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
metrics.preIngest();
compoundProcessor.execute(ingestDocument, (result, e) -> {
if (listenerHasBeenCalled.getAndSet(true)) {
logger.warn("A listener was unexpectedly called more than once", new RuntimeException(e));
assert false : "A listener was unexpectedly called more than once";
} else {
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
metrics.postIngest(ingestTimeInNanos);
if (e != null) {
metrics.ingestFailed();
}
handler.accept(result, e);
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
metrics.postIngest(ingestTimeInNanos);
if (e != null) {
metrics.ingestFailed();
}
handler.accept(result, e);
});
}

Expand Down