Skip to content

Commit d6d3e2b

Browse files
authored
Make for each processor resistant to field modification (#62791)
This change provides consistent view of field that foreach processor is iterating over. That prevents it to go into infinite loop and put great pressure on the cluster. Closes #62790
1 parent daade44 commit d6d3e2b

File tree

2 files changed

+23
-1
lines changed

2 files changed

+23
-1
lines changed

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
7575
handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."));
7676
}
7777
} else {
78-
innerExecute(0, values, new ArrayList<>(values.size()), ingestDocument, handler);
78+
innerExecute(0, new ArrayList<>(values), new ArrayList<>(values.size()), ingestDocument, handler);
7979
}
8080
}
8181

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,28 @@ public void testIgnoreMissing() throws Exception {
289289
assertThat(testProcessor.getInvokedCounter(), equalTo(0));
290290
}
291291

292+
public void testAppendingToTheSameField() {
293+
IngestDocument originalIngestDocument = new IngestDocument("_index", "_id", null, null, null, Map.of("field", List.of("a", "b")));
294+
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
295+
TestProcessor testProcessor = new TestProcessor(id->id.appendFieldValue("field", "a"));
296+
ForEachProcessor processor = new ForEachProcessor("_tag", null, "field", testProcessor, true);
297+
processor.execute(ingestDocument, (result, e) -> {});
298+
assertThat(testProcessor.getInvokedCounter(), equalTo(2));
299+
ingestDocument.removeField("_ingest._value");
300+
assertThat(ingestDocument, equalTo(originalIngestDocument));
301+
}
302+
303+
public void testRemovingFromTheSameField() {
304+
IngestDocument originalIngestDocument = new IngestDocument("_index", "_id", null, null, null, Map.of("field", List.of("a", "b")));
305+
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
306+
TestProcessor testProcessor = new TestProcessor(id -> id.removeField("field.0"));
307+
ForEachProcessor processor = new ForEachProcessor("_tag", null, "field", testProcessor, true);
308+
processor.execute(ingestDocument, (result, e) -> {});
309+
assertThat(testProcessor.getInvokedCounter(), equalTo(2));
310+
ingestDocument.removeField("_ingest._value");
311+
assertThat(ingestDocument, equalTo(originalIngestDocument));
312+
}
313+
292314
private class AsyncUpperCaseProcessor implements Processor {
293315

294316
private final String field;

0 commit comments

Comments
 (0)