Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/91475.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 91475
summary: Catching exceptions to prevent ingest listeners from being called multiple
times
area: Ingest Node
type: bug
issues: []
23 changes: 17 additions & 6 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.elasticsearch.ingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.LazyMap;
import org.elasticsearch.common.util.Maps;
Expand Down Expand Up @@ -49,6 +51,8 @@ public final class IngestDocument {

static final String TIMESTAMP = "timestamp";

private static final Logger logger = LogManager.getLogger(IngestDocument.class);

private final IngestCtxMap ctxMap;
private final Map<String, Object> ingestMetadata;

Expand Down Expand Up @@ -829,13 +833,20 @@ public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Except
if (executedPipelines.add(pipeline.getId())) {
Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
pipeline.execute(this, (result, e) -> {
executedPipelines.remove(pipeline.getId());
if (previousPipeline != null) {
ingestMetadata.put("pipeline", previousPipeline);
} else {
ingestMetadata.remove("pipeline");
try {
executedPipelines.remove(pipeline.getId());
if (previousPipeline != null) {
ingestMetadata.put("pipeline", previousPipeline);
} else {
ingestMetadata.remove("pipeline");
}
handler.accept(result, e);
} catch (Exception e2) {
if (e != null) {
logger.error("Exception encountered while handling another exception", e);
}
handler.accept(result, e2);
}
handler.accept(result, e);
});
} else {
handler.accept(null, new IllegalStateException(PIPELINE_CYCLE_ERROR_MESSAGE + pipeline.getId()));
Expand Down
115 changes: 61 additions & 54 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -912,64 +912,71 @@ private void innerExecute(
*/
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
ingestDocument.executePipeline(pipeline, (result, e) -> {
if (listenerHasBeenCalled.getAndSet(true)) {
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
assert false : "A listener was unexpectedly called more than once";
} else {
long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
totalMetrics.postIngest(ingestTimeInNanos);
if (e != null) {
totalMetrics.ingestFailed();
handler.accept(e);
} else if (result == null) {
itemDroppedHandler.accept(slot);
handler.accept(null);
try {
if (listenerHasBeenCalled.getAndSet(true)) {
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
assert false : "A listener was unexpectedly called more than once";
} else {
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();

// it's fine to set all metadata fields all the time, as ingest document holds their starting values
// before ingestion, which might also get modified during ingestion.
indexRequest.index(metadata.getIndex());
indexRequest.id(metadata.getId());
indexRequest.routing(metadata.getRouting());
indexRequest.version(metadata.getVersion());
if (metadata.getVersionType() != null) {
indexRequest.versionType(VersionType.fromString(metadata.getVersionType()));
}
Number number;
if ((number = metadata.getIfSeqNo()) != null) {
indexRequest.setIfSeqNo(number.longValue());
}
if ((number = metadata.getIfPrimaryTerm()) != null) {
indexRequest.setIfPrimaryTerm(number.longValue());
}
try {
boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck();
indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
} catch (IllegalArgumentException ex) {
// An IllegalArgumentException can be thrown when an ingest
// processor creates a source map that is self-referencing.
// In that case, we catch and wrap the exception so we can
// include which pipeline failed.
long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
totalMetrics.postIngest(ingestTimeInNanos);
if (e != null) {
totalMetrics.ingestFailed();
handler.accept(
new IllegalArgumentException(
"Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
ex
)
);
return;
}
Map<String, String> map;
if ((map = metadata.getDynamicTemplates()) != null) {
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
mergedDynamicTemplates.putAll(map);
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
}
postIngest(ingestDocument, indexRequest);
handler.accept(e);
} else if (result == null) {
itemDroppedHandler.accept(slot);
handler.accept(null);
} else {
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();

// it's fine to set all metadata fields all the time, as ingest document holds their starting values
// before ingestion, which might also get modified during ingestion.
indexRequest.index(metadata.getIndex());
indexRequest.id(metadata.getId());
indexRequest.routing(metadata.getRouting());
indexRequest.version(metadata.getVersion());
if (metadata.getVersionType() != null) {
indexRequest.versionType(VersionType.fromString(metadata.getVersionType()));
}
Number number;
if ((number = metadata.getIfSeqNo()) != null) {
indexRequest.setIfSeqNo(number.longValue());
}
if ((number = metadata.getIfPrimaryTerm()) != null) {
indexRequest.setIfPrimaryTerm(number.longValue());
}
try {
boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck();
indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
} catch (IllegalArgumentException ex) {
// An IllegalArgumentException can be thrown when an ingest
// processor creates a source map that is self-referencing.
// In that case, we catch and wrap the exception so we can
// include which pipeline failed.
totalMetrics.ingestFailed();
handler.accept(
new IllegalArgumentException(
"Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
ex
)
);
return;
}
Map<String, String> map;
if ((map = metadata.getDynamicTemplates()) != null) {
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
mergedDynamicTemplates.putAll(map);
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
}
postIngest(ingestDocument, indexRequest);

handler.accept(null);
handler.accept(null);
}
}
} catch (Exception e2) {
if (e != null) {
logger.error("Exception encountered while handling another exception", e);
}
handler.accept(e2);
}
});
}
Expand Down