From 8dfe0ac88ab52466d1aecdae3918e142ddef7a20 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 17 Jul 2019 14:43:25 +0200 Subject: [PATCH 1/2] Associate sub-search requests to their parent task in multi search API Multi search accepts multiple search requests and runs them as independent requests, each one as part of their own search task. Today they don't get associated though with their parent multi search task, which would be useful to monitor which msearch a certain search was part of, if any, and also to cancel all of the sub-requests in case the parent msearch gets cancelled. --- .../search/TransportMultiSearchAction.java | 1 + .../TransportMultiSearchActionTests.java | 60 ++++++++++++++++++- 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java index f03e1fd4dd1c9..66aa15c569279 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java @@ -86,6 +86,7 @@ protected void doExecute(Task task, MultiSearchRequest request, ActionListener searchRequestSlots = new ConcurrentLinkedQueue<>(); for (int i = 0; i < request.requests().size(); i++) { SearchRequest searchRequest = request.requests().get(i); + searchRequest.setParentTask(client.getLocalNodeId(), task.getId()); searchRequestSlots.add(new SearchRequestSlot(searchRequest, i)); } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java index b405b0790debc..354eed4935aa1 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -35,6 +36,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -53,6 +55,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.action.support.PlainActionFuture.newFuture; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; @@ -76,7 +79,62 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testBatchExecute() throws Exception { + public void testParentTaskId() throws Exception { + // Initialize dependencies of TransportMultiSearchAction + Settings settings = Settings.builder() + .put("node.name", TransportMultiSearchActionTests.class.getSimpleName()) + .build(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + ThreadPool threadPool = new ThreadPool(settings); + try { + TransportService transportService = new TransportService(Settings.EMPTY, mock(Transport.class), threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null, + Collections.emptySet()) { + @Override + public TaskManager getTaskManager() { + return taskManager; + } + }; + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build()); + + String localNodeId = randomAlphaOfLengthBetween(3, 10); + int numSearchRequests = randomIntBetween(1, 100); + MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); + for (int i = 0; i < numSearchRequests; i++) { + multiSearchRequest.add(new SearchRequest()); + } + AtomicInteger counter = new AtomicInteger(0); + Task task = multiSearchRequest.createTask(randomLong(), "type", "action", null, Collections.emptyMap()); + NodeClient client = new NodeClient(settings, threadPool) { + @Override + public void search(final SearchRequest request, final ActionListener listener) { + assertEquals(task.getId(), request.getParentTask().getId()); + assertEquals(localNodeId, request.getParentTask().getNodeId()); + counter.incrementAndGet(); + listener.onResponse(SearchResponse.empty(() -> 1L, SearchResponse.Clusters.EMPTY)); + } + + @Override + public String getLocalNodeId() { + return localNodeId; + } + }; + TransportMultiSearchAction action = + new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, 10, System::nanoTime, client); + + PlainActionFuture future = newFuture(); + action.execute(task, multiSearchRequest, future); + future.get(); + assertEquals(numSearchRequests, counter.get()); + } finally { + assertTrue(ESTestCase.terminate(threadPool)); + } + } + + public void testBatchExecute() { // Initialize dependencies of TransportMultiSearchAction Settings settings = Settings.builder() .put("node.name", TransportMultiSearchActionTests.class.getSimpleName()) From 22109378662bb9c1c34007e60a1e0c147e0b1296 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 17 Jul 2019 16:08:08 +0200 Subject: [PATCH 2/2] address failing tests --- .../action/search/MultiSearchActionTookTests.java | 5 +++++ .../action/search/TransportMultiSearchActionTests.java | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java index 01f1109ef3bed..19b53e2f8d380 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -159,6 +159,11 @@ public void search(final SearchRequest request, final ActionListener