Skip to content

Commit 19ec636

Browse files
authored
Submit _async search task should cancel children on cancellation (#58332)
This change allows the submit async search task to cancel children and removes the manual indirection that cancels the search task when the submit task is cancelled. This is now handled by the task cancellation, which can cancel grand-children since #54757.
1 parent e1af32d commit 19ec636

File tree

4 files changed

+37
-59
lines changed

4 files changed

+37
-59
lines changed

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import java.util.Map;
3838
import java.util.concurrent.atomic.AtomicBoolean;
3939
import java.util.concurrent.atomic.AtomicReference;
40-
import java.util.function.BooleanSupplier;
4140
import java.util.function.Consumer;
4241
import java.util.function.Supplier;
4342

@@ -47,7 +46,6 @@
4746
* Task that tracks the progress of a currently running {@link SearchRequest}.
4847
*/
4948
final class AsyncSearchTask extends SearchTask implements AsyncTask {
50-
private final BooleanSupplier checkSubmitCancellation;
5149
private final AsyncExecutionId searchId;
5250
private final Client client;
5351
private final ThreadPool threadPool;
@@ -74,7 +72,6 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
7472
* @param type The type of the task.
7573
* @param action The action name.
7674
* @param parentTaskId The parent task id.
77-
* @param checkSubmitCancellation A boolean supplier that checks if the submit task has been cancelled.
7875
* @param originHeaders All the request context headers.
7976
* @param taskHeaders The filtered request headers for the task.
8077
* @param searchId The {@link AsyncExecutionId} of the task.
@@ -85,7 +82,6 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
8582
String type,
8683
String action,
8784
TaskId parentTaskId,
88-
BooleanSupplier checkSubmitCancellation,
8985
TimeValue keepAlive,
9086
Map<String, String> originHeaders,
9187
Map<String, String> taskHeaders,
@@ -94,7 +90,6 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
9490
ThreadPool threadPool,
9591
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier) {
9692
super(id, type, action, "async_search", parentTaskId, taskHeaders);
97-
this.checkSubmitCancellation = checkSubmitCancellation;
9893
this.expirationTimeMillis = getStartTime() + keepAlive.getMillis();
9994
this.originHeaders = originHeaders;
10095
this.searchId = searchId;
@@ -326,12 +321,9 @@ private AsyncSearchResponse getResponseWithHeaders() {
326321
// checks if the search task should be cancelled
327322
private synchronized void checkCancellation() {
328323
long now = System.currentTimeMillis();
329-
if (hasCompleted == false &&
330-
expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) {
331-
// we cancel the search task if the initial submit task was cancelled,
332-
// this is needed because the task cancellation mechanism doesn't
333-
// handle the cancellation of grand-children.
334-
cancelTask(() -> {}, checkSubmitCancellation.getAsBoolean() ? "submit was cancelled" : "async search has expired");
324+
if (hasCompleted == false && expirationTimeMillis < now) {
325+
// we cancel expired search task even if they are still running
326+
cancelTask(() -> {}, "async search has expired");
335327
}
336328
}
337329

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java

Lines changed: 31 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@
2929
import org.elasticsearch.index.engine.VersionConflictEngineException;
3030
import org.elasticsearch.search.SearchService;
3131
import org.elasticsearch.search.aggregations.InternalAggregation;
32-
import org.elasticsearch.tasks.CancellableTask;
3332
import org.elasticsearch.tasks.Task;
34-
import org.elasticsearch.tasks.TaskCancelledException;
3533
import org.elasticsearch.tasks.TaskId;
3634
import org.elasticsearch.transport.TransportService;
3735
import org.elasticsearch.xpack.core.XPackPlugin;
@@ -75,8 +73,7 @@ public TransportSubmitAsyncSearchAction(ClusterService clusterService,
7573
}
7674

7775
@Override
78-
protected void doExecute(Task task, SubmitAsyncSearchRequest request, ActionListener<AsyncSearchResponse> submitListener) {
79-
CancellableTask submitTask = (CancellableTask) task;
76+
protected void doExecute(Task submitTask, SubmitAsyncSearchRequest request, ActionListener<AsyncSearchResponse> submitListener) {
8077
final SearchRequest searchRequest = createSearchRequest(request, submitTask, request.getKeepAlive());
8178
AsyncSearchTask searchTask = (AsyncSearchTask) taskManager.register("transport", SearchAction.INSTANCE.name(), searchRequest);
8279
searchAction.execute(searchTask, searchRequest, searchTask.getSearchProgressActionListener());
@@ -88,42 +85,34 @@ public void onResponse(AsyncSearchResponse searchResponse) {
8885
// the task is still running and the user cannot wait more so we create
8986
// a document for further retrieval
9087
try {
91-
if (submitTask.isCancelled()) {
92-
// the user cancelled the submit so we don't store anything
93-
// and propagate the failure
94-
Exception cause = new TaskCancelledException(submitTask.getReasonCancelled());
95-
onFatalFailure(searchTask, cause, searchResponse.isRunning(),
96-
"submit task is cancelled", submitListener);
97-
} else {
98-
final String docId = searchTask.getExecutionId().getDocId();
99-
// creates the fallback response if the node crashes/restarts in the middle of the request
100-
// TODO: store intermediate results ?
101-
AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId());
102-
store.createResponse(docId, searchTask.getOriginHeaders(), initialResp,
103-
new ActionListener<>() {
104-
@Override
105-
public void onResponse(IndexResponse r) {
106-
if (searchResponse.isRunning()) {
107-
try {
108-
// store the final response on completion unless the submit is cancelled
109-
searchTask.addCompletionListener(finalResponse ->
110-
onFinalResponse(submitTask, searchTask, finalResponse, () -> {}));
111-
} finally {
112-
submitListener.onResponse(searchResponse);
113-
}
114-
} else {
115-
onFinalResponse(submitTask, searchTask, searchResponse,
116-
() -> submitListener.onResponse(searchResponse));
88+
final String docId = searchTask.getExecutionId().getDocId();
89+
// creates the fallback response if the node crashes/restarts in the middle of the request
90+
// TODO: store intermediate results ?
91+
AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId());
92+
store.createResponse(docId, searchTask.getOriginHeaders(), initialResp,
93+
new ActionListener<>() {
94+
@Override
95+
public void onResponse(IndexResponse r) {
96+
if (searchResponse.isRunning()) {
97+
try {
98+
// store the final response on completion unless the submit is cancelled
99+
searchTask.addCompletionListener(finalResponse ->
100+
onFinalResponse(searchTask, finalResponse, () -> {
101+
}));
102+
} finally {
103+
submitListener.onResponse(searchResponse);
117104
}
105+
} else {
106+
onFinalResponse(searchTask, searchResponse, () -> submitListener.onResponse(searchResponse));
118107
}
108+
}
119109

120-
@Override
121-
public void onFailure(Exception exc) {
122-
onFatalFailure(searchTask, exc, searchResponse.isRunning(),
123-
"unable to store initial response", submitListener);
124-
}
125-
});
126-
}
110+
@Override
111+
public void onFailure(Exception exc) {
112+
onFatalFailure(searchTask, exc, searchResponse.isRunning(),
113+
"unable to store initial response", submitListener);
114+
}
115+
});
127116
} catch (Exception exc) {
128117
onFatalFailure(searchTask, exc, searchResponse.isRunning(), "generic error", submitListener);
129118
}
@@ -142,7 +131,7 @@ public void onFailure(Exception exc) {
142131
}, request.getWaitForCompletionTimeout());
143132
}
144133

145-
private SearchRequest createSearchRequest(SubmitAsyncSearchRequest request, CancellableTask submitTask, TimeValue keepAlive) {
134+
private SearchRequest createSearchRequest(SubmitAsyncSearchRequest request, Task submitTask, TimeValue keepAlive) {
146135
String docID = UUIDs.randomBase64UUID();
147136
Map<String, String> originHeaders = nodeClient.threadPool().getThreadContext().getHeaders();
148137
SearchRequest searchRequest = new SearchRequest(request.getSearchRequest()) {
@@ -151,9 +140,8 @@ public AsyncSearchTask createTask(long id, String type, String action, TaskId pa
151140
AsyncExecutionId searchId = new AsyncExecutionId(docID, new TaskId(nodeClient.getLocalNodeId(), id));
152141
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier =
153142
() -> requestToAggReduceContextBuilder.apply(request.getSearchRequest());
154-
return new AsyncSearchTask(id, type, action, parentTaskId,
155-
submitTask::isCancelled, keepAlive, originHeaders, taskHeaders, searchId, store.getClient(),
156-
nodeClient.threadPool(), aggReduceContextSupplier);
143+
return new AsyncSearchTask(id, type, action, parentTaskId, keepAlive,
144+
originHeaders, taskHeaders, searchId, store.getClient(), nodeClient.threadPool(), aggReduceContextSupplier);
157145
}
158146
};
159147
searchRequest.setParentTask(new TaskId(nodeClient.getLocalNodeId(), submitTask.getId()));
@@ -179,11 +167,10 @@ private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shoul
179167
}
180168
}
181169

182-
private void onFinalResponse(CancellableTask submitTask,
183-
AsyncSearchTask searchTask,
170+
private void onFinalResponse(AsyncSearchTask searchTask,
184171
AsyncSearchResponse response,
185172
Runnable nextAction) {
186-
if (submitTask.isCancelled() || searchTask.isCancelled()) {
173+
if (searchTask.isCancelled()) {
187174
// the task was cancelled so we ensure that there is nothing stored in the response index.
188175
store.deleteResponse(searchTask.getExecutionId(), ActionListener.wrap(
189176
resp -> unregisterTaskAndMoveOn(searchTask, nextAction),

x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ public void afterTest() {
5252
}
5353

5454
private AsyncSearchTask createAsyncSearchTask() {
55-
return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> false, TimeValue.timeValueHours(1),
55+
return new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1),
5656
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
5757
new NoOpClient(threadPool), threadPool, null);
5858
}
5959

6060
public void testWaitForInit() throws InterruptedException {
61-
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), () -> false, TimeValue.timeValueHours(1),
61+
AsyncSearchTask task = new AsyncSearchTask(0L, "", "", new TaskId("node1", 0), TimeValue.timeValueHours(1),
6262
Collections.emptyMap(), Collections.emptyMap(), new AsyncExecutionId("0", new TaskId("node1", 1)),
6363
new NoOpClient(threadPool), threadPool, null);
6464
int numShards = randomIntBetween(0, 10);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/SubmitAsyncSearchRequest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
154154
return new CancellableTask(id, type, action, null, parentTaskId, headers) {
155155
@Override
156156
public boolean shouldCancelChildrenOnCancellation() {
157-
// we cancel the underlying search action explicitly in the submit action
158-
return false;
157+
return true;
159158
}
160159

161160
@Override

0 commit comments

Comments
 (0)