From 6ecd379cd3b1484ecc42f9efbac5857f511d81a6 Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Thu, 16 Jan 2020 12:57:16 +0100 Subject: [PATCH 1/5] Refactor ForEachProcessor to use iteration instead of recursion This change makes ForEachProcessor iterative and still non-blocking. In case of non-async processors we use single for loop and no recursion at all. In case of async processors we continue work on either current thread or thread started by downstream processor, whichever is slower (usually processor thread). Everything is synchronised by single atomic variable. Relates #50514 --- .../ingest/common/ForEachProcessor.java | 61 +++++------- .../ingest/common/IngestCommonPlugin.java | 2 +- .../common/ForEachProcessorFactoryTests.java | 12 +-- .../ingest/common/ForEachProcessorTests.java | 96 +++---------------- 4 files changed, 46 insertions(+), 125 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 11c1d3e2ea764..7be88b80dc2d5 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -27,12 +27,12 @@ import org.elasticsearch.script.ScriptService; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import java.util.function.Consumer; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; @@ -50,19 +50,16 @@ public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor { public static final String TYPE = "foreach"; - static final int MAX_RECURSE_PER_THREAD = 10; private final String field; private final Processor processor; private final boolean ignoreMissing; - private final Consumer genericExecutor; - ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing, Consumer genericExecutor) { + ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) { super(tag); this.field = field; this.processor = processor; this.ignoreMissing = ignoreMissing; - this.genericExecutor = genericExecutor; } boolean isIgnoreMissing() { @@ -79,41 +76,35 @@ public void execute(IngestDocument ingestDocument, BiConsumer newValues = new CopyOnWriteArrayList<>(); - innerExecute(0, values, newValues, ingestDocument, handler); + innerExecute(0, values, Collections.synchronizedList(new ArrayList<>(values.size())), ingestDocument, handler); } } void innerExecute(int index, List values, List newValues, IngestDocument document, BiConsumer handler) { + for (; index < values.size(); index++) { + AtomicBoolean shouldContinueHere = new AtomicBoolean(); + Object value = values.get(index); + Object previousValue = document.getIngestMetadata().put("_value", value); + int nextIndex = index + 1; + processor.execute(document, (result, e) -> { + newValues.add(document.getIngestMetadata().put("_value", previousValue)); + if (e != null || result == null) { + handler.accept(result, e); + } else if (shouldContinueHere.getAndSet(true)) { + innerExecute(nextIndex, values, newValues, document, handler); + } + }); + + if (shouldContinueHere.getAndSet(true) == false) { + return; + } + } + if (index == values.size()) { document.setFieldValue(field, new ArrayList<>(newValues)); handler.accept(document, null); - return; } - - Object value = values.get(index); - Object previousValue = document.getIngestMetadata().put("_value", value); - final Thread thread = Thread.currentThread(); - processor.execute(document, (result, e) -> { - if (e != null) { - newValues.add(document.getIngestMetadata().put("_value", previousValue)); - handler.accept(null, e); - } else if (result == null) { - handler.accept(null, null); - } else { - newValues.add(document.getIngestMetadata().put("_value", previousValue)); - if (thread == Thread.currentThread() && (index + 1) % MAX_RECURSE_PER_THREAD == 0) { - // we are on the same thread and we need to fork to another thread to avoid recursive stack overflow on a single thread - // only fork after 10 recursive calls, then fork every 10 to keep the number of threads down - genericExecutor.accept(() -> innerExecute(index + 1, values, newValues, document, handler)); - } else { - // we are on a different thread (we went asynchronous), it's safe to recurse - // or we have recursed less then 10 times with the same thread, it's safe to recurse - innerExecute(index + 1, values, newValues, document, handler); - } - } - }); } @Override @@ -137,11 +128,9 @@ public Processor getInnerProcessor() { public static final class Factory implements Processor.Factory { private final ScriptService scriptService; - private final Consumer genericExecutor; - Factory(ScriptService scriptService, Consumer genericExecutor) { + Factory(ScriptService scriptService) { this.scriptService = scriptService; - this.genericExecutor = genericExecutor; } @Override @@ -157,7 +146,7 @@ public ForEachProcessor create(Map factories, String Map.Entry> entry = entries.iterator().next(); Processor processor = ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue()); - return new ForEachProcessor(tag, field, processor, ignoreMissing, genericExecutor); + return new ForEachProcessor(tag, field, processor, ignoreMissing); } } } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index 22c76e6b01d73..b37e5d13e4602 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -75,7 +75,7 @@ public Map getProcessors(Processor.Parameters paramet entry(ConvertProcessor.TYPE, new ConvertProcessor.Factory()), entry(GsubProcessor.TYPE, new GsubProcessor.Factory()), entry(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService)), - entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService, parameters.genericExecutor)), + entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService)), entry(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService)), entry(SortProcessor.TYPE, new SortProcessor.Factory()), entry(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters))), diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index 19b3966573f7f..bc522caf9b919 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -43,7 +43,7 @@ public void testCreate() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -59,7 +59,7 @@ public void testSetIgnoreMissing() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -77,7 +77,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception { Map registry = new HashMap<>(); registry.put("_first", (r, t, c) -> processor); registry.put("_second", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -90,7 +90,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception { } public void testCreateWithNonExistingProcessorType() throws Exception { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); config.put("processor", Collections.singletonMap("_name", Collections.emptyMap())); @@ -103,7 +103,7 @@ public void testCreateWithMissingField() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap()))); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config)); @@ -111,7 +111,7 @@ public void testCreateWithMissingField() throws Exception { } public void testCreateWithMissingProcessor() { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config)); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index 713b4a355bdab..e42737af652e8 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -19,9 +19,7 @@ package org.elasticsearch.ingest.common; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; @@ -39,73 +37,21 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.stream.IntStream; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; public class ForEachProcessorTests extends ESTestCase { - @SuppressWarnings("unchecked") - private Consumer genericExecutor = (Consumer) mock(Consumer.class); private final ExecutorService direct = EsExecutors.newDirectExecutorService(); - @Before - public void setup() { - //execute runnable on same thread for simplicity. some tests will override this and actually run async - doAnswer(invocationOnMock -> { - direct.execute((Runnable) invocationOnMock.getArguments()[0]); - return null; - }).when(genericExecutor).accept(any(Runnable.class)); - } - - public void testExecute() throws Exception { - ThreadPoolExecutor asyncExecutor = - EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS, - EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY)); - doAnswer(invocationOnMock -> { - asyncExecutor.execute((Runnable) invocationOnMock.getArguments()[0]); - return null; - }).when(genericExecutor).accept(any(Runnable.class)); - - List values = new ArrayList<>(); - values.add("foo"); - values.add("bar"); - values.add("baz"); - IntStream.range(0, ForEachProcessor.MAX_RECURSE_PER_THREAD).forEach(value -> values.add("a")); - IngestDocument ingestDocument = new IngestDocument( - "_index", "_id", null, null, null, Collections.singletonMap("values", values) - ); - - ForEachProcessor processor = new ForEachProcessor( - "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"), - false, genericExecutor - ); - processor.execute(ingestDocument, (result, e) -> {}); - - assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, asyncExecutor.getCompletedTaskCount())); - asyncExecutor.shutdown(); - asyncExecutor.awaitTermination(5, TimeUnit.SECONDS); - - @SuppressWarnings("unchecked") - List result = ingestDocument.getFieldValue("values", List.class); - assertThat(result.get(0), equalTo("FOO")); - assertThat(result.get(1), equalTo("BAR")); - assertThat(result.get(2), equalTo("BAZ")); - IntStream.range(3, ForEachProcessor.MAX_RECURSE_PER_THREAD + 3).forEach(i -> assertThat(result.get(i), equalTo("A"))); - verify(genericExecutor, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).accept(any(Runnable.class)); - } - public void testExecuteWithAsyncProcessor() throws Exception { List values = new ArrayList<>(); values.add("foo"); @@ -116,7 +62,7 @@ public void testExecuteWithAsyncProcessor() throws Exception { ); ForEachProcessor processor = new ForEachProcessor("_tag", "values", new AsyncUpperCaseProcessor("_ingest._value"), - false, genericExecutor); + false); processor.execute(ingestDocument, (result, e) -> { }); @@ -128,8 +74,6 @@ public void testExecuteWithAsyncProcessor() throws Exception { assertThat(result.get(1), equalTo("BAR")); assertThat(result.get(2), equalTo("BAZ")); }); - - verifyZeroInteractions(genericExecutor); } public void testExecuteWithFailure() throws Exception { @@ -142,7 +86,7 @@ public void testExecuteWithFailure() throws Exception { throw new RuntimeException("failure"); } }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false, genericExecutor); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false); Exception[] exceptions = new Exception[1]; processor.execute(ingestDocument, (result, e) -> {exceptions[0] = e;}); assertThat(exceptions[0].getMessage(), equalTo("failure")); @@ -160,7 +104,7 @@ public void testExecuteWithFailure() throws Exception { Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {}); processor = new ForEachProcessor( "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)), - false, genericExecutor + false ); processor.execute(ingestDocument, (result, e) -> {}); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); @@ -179,7 +123,7 @@ public void testMetaDataAvailable() throws Exception { id.setFieldValue("_ingest._value.index", id.getSourceAndMetadata().get("_index")); id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id")); }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, genericExecutor); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); processor.execute(ingestDocument, (result, e) -> {}); assertThat(innerProcessor.getInvokedCounter(), equalTo(2)); @@ -205,7 +149,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { ForEachProcessor processor = new ForEachProcessor( "_tag", "values", new SetProcessor("_tag", new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"), - (model) -> model.get("other")), false, genericExecutor); + (model) -> model.get("other")), false); processor.execute(ingestDocument, (result, e) -> {}); assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value")); @@ -215,17 +159,10 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { assertThat(ingestDocument.getFieldValue("values.4.new_field", String.class), equalTo("value")); } - public void testRandom() throws Exception { - ThreadPoolExecutor asyncExecutor = - EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS, - EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY)); - doAnswer(invocationOnMock -> { - asyncExecutor.execute((Runnable) invocationOnMock.getArguments()[0]); - return null; - }).when(genericExecutor).accept(any(Runnable.class)); + public void testRandom() { Processor innerProcessor = new Processor() { @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) { String existingValue = ingestDocument.getFieldValue("_ingest._value", String.class); ingestDocument.setFieldValue("_ingest._value", existingValue + "."); return ingestDocument; @@ -241,7 +178,7 @@ public String getTag() { return null; } }; - int numValues = randomIntBetween(1, 10000); + int numValues = randomIntBetween(10000, 10000); List values = new ArrayList<>(numValues); for (int i = 0; i < numValues; i++) { values.add(""); @@ -250,20 +187,15 @@ public String getTag() { "_index", "_id", null, null, null, Collections.singletonMap("values", values) ); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, genericExecutor); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); processor.execute(ingestDocument, (result, e) -> {}); - assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, asyncExecutor.getCompletedTaskCount())); - asyncExecutor.shutdown(); - asyncExecutor.awaitTermination(5, TimeUnit.SECONDS); - @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.size(), equalTo(numValues)); for (String r : result) { assertThat(r, equalTo(".")); } - verify(genericExecutor, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).accept(any(Runnable.class)); } public void testModifyFieldsOutsideArray() throws Exception { @@ -281,7 +213,7 @@ public void testModifyFieldsOutsideArray() throws Exception { "_tag", "values", new CompoundProcessor(false, Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")), Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added")))) - ), false, genericExecutor); + ), false); processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); @@ -307,7 +239,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_source._value", String.class))); - ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false, genericExecutor); + ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false); forEachProcessor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); @@ -340,8 +272,8 @@ public void testNestedForEach() throws Exception { doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH)) ); ForEachProcessor processor = new ForEachProcessor( - "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false, genericExecutor), - false, genericExecutor); + "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), + false); processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values1.0.values2", List.class); @@ -359,7 +291,7 @@ public void testIgnoreMissing() throws Exception { ); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); TestProcessor testProcessor = new TestProcessor(doc -> {}); - ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true, genericExecutor); + ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true); processor.execute(ingestDocument, (result, e) -> {}); assertIngestDocument(originalIngestDocument, ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(0)); From 0d9276d5a4dd868fefa90a071f572920c3cf4535 Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Thu, 16 Jan 2020 16:02:12 +0100 Subject: [PATCH 2/5] Unused imports removed --- .../elasticsearch/ingest/common/ForEachProcessorTests.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index e42737af652e8..4110e191b0210 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -27,7 +27,6 @@ import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.script.TemplateScript; import org.elasticsearch.test.ESTestCase; -import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; @@ -38,15 +37,9 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; -import java.util.function.Consumer; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; public class ForEachProcessorTests extends ESTestCase { From 089fca096833c79cfc442e0e4cd81babb555d204 Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Thu, 16 Jan 2020 21:28:26 +0100 Subject: [PATCH 3/5] Removed unnecessary synchronization, tests refactorings --- .../ingest/common/ForEachProcessor.java | 2 +- .../ingest/common/ForEachProcessorTests.java | 17 ++++++----------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 7be88b80dc2d5..1a9c101352cd6 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -76,7 +76,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer(values.size())), ingestDocument, handler); + innerExecute(0, values, new ArrayList<>(values.size()), ingestDocument, handler); } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index 4110e191b0210..e4e59d7db1063 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -35,16 +35,15 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; public class ForEachProcessorTests extends ESTestCase { - private final ExecutorService direct = EsExecutors.newDirectExecutorService(); - public void testExecuteWithAsyncProcessor() throws Exception { List values = new ArrayList<>(); values.add("foo"); @@ -171,11 +170,9 @@ public String getTag() { return null; } }; - int numValues = randomIntBetween(10000, 10000); - List values = new ArrayList<>(numValues); - for (int i = 0; i < numValues; i++) { - values.add(""); - } + int numValues = randomIntBetween(1, 10000); + List values = IntStream.range(0, numValues).mapToObj(i->"").collect(Collectors.toList()); + IngestDocument ingestDocument = new IngestDocument( "_index", "_id", null, null, null, Collections.singletonMap("values", values) ); @@ -186,9 +183,7 @@ public String getTag() { @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.size(), equalTo(numValues)); - for (String r : result) { - assertThat(r, equalTo(".")); - } + result.forEach(r -> assertThat(r, equalTo("."))); } public void testModifyFieldsOutsideArray() throws Exception { From 03b0ab63c96e8ae783e0463c4554f6d4fdfed17c Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Thu, 16 Jan 2020 21:39:09 +0100 Subject: [PATCH 4/5] Unused import removed --- .../java/org/elasticsearch/ingest/common/ForEachProcessor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 1a9c101352cd6..d79a64626b611 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -27,7 +27,6 @@ import org.elasticsearch.script.ScriptService; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; From bf8fdc201bf3497005cf291da1b40848f96c3a65 Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Thu, 16 Jan 2020 21:53:36 +0100 Subject: [PATCH 5/5] Unused import removed --- .../org/elasticsearch/ingest/common/ForEachProcessorTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index e4e59d7db1063..76679c1f9fcd7 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.ingest.common; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor;