diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 11c1d3e2ea764..d79a64626b611 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -30,9 +30,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import java.util.function.Consumer; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; @@ -50,19 +49,16 @@ public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor { public static final String TYPE = "foreach"; - static final int MAX_RECURSE_PER_THREAD = 10; private final String field; private final Processor processor; private final boolean ignoreMissing; - private final Consumer genericExecutor; - ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing, Consumer genericExecutor) { + ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) { super(tag); this.field = field; this.processor = processor; this.ignoreMissing = ignoreMissing; - this.genericExecutor = genericExecutor; } boolean isIgnoreMissing() { @@ -79,41 +75,35 @@ public void execute(IngestDocument ingestDocument, BiConsumer newValues = new CopyOnWriteArrayList<>(); - innerExecute(0, values, newValues, ingestDocument, handler); + innerExecute(0, values, new ArrayList<>(values.size()), ingestDocument, handler); } } void innerExecute(int index, List values, List newValues, IngestDocument document, BiConsumer handler) { + for (; index < values.size(); index++) { + AtomicBoolean shouldContinueHere = new AtomicBoolean(); + Object value = values.get(index); + Object previousValue = document.getIngestMetadata().put("_value", value); + int nextIndex = index + 1; + processor.execute(document, (result, e) -> { + newValues.add(document.getIngestMetadata().put("_value", previousValue)); + if (e != null || result == null) { + handler.accept(result, e); + } else if (shouldContinueHere.getAndSet(true)) { + innerExecute(nextIndex, values, newValues, document, handler); + } + }); + + if (shouldContinueHere.getAndSet(true) == false) { + return; + } + } + if (index == values.size()) { document.setFieldValue(field, new ArrayList<>(newValues)); handler.accept(document, null); - return; } - - Object value = values.get(index); - Object previousValue = document.getIngestMetadata().put("_value", value); - final Thread thread = Thread.currentThread(); - processor.execute(document, (result, e) -> { - if (e != null) { - newValues.add(document.getIngestMetadata().put("_value", previousValue)); - handler.accept(null, e); - } else if (result == null) { - handler.accept(null, null); - } else { - newValues.add(document.getIngestMetadata().put("_value", previousValue)); - if (thread == Thread.currentThread() && (index + 1) % MAX_RECURSE_PER_THREAD == 0) { - // we are on the same thread and we need to fork to another thread to avoid recursive stack overflow on a single thread - // only fork after 10 recursive calls, then fork every 10 to keep the number of threads down - genericExecutor.accept(() -> innerExecute(index + 1, values, newValues, document, handler)); - } else { - // we are on a different thread (we went asynchronous), it's safe to recurse - // or we have recursed less then 10 times with the same thread, it's safe to recurse - innerExecute(index + 1, values, newValues, document, handler); - } - } - }); } @Override @@ -137,11 +127,9 @@ public Processor getInnerProcessor() { public static final class Factory implements Processor.Factory { private final ScriptService scriptService; - private final Consumer genericExecutor; - Factory(ScriptService scriptService, Consumer genericExecutor) { + Factory(ScriptService scriptService) { this.scriptService = scriptService; - this.genericExecutor = genericExecutor; } @Override @@ -157,7 +145,7 @@ public ForEachProcessor create(Map factories, String Map.Entry> entry = entries.iterator().next(); Processor processor = ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue()); - return new ForEachProcessor(tag, field, processor, ignoreMissing, genericExecutor); + return new ForEachProcessor(tag, field, processor, ignoreMissing); } } } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index 22c76e6b01d73..b37e5d13e4602 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -75,7 +75,7 @@ public Map getProcessors(Processor.Parameters paramet entry(ConvertProcessor.TYPE, new ConvertProcessor.Factory()), entry(GsubProcessor.TYPE, new GsubProcessor.Factory()), entry(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService)), - entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService, parameters.genericExecutor)), + entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService)), entry(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService)), entry(SortProcessor.TYPE, new SortProcessor.Factory()), entry(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters))), diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index 19b3966573f7f..bc522caf9b919 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -43,7 +43,7 @@ public void testCreate() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -59,7 +59,7 @@ public void testSetIgnoreMissing() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -77,7 +77,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception { Map registry = new HashMap<>(); registry.put("_first", (r, t, c) -> processor); registry.put("_second", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); @@ -90,7 +90,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception { } public void testCreateWithNonExistingProcessorType() throws Exception { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); config.put("processor", Collections.singletonMap("_name", Collections.emptyMap())); @@ -103,7 +103,7 @@ public void testCreateWithMissingField() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap()))); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config)); @@ -111,7 +111,7 @@ public void testCreateWithMissingField() throws Exception { } public void testCreateWithMissingProcessor() { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); Map config = new HashMap<>(); config.put("field", "_field"); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config)); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index 713b4a355bdab..76679c1f9fcd7 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -19,9 +19,6 @@ package org.elasticsearch.ingest.common; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; @@ -29,7 +26,6 @@ import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.script.TemplateScript; import org.elasticsearch.test.ESTestCase; -import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; @@ -38,74 +34,15 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; -import java.util.function.Consumer; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; public class ForEachProcessorTests extends ESTestCase { - @SuppressWarnings("unchecked") - private Consumer genericExecutor = (Consumer) mock(Consumer.class); - private final ExecutorService direct = EsExecutors.newDirectExecutorService(); - - @Before - public void setup() { - //execute runnable on same thread for simplicity. some tests will override this and actually run async - doAnswer(invocationOnMock -> { - direct.execute((Runnable) invocationOnMock.getArguments()[0]); - return null; - }).when(genericExecutor).accept(any(Runnable.class)); - } - - public void testExecute() throws Exception { - ThreadPoolExecutor asyncExecutor = - EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS, - EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY)); - doAnswer(invocationOnMock -> { - asyncExecutor.execute((Runnable) invocationOnMock.getArguments()[0]); - return null; - }).when(genericExecutor).accept(any(Runnable.class)); - - List values = new ArrayList<>(); - values.add("foo"); - values.add("bar"); - values.add("baz"); - IntStream.range(0, ForEachProcessor.MAX_RECURSE_PER_THREAD).forEach(value -> values.add("a")); - IngestDocument ingestDocument = new IngestDocument( - "_index", "_id", null, null, null, Collections.singletonMap("values", values) - ); - - ForEachProcessor processor = new ForEachProcessor( - "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"), - false, genericExecutor - ); - processor.execute(ingestDocument, (result, e) -> {}); - - assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, asyncExecutor.getCompletedTaskCount())); - asyncExecutor.shutdown(); - asyncExecutor.awaitTermination(5, TimeUnit.SECONDS); - - @SuppressWarnings("unchecked") - List result = ingestDocument.getFieldValue("values", List.class); - assertThat(result.get(0), equalTo("FOO")); - assertThat(result.get(1), equalTo("BAR")); - assertThat(result.get(2), equalTo("BAZ")); - IntStream.range(3, ForEachProcessor.MAX_RECURSE_PER_THREAD + 3).forEach(i -> assertThat(result.get(i), equalTo("A"))); - verify(genericExecutor, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).accept(any(Runnable.class)); - } - public void testExecuteWithAsyncProcessor() throws Exception { List values = new ArrayList<>(); values.add("foo"); @@ -116,7 +53,7 @@ public void testExecuteWithAsyncProcessor() throws Exception { ); ForEachProcessor processor = new ForEachProcessor("_tag", "values", new AsyncUpperCaseProcessor("_ingest._value"), - false, genericExecutor); + false); processor.execute(ingestDocument, (result, e) -> { }); @@ -128,8 +65,6 @@ public void testExecuteWithAsyncProcessor() throws Exception { assertThat(result.get(1), equalTo("BAR")); assertThat(result.get(2), equalTo("BAZ")); }); - - verifyZeroInteractions(genericExecutor); } public void testExecuteWithFailure() throws Exception { @@ -142,7 +77,7 @@ public void testExecuteWithFailure() throws Exception { throw new RuntimeException("failure"); } }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false, genericExecutor); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false); Exception[] exceptions = new Exception[1]; processor.execute(ingestDocument, (result, e) -> {exceptions[0] = e;}); assertThat(exceptions[0].getMessage(), equalTo("failure")); @@ -160,7 +95,7 @@ public void testExecuteWithFailure() throws Exception { Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {}); processor = new ForEachProcessor( "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)), - false, genericExecutor + false ); processor.execute(ingestDocument, (result, e) -> {}); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); @@ -179,7 +114,7 @@ public void testMetaDataAvailable() throws Exception { id.setFieldValue("_ingest._value.index", id.getSourceAndMetadata().get("_index")); id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id")); }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, genericExecutor); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); processor.execute(ingestDocument, (result, e) -> {}); assertThat(innerProcessor.getInvokedCounter(), equalTo(2)); @@ -205,7 +140,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { ForEachProcessor processor = new ForEachProcessor( "_tag", "values", new SetProcessor("_tag", new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"), - (model) -> model.get("other")), false, genericExecutor); + (model) -> model.get("other")), false); processor.execute(ingestDocument, (result, e) -> {}); assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value")); @@ -215,17 +150,10 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { assertThat(ingestDocument.getFieldValue("values.4.new_field", String.class), equalTo("value")); } - public void testRandom() throws Exception { - ThreadPoolExecutor asyncExecutor = - EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS, - EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY)); - doAnswer(invocationOnMock -> { - asyncExecutor.execute((Runnable) invocationOnMock.getArguments()[0]); - return null; - }).when(genericExecutor).accept(any(Runnable.class)); + public void testRandom() { Processor innerProcessor = new Processor() { @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + public IngestDocument execute(IngestDocument ingestDocument) { String existingValue = ingestDocument.getFieldValue("_ingest._value", String.class); ingestDocument.setFieldValue("_ingest._value", existingValue + "."); return ingestDocument; @@ -242,28 +170,19 @@ public String getTag() { } }; int numValues = randomIntBetween(1, 10000); - List values = new ArrayList<>(numValues); - for (int i = 0; i < numValues; i++) { - values.add(""); - } + List values = IntStream.range(0, numValues).mapToObj(i->"").collect(Collectors.toList()); + IngestDocument ingestDocument = new IngestDocument( "_index", "_id", null, null, null, Collections.singletonMap("values", values) ); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, genericExecutor); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); processor.execute(ingestDocument, (result, e) -> {}); - assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, asyncExecutor.getCompletedTaskCount())); - asyncExecutor.shutdown(); - asyncExecutor.awaitTermination(5, TimeUnit.SECONDS); - @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.size(), equalTo(numValues)); - for (String r : result) { - assertThat(r, equalTo(".")); - } - verify(genericExecutor, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).accept(any(Runnable.class)); + result.forEach(r -> assertThat(r, equalTo("."))); } public void testModifyFieldsOutsideArray() throws Exception { @@ -281,7 +200,7 @@ public void testModifyFieldsOutsideArray() throws Exception { "_tag", "values", new CompoundProcessor(false, Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")), Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added")))) - ), false, genericExecutor); + ), false); processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); @@ -307,7 +226,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_source._value", String.class))); - ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false, genericExecutor); + ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false); forEachProcessor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); @@ -340,8 +259,8 @@ public void testNestedForEach() throws Exception { doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH)) ); ForEachProcessor processor = new ForEachProcessor( - "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false, genericExecutor), - false, genericExecutor); + "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), + false); processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values1.0.values2", List.class); @@ -359,7 +278,7 @@ public void testIgnoreMissing() throws Exception { ); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); TestProcessor testProcessor = new TestProcessor(doc -> {}); - ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true, genericExecutor); + ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true); processor.execute(ingestDocument, (result, e) -> {}); assertIngestDocument(originalIngestDocument, ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(0));