Skip to content

Commit 17ad88d

Browse files
committed
Makes search action cancelable by task management API
Long running searches now can be cancelled using standard task cancellation mechanism.
1 parent c3761b8 commit 17ad88d

File tree

45 files changed

+1034
-138
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1034
-138
lines changed

core/src/main/java/org/elasticsearch/ElasticsearchException.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -693,7 +693,9 @@ enum ElasticsearchExceptionHandle {
693693
ShardStateAction.NoLongerPrimaryShardException::new, 142),
694694
SCRIPT_EXCEPTION(org.elasticsearch.script.ScriptException.class, org.elasticsearch.script.ScriptException::new, 143),
695695
NOT_MASTER_EXCEPTION(org.elasticsearch.cluster.NotMasterException.class, org.elasticsearch.cluster.NotMasterException::new, 144),
696-
STATUS_EXCEPTION(org.elasticsearch.ElasticsearchStatusException.class, org.elasticsearch.ElasticsearchStatusException::new, 145);
696+
STATUS_EXCEPTION(org.elasticsearch.ElasticsearchStatusException.class, org.elasticsearch.ElasticsearchStatusException::new, 145),
697+
TASK_CANCELLED_EXCEPTION(org.elasticsearch.tasks.TaskCancelledException.class,
698+
org.elasticsearch.tasks.TaskCancelledException::new, 146);
697699

698700
final Class<? extends ElasticsearchException> exceptionClass;
699701
final FunctionThatThrowsIOException<StreamInput, ? extends ElasticsearchException> constructor;

core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
5959
protected final SearchRequest request;
6060
/** Used by subclasses to resolve node ids to DiscoveryNodes. **/
6161
protected final Function<String, DiscoveryNode> nodeIdToDiscoveryNode;
62+
protected final SearchTask task;
6263
protected final int expectedSuccessfulOps;
6364
private final int expectedTotalOps;
6465
protected final AtomicInteger successfulOps = new AtomicInteger();
@@ -74,12 +75,13 @@ protected AbstractSearchAsyncAction(Logger logger, SearchTransportService search
7475
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
7576
Map<String, AliasFilter> aliasFilter, Executor executor, SearchRequest request,
7677
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, long startTime,
77-
long clusterStateVersion) {
78+
long clusterStateVersion, SearchTask task) {
7879
super(startTime);
7980
this.logger = logger;
8081
this.searchTransportService = searchTransportService;
8182
this.executor = executor;
8283
this.request = request;
84+
this.task = task;
8385
this.listener = listener;
8486
this.nodeIdToDiscoveryNode = nodeIdToDiscoveryNode;
8587
this.clusterStateVersion = clusterStateVersion;

core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
4949
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
5050
Map<String, AliasFilter> aliasFilter, SearchPhaseController searchPhaseController,
5151
Executor executor, SearchRequest request, ActionListener<SearchResponse> listener,
52-
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
52+
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) {
5353
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor,
54-
request, listener, shardsIts, startTime, clusterStateVersion);
54+
request, listener, shardsIts, startTime, clusterStateVersion, task);
5555
this.searchPhaseController = searchPhaseController;
5656
queryFetchResults = new AtomicArray<>(firstResults.length());
5757
}
@@ -64,7 +64,7 @@ protected String firstPhaseName() {
6464
@Override
6565
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
6666
ActionListener<DfsSearchResult> listener) {
67-
searchTransportService.sendExecuteDfs(node, request, listener);
67+
searchTransportService.sendExecuteDfs(node, request, task, listener);
6868
}
6969

