From 792a8a6daab889536e5583e471f87bc0f54f9364 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 16 Dec 2019 18:02:52 +0100 Subject: [PATCH 1/4] Fix ingest simulate response document order if processor executes async If a processor executes asynchronously and the ingest simulate api simulates with multiple documents then the order of the documents in the response may not match the order of the documents in the request. Alexander Reelsen discovered this issue with the enrich processor with the following reproduction: ``` PUT cities/_doc/munich {"zip":"80331","city":"Munich"} PUT cities/_doc/berlin {"zip":"10965","city":"Berlin"} PUT /_enrich/policy/zip-policy { "match": { "indices": "cities", "match_field": "zip", "enrich_fields": [ "city" ] } } POST /_enrich/policy/zip-policy/_execute GET _cat/indices/.enrich-* POST /_ingest/pipeline/_simulate { "pipeline": { "processors" : [ { "enrich" : { "policy_name": "zip-policy", "field" : "zip", "target_field": "city", "max_matches": "1" } } ] }, "docs": [ { "_id": "first", "_source" : { "zip" : "80331" } } , { "_id": "second", "_source" : { "zip" : "50667" } } ] } ``` --- .../ingest/SimulateExecutionService.java | 8 ++- .../ingest/SimulateExecutionServiceTests.java | 61 +++++++++++++++++++ .../action/EnrichCoordinatorProxyAction.java | 3 +- 3 files changed, 69 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 79de0d0c2a7fd..09bba6eb0da07 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -67,17 +67,21 @@ void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean v public void execute(SimulatePipelineRequest.Parsed request, ActionListener listener) { threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> { final AtomicInteger counter = new AtomicInteger(); - final List responses = new CopyOnWriteArrayList<>(); + final List responses = + new CopyOnWriteArrayList<>(new SimulateDocumentBaseResult[request.getDocuments().size()]); + int iter = 0; for (IngestDocument ingestDocument : request.getDocuments()) { + final int index = iter; executeDocument(request.getPipeline(), ingestDocument, request.isVerbose(), (response, e) -> { if (response != null) { - responses.add(response); + responses.set(index, response); } if (counter.incrementAndGet() == request.getDocuments().size()) { l.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses)); } }); + iter++; } })); } diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java index 2ced9d1e23dd2..432e2a1af7caf 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java @@ -19,6 +19,9 @@ package org.elasticsearch.action.ingest; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.DropProcessor; import org.elasticsearch.ingest.IngestDocument; @@ -29,17 +32,23 @@ import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -330,4 +339,56 @@ public void testDropDocumentVerboseExtraProcessor() throws Exception { assertThat(verboseResult.getProcessorResults().get(1).getFailure(), nullValue()); } + public void test() throws Exception { + int numDocs = randomIntBetween(1, 64); + List documents = new ArrayList<>(numDocs); + for (int id = 0; id < numDocs; id++) { + documents.add(new IngestDocument("_index", Integer.toString(id), null, 0L, VersionType.INTERNAL, new HashMap<>())); + } + Processor processor1 = new AbstractProcessor(null) { + + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + ingestDocument.setFieldValue("processed", true); + handler.accept(ingestDocument, null); + }); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public String getType() { + return "none-of-your-business"; + } + }; + Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1)); + SimulatePipelineRequest.Parsed request = new SimulatePipelineRequest.Parsed(pipeline, documents, false); + + AtomicReference responseHolder = new AtomicReference<>(); + AtomicReference errorHolder = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + executionService.execute(request, ActionListener.wrap(response -> { + responseHolder.set(response); + latch.countDown(); + }, e -> { + errorHolder.set(e); + latch.countDown(); + })); + latch.await(); + assertThat(errorHolder.get(), nullValue()); + SimulatePipelineResponse response = responseHolder.get(); + assertThat(response, notNullValue()); + assertThat(response.getResults().size(), equalTo(numDocs)); + + for (int id = 0; id < numDocs; id++) { + SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) response.getResults().get(id); + assertThat(result.getIngestDocument().getMetadata().get(IngestDocument.MetaData.ID), equalTo(Integer.toString(id))); + assertThat(result.getIngestDocument().getSourceAndMetadata().get("processed"), is(true)); + } + } + } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java index add1a90c21a60..503d86e3dd013 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java @@ -63,7 +63,8 @@ public TransportAction(TransportService transportService, ActionFilters actionFi @Override protected void doExecute(Task task, SearchRequest request, ActionListener listener) { - assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE); + assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE) || + Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT); coordinator.schedule(request, listener); } } From e024dcf271e533adc727bdbd054592ccf8341406 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 16 Dec 2019 18:06:57 +0100 Subject: [PATCH 2/4] rename --- .../action/ingest/SimulateExecutionServiceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java index 432e2a1af7caf..fabcf16537427 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java @@ -339,7 +339,7 @@ public void testDropDocumentVerboseExtraProcessor() throws Exception { assertThat(verboseResult.getProcessorResults().get(1).getFailure(), nullValue()); } - public void test() throws Exception { + public void testAsyncSimulation() throws Exception { int numDocs = randomIntBetween(1, 64); List documents = new ArrayList<>(numDocs); for (int id = 0; id < numDocs; id++) { From 022822540fc42e3415b0083e544af880cb5d4ea7 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 16 Dec 2019 18:19:03 +0100 Subject: [PATCH 3/4] fixed code formatting and added comment --- .../xpack/enrich/action/EnrichCoordinatorProxyAction.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java index 503d86e3dd013..e285c8fef233e 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java @@ -63,8 +63,10 @@ public TransportAction(TransportService transportService, ActionFilters actionFi @Override protected void doExecute(Task task, SearchRequest request, ActionListener listener) { - assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE) || - Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT); + // Write tp is expected when executing enrich processor from index / bulk api + // Management tp is expected when executing enrich processor from ingest simulate api + assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE) + || Thread.currentThread().getName().contains(ThreadPool.Names.MANAGEMENT); coordinator.schedule(request, listener); } } From 40676bc7be624d2aa9a767faf9ed41731a81f9d9 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 17 Dec 2019 09:59:25 +0100 Subject: [PATCH 4/4] added timeout --- .../action/ingest/SimulateExecutionServiceTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java index fabcf16537427..7276ef2ebada4 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -378,7 +379,7 @@ public String getType() { errorHolder.set(e); latch.countDown(); })); - latch.await(); + latch.await(1, TimeUnit.MINUTES); assertThat(errorHolder.get(), nullValue()); SimulatePipelineResponse response = responseHolder.get(); assertThat(response, notNullValue());