From 8a196dbe2a9a7c3cbe3b07bdfa6ff3680c320f6d Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Tue, 21 Jan 2020 21:43:50 +0100 Subject: [PATCH 1/2] Refactor ForEachProcessor to use iteration instead of recursion (#51104) * Refactor ForEachProcessor to use iteration instead of recursion This change makes ForEachProcessor iterative and still non-blocking. In case of non-async processors we use a single for loop and no recursion at all. In case of async processors we continue work on either the current thread or the thread started by the downstream processor, whichever is slower (usually the processor thread). Everything is synchronised by a single atomic variable. Relates #50514 --- .../ingest/common/ForEachProcessor.java | 60 ++++----- .../ingest/common/IngestCommonPlugin.java | 3 +- .../common/ForEachProcessorFactoryTests.java | 12 +- .../ingest/common/ForEachProcessorTests.java | 117 +++--------------- 4 files changed, 50 insertions(+), 142 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 11c1d3e2ea764..d79a64626b611 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -30,9 +30,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import java.util.function.Consumer; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; @@ -50,19 +49,16 @@ public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor { public static final String TYPE = "foreach"; - static final int 
MAX_RECURSE_PER_THREAD = 10; private final String field; private final Processor processor; private final boolean ignoreMissing; - private final Consumer genericExecutor; - ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing, Consumer genericExecutor) { + ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) { super(tag); this.field = field; this.processor = processor; this.ignoreMissing = ignoreMissing; - this.genericExecutor = genericExecutor; } boolean isIgnoreMissing() { @@ -79,41 +75,35 @@ public void execute(IngestDocument ingestDocument, BiConsumer newValues = new CopyOnWriteArrayList<>(); - innerExecute(0, values, newValues, ingestDocument, handler); + innerExecute(0, values, new ArrayList<>(values.size()), ingestDocument, handler); } } void innerExecute(int index, List values, List newValues, IngestDocument document, BiConsumer handler) { + for (; index < values.size(); index++) { + AtomicBoolean shouldContinueHere = new AtomicBoolean(); + Object value = values.get(index); + Object previousValue = document.getIngestMetadata().put("_value", value); + int nextIndex = index + 1; + processor.execute(document, (result, e) -> { + newValues.add(document.getIngestMetadata().put("_value", previousValue)); + if (e != null || result == null) { + handler.accept(result, e); + } else if (shouldContinueHere.getAndSet(true)) { + innerExecute(nextIndex, values, newValues, document, handler); + } + }); + + if (shouldContinueHere.getAndSet(true) == false) { + return; + } + } + if (index == values.size()) { document.setFieldValue(field, new ArrayList<>(newValues)); handler.accept(document, null); - return; } - - Object value = values.get(index); - Object previousValue = document.getIngestMetadata().put("_value", value); - final Thread thread = Thread.currentThread(); - processor.execute(document, (result, e) -> { - if (e != null) { - newValues.add(document.getIngestMetadata().put("_value", previousValue)); - 
handler.accept(null, e); - } else if (result == null) { - handler.accept(null, null); - } else { - newValues.add(document.getIngestMetadata().put("_value", previousValue)); - if (thread == Thread.currentThread() && (index + 1) % MAX_RECURSE_PER_THREAD == 0) { - // we are on the same thread and we need to fork to another thread to avoid recursive stack overflow on a single thread - // only fork after 10 recursive calls, then fork every 10 to keep the number of threads down - genericExecutor.accept(() -> innerExecute(index + 1, values, newValues, document, handler)); - } else { - // we are on a different thread (we went asynchronous), it's safe to recurse - // or we have recursed less then 10 times with the same thread, it's safe to recurse - innerExecute(index + 1, values, newValues, document, handler); - } - } - }); } @Override @@ -137,11 +127,9 @@ public Processor getInnerProcessor() { public static final class Factory implements Processor.Factory { private final ScriptService scriptService; - private final Consumer genericExecutor; - Factory(ScriptService scriptService, Consumer genericExecutor) { + Factory(ScriptService scriptService) { this.scriptService = scriptService; - this.genericExecutor = genericExecutor; } @Override @@ -157,7 +145,7 @@ public ForEachProcessor create(Map factories, String Map.Entry> entry = entries.iterator().next(); Processor processor = ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue()); - return new ForEachProcessor(tag, field, processor, ignoreMissing, genericExecutor); + return new ForEachProcessor(tag, field, processor, ignoreMissing); } } } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index b16282f7c176a..dead3967d45dc 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ 
b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -60,6 +60,7 @@ public IngestCommonPlugin() { @Override public Map getProcessors(Processor.Parameters parameters) { +<<<<<<< HEAD Map processors = new HashMap<>(); processors.put(DateProcessor.TYPE, new DateProcessor.Factory(parameters.scriptService)); processors.put(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)); @@ -74,7 +75,7 @@ public Map getProcessors(Processor.Parameters paramet processors.put(ConvertProcessor.TYPE, new ConvertProcessor.Factory()); processors.put(GsubProcessor.TYPE, new GsubProcessor.Factory()); processors.put(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService)); - processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService, parameters.genericExecutor)); + processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService)); processors.put(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService)); processors.put(SortProcessor.TYPE, new SortProcessor.Factory()); processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters))); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index 19b3966573f7f..bc522caf9b919 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -43,7 +43,7 @@ public void testCreate() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new 
ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -59,7 +59,7 @@ public void testSetIgnoreMissing() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -77,7 +77,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception { Map registry = new HashMap<>(); registry.put("_first", (r, t, c) -> processor); registry.put("_second", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -90,7 +90,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception { } public void testCreateWithNonExistingProcessorType() throws Exception { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); config.put("processor", Collections.singletonMap("_name", Collections.emptyMap())); @@ -103,7 +103,7 @@ public void testCreateWithMissingField() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + 
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap()))); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config)); @@ -111,7 +111,7 @@ public void testCreateWithMissingField() throws Exception { } public void testCreateWithMissingProcessor() { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config)); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index 66b6cb5c71fc2..8ab3061655ac4 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -19,9 +19,6 @@ package org.elasticsearch.ingest.common; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; @@ -29,7 +26,6 @@ import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.script.TemplateScript; import org.elasticsearch.test.ESTestCase; -import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; @@ -38,74 +34,15 @@ import java.util.List; import java.util.Locale; import java.util.Map; 
-import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; -import java.util.function.Consumer; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; public class ForEachProcessorTests extends ESTestCase { - - @SuppressWarnings("unchecked") - private Consumer genericExecutor = (Consumer) mock(Consumer.class); - private final ExecutorService direct = EsExecutors.newDirectExecutorService(); - - @Before - public void setup() { - //execute runnable on same thread for simplicity. 
some tests will override this and actually run async - doAnswer(invocationOnMock -> { - direct.execute((Runnable) invocationOnMock.getArguments()[0]); - return null; - }).when(genericExecutor).accept(any(Runnable.class)); - } - - public void testExecute() throws Exception { - ThreadPoolExecutor asyncExecutor = - EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS, - EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY)); - doAnswer(invocationOnMock -> { - asyncExecutor.execute((Runnable) invocationOnMock.getArguments()[0]); - return null; - }).when(genericExecutor).accept(any(Runnable.class)); - - List values = new ArrayList<>(); - values.add("foo"); - values.add("bar"); - values.add("baz"); - IntStream.range(0, ForEachProcessor.MAX_RECURSE_PER_THREAD).forEach(value -> values.add("a")); - IngestDocument ingestDocument = new IngestDocument( - "_index", "_type", "_id", null, null, null, Collections.singletonMap("values", values) - ); - - ForEachProcessor processor = new ForEachProcessor( - "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"), - false, genericExecutor - ); - processor.execute(ingestDocument, (result, e) -> {}); - - assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, asyncExecutor.getCompletedTaskCount())); - asyncExecutor.shutdown(); - asyncExecutor.awaitTermination(5, TimeUnit.SECONDS); - - @SuppressWarnings("unchecked") - List result = ingestDocument.getFieldValue("values", List.class); - assertThat(result.get(0), equalTo("FOO")); - assertThat(result.get(1), equalTo("BAR")); - assertThat(result.get(2), equalTo("BAZ")); - IntStream.range(3, ForEachProcessor.MAX_RECURSE_PER_THREAD + 3).forEach(i -> assertThat(result.get(i), equalTo("A"))); - verify(genericExecutor, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).accept(any(Runnable.class)); - } - + public void 
testExecuteWithAsyncProcessor() throws Exception { List values = new ArrayList<>(); values.add("foo"); @@ -116,7 +53,7 @@ public void testExecuteWithAsyncProcessor() throws Exception { ); ForEachProcessor processor = new ForEachProcessor("_tag", "values", new AsyncUpperCaseProcessor("_ingest._value"), - false, genericExecutor); + false); processor.execute(ingestDocument, (result, e) -> { }); @@ -128,8 +65,6 @@ public void testExecuteWithAsyncProcessor() throws Exception { assertThat(result.get(1), equalTo("BAR")); assertThat(result.get(2), equalTo("BAZ")); }); - - verifyZeroInteractions(genericExecutor); } public void testExecuteWithFailure() throws Exception { @@ -142,7 +77,7 @@ public void testExecuteWithFailure() throws Exception { throw new RuntimeException("failure"); } }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false, genericExecutor); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false); Exception[] exceptions = new Exception[1]; processor.execute(ingestDocument, (result, e) -> {exceptions[0] = e;}); assertThat(exceptions[0].getMessage(), equalTo("failure")); @@ -160,7 +95,7 @@ public void testExecuteWithFailure() throws Exception { Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {}); processor = new ForEachProcessor( "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)), - false, genericExecutor + false ); processor.execute(ingestDocument, (result, e) -> {}); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); @@ -180,7 +115,7 @@ public void testMetaDataAvailable() throws Exception { id.setFieldValue("_ingest._value.type", id.getSourceAndMetadata().get("_type")); id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id")); }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, genericExecutor); + ForEachProcessor processor = new 
ForEachProcessor("_tag", "values", innerProcessor, false); processor.execute(ingestDocument, (result, e) -> {}); assertThat(innerProcessor.getInvokedCounter(), equalTo(2)); @@ -208,7 +143,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { ForEachProcessor processor = new ForEachProcessor( "_tag", "values", new SetProcessor("_tag", new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"), - (model) -> model.get("other")), false, genericExecutor); + (model) -> model.get("other")), false); processor.execute(ingestDocument, (result, e) -> {}); assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value")); @@ -218,17 +153,10 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { assertThat(ingestDocument.getFieldValue("values.4.new_field", String.class), equalTo("value")); } - public void testRandom() throws Exception { - ThreadPoolExecutor asyncExecutor = - EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS, - EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY)); - doAnswer(invocationOnMock -> { - asyncExecutor.execute((Runnable) invocationOnMock.getArguments()[0]); - return null; - }).when(genericExecutor).accept(any(Runnable.class)); + public void testRandom() { Processor innerProcessor = new Processor() { @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) { String existingValue = ingestDocument.getFieldValue("_ingest._value", String.class); ingestDocument.setFieldValue("_ingest._value", existingValue + "."); return ingestDocument; @@ -245,28 +173,19 @@ public String getTag() { } }; int numValues = randomIntBetween(1, 10000); - List values = new ArrayList<>(numValues); - for (int i = 0; i < numValues; i++) { - values.add(""); - } + List values = IntStream.range(0, 
numValues).mapToObj(i->"").collect(Collectors.toList()); + IngestDocument ingestDocument = new IngestDocument( "_index", "_type", "_id", null, null, null, Collections.singletonMap("values", values) ); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, genericExecutor); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); processor.execute(ingestDocument, (result, e) -> {}); - assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, asyncExecutor.getCompletedTaskCount())); - asyncExecutor.shutdown(); - asyncExecutor.awaitTermination(5, TimeUnit.SECONDS); - @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.size(), equalTo(numValues)); - for (String r : result) { - assertThat(r, equalTo(".")); - } - verify(genericExecutor, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).accept(any(Runnable.class)); + result.forEach(r -> assertThat(r, equalTo("."))); } public void testModifyFieldsOutsideArray() throws Exception { @@ -284,7 +203,7 @@ public void testModifyFieldsOutsideArray() throws Exception { "_tag", "values", new CompoundProcessor(false, Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")), Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added")))) - ), false, genericExecutor); + ), false); processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); @@ -310,7 +229,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_source._value", String.class))); - ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false, genericExecutor); + 
ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false); forEachProcessor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); @@ -343,8 +262,8 @@ public void testNestedForEach() throws Exception { doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH)) ); ForEachProcessor processor = new ForEachProcessor( - "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false, genericExecutor), - false, genericExecutor); + "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), + false); processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values1.0.values2", List.class); @@ -362,7 +281,7 @@ public void testIgnoreMissing() throws Exception { ); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); TestProcessor testProcessor = new TestProcessor(doc -> {}); - ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true, genericExecutor); + ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true); processor.execute(ingestDocument, (result, e) -> {}); assertIngestDocument(originalIngestDocument, ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(0)); From 4260f4879e4139a33d6b11b5c1724e97a8aeb09d Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Wed, 22 Jan 2020 18:30:49 +0100 Subject: [PATCH 2/2] Update IngestCommonPlugin.java --- .../java/org/elasticsearch/ingest/common/IngestCommonPlugin.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index dead3967d45dc..a29756b2e1cb1 100644 --- 
a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -60,7 +60,6 @@ public IngestCommonPlugin() { @Override public Map getProcessors(Processor.Parameters parameters) { -<<<<<<< HEAD Map processors = new HashMap<>(); processors.put(DateProcessor.TYPE, new DateProcessor.Factory(parameters.scriptService)); processors.put(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService));