Skip to content

Commit 2843992

Browse files
authored
Associate sub-requests to their parent task in multi search API (#44492)
Multi search accepts multiple search requests and runs them as independent requests, each one as part of their own search task. Today they don't get associated though with their parent multi search task, which would be useful to monitor which msearch a certain search was part of, if any, and also to cancel all of the sub-requests in case the parent msearch gets cancelled (though this will also require making the multi search task cancellable as a follow-up).
1 parent db93b0a commit 2843992

File tree

3 files changed

+70
-1
lines changed

3 files changed

+70
-1
lines changed

server/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ protected void doExecute(Task task, MultiSearchRequest request, ActionListener<M
8686
Queue<SearchRequestSlot> searchRequestSlots = new ConcurrentLinkedQueue<>();
8787
for (int i = 0; i < request.requests().size(); i++) {
8888
SearchRequest searchRequest = request.requests().get(i);
89+
searchRequest.setParentTask(client.getLocalNodeId(), task.getId());
8990
searchRequestSlots.add(new SearchRequestSlot(searchRequest, i));
9091
}
9192

server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,11 @@ public void search(final SearchRequest request, final ActionListener<SearchRespo
159159
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY));
160160
});
161161
}
162+
163+
@Override
164+
public String getLocalNodeId() {
165+
return "local_node_id";
166+
}
162167
};
163168

164169
if (controlledClock) {

server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.action.support.ActionFilter;
2525
import org.elasticsearch.action.support.ActionFilters;
2626
import org.elasticsearch.action.support.ActionTestUtils;
27+
import org.elasticsearch.action.support.PlainActionFuture;
2728
import org.elasticsearch.client.node.NodeClient;
2829
import org.elasticsearch.cluster.ClusterName;
2930
import org.elasticsearch.cluster.ClusterState;
@@ -35,6 +36,7 @@
3536
import org.elasticsearch.common.UUIDs;
3637
import org.elasticsearch.common.settings.Settings;
3738
import org.elasticsearch.search.internal.InternalSearchResponse;
39+
import org.elasticsearch.tasks.Task;
3840
import org.elasticsearch.tasks.TaskManager;
3941
import org.elasticsearch.test.ESTestCase;
4042
import org.elasticsearch.threadpool.TestThreadPool;
@@ -53,6 +55,7 @@
5355
import java.util.concurrent.atomic.AtomicInteger;
5456
import java.util.concurrent.atomic.AtomicReference;
5557

58+
import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
5659
import static org.hamcrest.Matchers.equalTo;
5760
import static org.hamcrest.Matchers.nullValue;
5861
import static org.mockito.Mockito.mock;
@@ -76,7 +79,62 @@ public void tearDown() throws Exception {
7679
super.tearDown();
7780
}
7881

79-
public void testBatchExecute() throws Exception {
82+
public void testParentTaskId() throws Exception {
83+
// Initialize dependencies of TransportMultiSearchAction
84+
Settings settings = Settings.builder()
85+
.put("node.name", TransportMultiSearchActionTests.class.getSimpleName())
86+
.build();
87+
ActionFilters actionFilters = mock(ActionFilters.class);
88+
when(actionFilters.filters()).thenReturn(new ActionFilter[0]);
89+
ThreadPool threadPool = new ThreadPool(settings);
90+
try {
91+
TransportService transportService = new TransportService(Settings.EMPTY, mock(Transport.class), threadPool,
92+
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
93+
boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null,
94+
Collections.emptySet()) {
95+
@Override
96+
public TaskManager getTaskManager() {
97+
return taskManager;
98+
}
99+
};
100+
ClusterService clusterService = mock(ClusterService.class);
101+
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test")).build());
102+
103+
String localNodeId = randomAlphaOfLengthBetween(3, 10);
104+
int numSearchRequests = randomIntBetween(1, 100);
105+
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
106+
for (int i = 0; i < numSearchRequests; i++) {
107+
multiSearchRequest.add(new SearchRequest());
108+
}
109+
AtomicInteger counter = new AtomicInteger(0);
110+
Task task = multiSearchRequest.createTask(randomLong(), "type", "action", null, Collections.emptyMap());
111+
NodeClient client = new NodeClient(settings, threadPool) {
112+
@Override
113+
public void search(final SearchRequest request, final ActionListener<SearchResponse> listener) {
114+
assertEquals(task.getId(), request.getParentTask().getId());
115+
assertEquals(localNodeId, request.getParentTask().getNodeId());
116+
counter.incrementAndGet();
117+
listener.onResponse(SearchResponse.empty(() -> 1L, SearchResponse.Clusters.EMPTY));
118+
}
119+
120+
@Override
121+
public String getLocalNodeId() {
122+
return localNodeId;
123+
}
124+
};
125+
TransportMultiSearchAction action =
126+
new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, 10, System::nanoTime, client);
127+
128+
PlainActionFuture<MultiSearchResponse> future = newFuture();
129+
action.execute(task, multiSearchRequest, future);
130+
future.get();
131+
assertEquals(numSearchRequests, counter.get());
132+
} finally {
133+
assertTrue(ESTestCase.terminate(threadPool));
134+
}
135+
}
136+
137+
public void testBatchExecute() {
80138
// Initialize dependencies of TransportMultiSearchAction
81139
Settings settings = Settings.builder()
82140
.put("node.name", TransportMultiSearchActionTests.class.getSimpleName())
@@ -123,6 +181,11 @@ public void search(final SearchRequest request, final ActionListener<SearchRespo
123181
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY));
124182
});
125183
}
184+
185+
@Override
186+
public String getLocalNodeId() {
187+
return "local_node_id";
188+
}
126189
};
127190

128191
TransportMultiSearchAction action =

0 commit comments

Comments
 (0)