7070
@Override
@@ -82,7 +82,7 @@ protected void moveToSecondPhase() {
8282

8383
void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
8484
final DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
85-
searchTransportService.sendExecuteFetch(node, querySearchRequest, new ActionListener<QueryFetchSearchResult>() {
85+
searchTransportService.sendExecuteFetch(node, querySearchRequest, task, new ActionListener<QueryFetchSearchResult>() {
8686
@Override
8787
public void onResponse(QueryFetchSearchResult result) {
8888
result.shardTarget(dfsResult.shardTarget());

core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,10 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
5757
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
5858
Map<String, AliasFilter> aliasFilter, SearchPhaseController searchPhaseController,
5959
Executor executor, SearchRequest request, ActionListener<SearchResponse> listener,
60-
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
60+
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion,
61+
SearchTask task) {
6162
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor,
62-
request, listener, shardsIts, startTime, clusterStateVersion);
63+
request, listener, shardsIts, startTime, clusterStateVersion, task);
6364
this.searchPhaseController = searchPhaseController;
6465
queryResults = new AtomicArray<>(firstResults.length());
6566
fetchResults = new AtomicArray<>(firstResults.length());
@@ -74,7 +75,7 @@ protected String firstPhaseName() {
7475
@Override
7576
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
7677
ActionListener<DfsSearchResult> listener) {
77-
searchTransportService.sendExecuteDfs(node, request, listener);
78+
searchTransportService.sendExecuteDfs(node, request, task, listener);
7879
}
7980

8081
@Override
@@ -91,7 +92,7 @@ protected void moveToSecondPhase() {
9192

9293
void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
9394
final QuerySearchRequest querySearchRequest, final DiscoveryNode node) {
94-
searchTransportService.sendExecuteQuery(node, querySearchRequest, new ActionListener<QuerySearchResult>() {
95+
searchTransportService.sendExecuteQuery(node, querySearchRequest, task, new ActionListener<QuerySearchResult>() {
9596
@Override
9697
public void onResponse(QuerySearchResult result) {
9798
result.shardTarget(dfsResult.shardTarget());
@@ -162,7 +163,7 @@ void innerExecuteFetchPhase() throws Exception {
162163

163164
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
164165
final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
165-
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
166+
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, task, new ActionListener<FetchSearchResult>() {
166167
@Override
167168
public void onResponse(FetchSearchResult result) {
168169
result.shardTarget(shardTarget);

core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetc
4343
Map<String, AliasFilter> aliasFilter,
4444
SearchPhaseController searchPhaseController, Executor executor,
4545
SearchRequest request, ActionListener<SearchResponse> listener,
46-
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
46+
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion,
47+
SearchTask task) {
4748
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor,
48-
request, listener, shardsIts, startTime, clusterStateVersion);
49+
request, listener, shardsIts, startTime, clusterStateVersion, task);
4950
this.searchPhaseController = searchPhaseController;
5051

5152
}
@@ -58,7 +59,7 @@ protected String firstPhaseName() {
5859
@Override
5960
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
6061
ActionListener<QueryFetchSearchResult> listener) {
61-
searchTransportService.sendExecuteFetch(node, request, listener);
62+
searchTransportService.sendExecuteFetch(node, request, task, listener);
6263
}
6364

6465
@Override

core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
5454
AliasFilter> aliasFilter,
5555
SearchPhaseController searchPhaseController, Executor executor,
5656
SearchRequest request, ActionListener<SearchResponse> listener,
57-
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
57+
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion,
58+
SearchTask task) {
5859
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor, request, listener,
59-
shardsIts, startTime, clusterStateVersion);
60+
shardsIts, startTime, clusterStateVersion, task);
6061
this.searchPhaseController = searchPhaseController;
6162
fetchResults = new AtomicArray<>(firstResults.length());
6263
docIdsToLoad = new AtomicArray<>(firstResults.length());
@@ -70,7 +71,7 @@ protected String firstPhaseName() {
7071
@Override
7172
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
7273
ActionListener<QuerySearchResultProvider> listener) {
73-
searchTransportService.sendExecuteQuery(node, request, listener);
74+
searchTransportService.sendExecuteQuery(node, request, task, listener);
7475
}
7576

7677
@Override
@@ -97,7 +98,7 @@ protected void moveToSecondPhase() throws Exception {
9798

9899
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
99100
final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
100-
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
101+
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, task, new ActionListener<FetchSearchResult>() {
101102
@Override
102103
public void onResponse(FetchSearchResult result) {
103104
result.shardTarget(shardTarget);

core/src/main/java/org/elasticsearch/action/search/SearchRequest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.elasticsearch.common.unit.TimeValue;
3232
import org.elasticsearch.search.Scroll;
3333
import org.elasticsearch.search.builder.SearchSourceBuilder;
34+
import org.elasticsearch.tasks.Task;
35+
import org.elasticsearch.tasks.TaskId;
3436

3537
import java.io.IOException;
3638
import java.util.Arrays;
@@ -275,6 +277,11 @@ public boolean isSuggestOnly() {
275277
return source != null && source.isSuggestOnly();
276278
}
277279

280+
@Override
281+
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
282+
return new SearchTask(id, type, action, getDescription(), parentTaskId);
283+
}
284+
278285
@Override
279286
public void readFrom(StreamInput in) throws IOException {
280287
super.readFrom(in);

core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
4444
private final SearchPhaseController searchPhaseController;
4545
private final SearchTransportService searchTransportService;
4646
private final SearchScrollRequest request;
47+
private final SearchTask task;
4748
private final ActionListener<SearchResponse> listener;
4849
private final ParsedScrollId scrollId;
4950
private final DiscoveryNodes nodes;
@@ -52,13 +53,14 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
5253
private final AtomicInteger successfulOps;
5354
private final AtomicInteger counter;
5455

55-
SearchScrollQueryAndFetchAsyncAction(Logger logger, ClusterService clusterService,
56-
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController,
57-
SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
56+
SearchScrollQueryAndFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService,
57+
SearchPhaseController searchPhaseController, SearchScrollRequest request, SearchTask task,
58+
ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
5859
this.logger = logger;
5960
this.searchPhaseController = searchPhaseController;
6061
this.searchTransportService = searchTransportService;
6162
this.request = request;
63+
this.task = task;
6264
this.listener = listener;
6365
this.scrollId = scrollId;
6466
this.nodes = clusterService.state().nodes();
@@ -128,7 +130,7 @@ public void start() {
128130

129131
void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
130132
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
131-
searchTransportService.sendExecuteFetch(node, internalRequest, new ActionListener<ScrollQueryFetchSearchResult>() {
133+
searchTransportService.sendExecuteFetch(node, internalRequest, task, new ActionListener<ScrollQueryFetchSearchResult>() {
132134
@Override
133135
public void onResponse(ScrollQueryFetchSearchResult result) {
134136
queryFetchResults.set(shardIndex, result.result());

core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
4545

4646
private final Logger logger;
47+
private final SearchTask task;
4748
private final SearchTransportService searchTransportService;
4849
private final SearchPhaseController searchPhaseController;
4950
private final SearchScrollRequest request;
@@ -56,13 +57,14 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
5657
private volatile ScoreDoc[] sortedShardDocs;
5758
private final AtomicInteger successfulOps;
5859

59-
SearchScrollQueryThenFetchAsyncAction(Logger logger, ClusterService clusterService,
60-
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController,
61-
SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
60+
SearchScrollQueryThenFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService,
61+
SearchPhaseController searchPhaseController, SearchScrollRequest request, SearchTask task,
62+
ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
6263
this.logger = logger;
6364
this.searchTransportService = searchTransportService;
6465
this.searchPhaseController = searchPhaseController;
6566
this.request = request;
67+
this.task = task;
6668
this.listener = listener;
6769
this.scrollId = scrollId;
6870
this.nodes = clusterService.state().nodes();
@@ -124,7 +126,7 @@ public void start() {
124126

125127
private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) {
126128
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
127-
searchTransportService.sendExecuteQuery(node, internalRequest, new ActionListener<ScrollQuerySearchResult>() {
129+
searchTransportService.sendExecuteQuery(node, internalRequest, task, new ActionListener<ScrollQuerySearchResult>() {
128130
@Override
129131
public void onResponse(ScrollQuerySearchResult result) {
130132
queryResults.set(shardIndex, result.queryResult());
@@ -184,7 +186,7 @@ private void executeFetchPhase() throws Exception {
184186
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
185187
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc);
186188
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
187-
searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener<FetchSearchResult>() {
189+
searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, task, new ActionListener<FetchSearchResult>() {
188190
@Override
189191
public void onResponse(FetchSearchResult result) {
190192
result.shardTarget(querySearchResult.shardTarget());

core/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.elasticsearch.common.io.stream.StreamOutput;
2626
import org.elasticsearch.common.unit.TimeValue;
2727
import org.elasticsearch.search.Scroll;
28+
import org.elasticsearch.tasks.Task;
29+
import org.elasticsearch.tasks.TaskId;
2830

2931
import java.io.IOException;
3032
import java.util.Objects;
@@ -107,6 +109,11 @@ public void writeTo(StreamOutput out) throws IOException {
107109
out.writeOptionalWriteable(scroll);
108110
}
109111

112+
@Override
113+
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
114+
return new SearchTask(id, type, action, getDescription(), parentTaskId);
115+
}
116+
110117
@Override
111118
public boolean equals(Object o) {
112119
if (this == o) {

0 commit comments

Comments
 (0)