From 17ad88d539ded5a935fc7c37e13a565a3b4a697c Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Tue, 25 Oct 2016 12:27:33 -1000 Subject: [PATCH] Makes search action cancelable by task management API Long running searches now can be cancelled using standard task cancellation mechanism. --- .../elasticsearch/ElasticsearchException.java | 4 +- .../search/AbstractSearchAsyncAction.java | 4 +- .../SearchDfsQueryAndFetchAsyncAction.java | 8 +- .../SearchDfsQueryThenFetchAsyncAction.java | 11 +- .../SearchQueryAndFetchAsyncAction.java | 7 +- .../SearchQueryThenFetchAsyncAction.java | 9 +- .../action/search/SearchRequest.java | 7 + .../SearchScrollQueryAndFetchAsyncAction.java | 10 +- ...SearchScrollQueryThenFetchAsyncAction.java | 12 +- .../action/search/SearchScrollRequest.java | 7 + .../action/search/SearchTask.java | 34 ++ .../action/search/SearchTransportService.java | 191 ++++++----- .../action/search/TransportSearchAction.java | 24 +- .../search/TransportSearchScrollAction.java | 12 +- .../common/settings/ClusterSettings.java | 1 + .../search/DefaultSearchContext.java | 28 ++ .../elasticsearch/search/SearchService.java | 44 ++- .../elasticsearch/search/dfs/DfsPhase.java | 8 + .../dfs/DfsPhaseExecutionException.java | 6 +- .../search/fetch/FetchPhase.java | 4 + .../fetch/FetchPhaseExecutionException.java | 4 + .../search/fetch/ShardFetchRequest.java | 8 + .../internal/FilteredSearchContext.java | 21 ++ .../internal/InternalScrollSearchRequest.java | 8 + .../search/internal/SearchContext.java | 15 + .../internal/ShardSearchTransportRequest.java | 8 + .../search/profile/query/CollectorResult.java | 1 + .../search/query/CancellableCollector.java | 78 +++++ .../search/query/QueryPhase.java | 9 + .../search/query/QuerySearchRequest.java | 8 + .../tasks/TaskCancelledException.java | 38 +++ .../org/elasticsearch/tasks/TaskManager.java | 2 +- .../TaskAwareTransportRequestHandler.java | 30 ++ .../transport/TransportService.java | 23 ++ .../ExceptionSerializationTests.java | 1 + 
.../node/tasks/CancellableTasksTests.java | 3 +- .../action/search/SearchAsyncActionTests.java | 2 +- .../search/SearchCancellationIT.java | 297 ++++++++++++++++++ .../search/SearchCancellationTests.java | 97 ++++++ .../search/SearchServiceTests.java | 6 +- .../search/query/QueryPhaseTests.java | 4 + docs/reference/cluster/tasks.asciidoc | 1 + docs/reference/search.asciidoc | 13 + docs/reference/search/profile.asciidoc | 43 ++- .../elasticsearch/test/TestSearchContext.java | 21 ++ 45 files changed, 1034 insertions(+), 138 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/action/search/SearchTask.java create mode 100644 core/src/main/java/org/elasticsearch/search/query/CancellableCollector.java create mode 100644 core/src/main/java/org/elasticsearch/tasks/TaskCancelledException.java create mode 100644 core/src/main/java/org/elasticsearch/transport/TaskAwareTransportRequestHandler.java create mode 100644 core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java create mode 100644 core/src/test/java/org/elasticsearch/search/SearchCancellationTests.java diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java index 63161a0a1875f..9fd978feb4f0c 100644 --- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -693,7 +693,9 @@ enum ElasticsearchExceptionHandle { ShardStateAction.NoLongerPrimaryShardException::new, 142), SCRIPT_EXCEPTION(org.elasticsearch.script.ScriptException.class, org.elasticsearch.script.ScriptException::new, 143), NOT_MASTER_EXCEPTION(org.elasticsearch.cluster.NotMasterException.class, org.elasticsearch.cluster.NotMasterException::new, 144), - STATUS_EXCEPTION(org.elasticsearch.ElasticsearchStatusException.class, org.elasticsearch.ElasticsearchStatusException::new, 145); + STATUS_EXCEPTION(org.elasticsearch.ElasticsearchStatusException.class, 
org.elasticsearch.ElasticsearchStatusException::new, 145), + TASK_CANCELLED_EXCEPTION(org.elasticsearch.tasks.TaskCancelledException.class, + org.elasticsearch.tasks.TaskCancelledException::new, 146); final Class exceptionClass; final FunctionThatThrowsIOException constructor; diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index bf6d34c93fbd0..c973804a39dba 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -59,6 +59,7 @@ abstract class AbstractSearchAsyncAction protected final SearchRequest request; /** Used by subclasses to resolve node ids to DiscoveryNodes. **/ protected final Function nodeIdToDiscoveryNode; + protected final SearchTask task; protected final int expectedSuccessfulOps; private final int expectedTotalOps; protected final AtomicInteger successfulOps = new AtomicInteger(); @@ -74,12 +75,13 @@ protected AbstractSearchAsyncAction(Logger logger, SearchTransportService search Function nodeIdToDiscoveryNode, Map aliasFilter, Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, long startTime, - long clusterStateVersion) { + long clusterStateVersion, SearchTask task) { super(startTime); this.logger = logger; this.searchTransportService = searchTransportService; this.executor = executor; this.request = request; + this.task = task; this.listener = listener; this.nodeIdToDiscoveryNode = nodeIdToDiscoveryNode; this.clusterStateVersion = clusterStateVersion; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java index 24b1033ca5f88..54117495cbabe 100644 --- 
a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java @@ -49,9 +49,9 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction nodeIdToDiscoveryNode, Map aliasFilter, SearchPhaseController searchPhaseController, Executor executor, SearchRequest request, ActionListener listener, - GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) { + GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor, - request, listener, shardsIts, startTime, clusterStateVersion); + request, listener, shardsIts, startTime, clusterStateVersion, task); this.searchPhaseController = searchPhaseController; queryFetchResults = new AtomicArray<>(firstResults.length()); } @@ -64,7 +64,7 @@ protected String firstPhaseName() { @Override protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { - searchTransportService.sendExecuteDfs(node, request, listener); + searchTransportService.sendExecuteDfs(node, request, task, listener); } @Override @@ -82,7 +82,7 @@ protected void moveToSecondPhase() { void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final DiscoveryNode node, final QuerySearchRequest querySearchRequest) { - searchTransportService.sendExecuteFetch(node, querySearchRequest, new ActionListener() { + searchTransportService.sendExecuteFetch(node, querySearchRequest, task, new ActionListener() { @Override public void onResponse(QueryFetchSearchResult result) { result.shardTarget(dfsResult.shardTarget()); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java 
b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index 1af6d4da4d145..3f8b20bc1fa79 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -57,9 +57,10 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction nodeIdToDiscoveryNode, Map aliasFilter, SearchPhaseController searchPhaseController, Executor executor, SearchRequest request, ActionListener listener, - GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) { + GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, + SearchTask task) { super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor, - request, listener, shardsIts, startTime, clusterStateVersion); + request, listener, shardsIts, startTime, clusterStateVersion, task); this.searchPhaseController = searchPhaseController; queryResults = new AtomicArray<>(firstResults.length()); fetchResults = new AtomicArray<>(firstResults.length()); @@ -74,7 +75,7 @@ protected String firstPhaseName() { @Override protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { - searchTransportService.sendExecuteDfs(node, request, listener); + searchTransportService.sendExecuteDfs(node, request, task, listener); } @Override @@ -91,7 +92,7 @@ protected void moveToSecondPhase() { void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, final QuerySearchRequest querySearchRequest, final DiscoveryNode node) { - searchTransportService.sendExecuteQuery(node, querySearchRequest, new ActionListener() { + searchTransportService.sendExecuteQuery(node, querySearchRequest, task, new ActionListener() { @Override public void onResponse(QuerySearchResult result) { result.shardTarget(dfsResult.shardTarget()); @@ 
-162,7 +163,7 @@ void innerExecuteFetchPhase() throws Exception { void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { - searchTransportService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener() { + searchTransportService.sendExecuteFetch(node, fetchSearchRequest, task, new ActionListener() { @Override public void onResponse(FetchSearchResult result) { result.shardTarget(shardTarget); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java index 4e8c3847ffc28..25e7e14bb8790 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java @@ -43,9 +43,10 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction aliasFilter, SearchPhaseController searchPhaseController, Executor executor, SearchRequest request, ActionListener listener, - GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) { + GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, + SearchTask task) { super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor, - request, listener, shardsIts, startTime, clusterStateVersion); + request, listener, shardsIts, startTime, clusterStateVersion, task); this.searchPhaseController = searchPhaseController; } @@ -58,7 +59,7 @@ protected String firstPhaseName() { @Override protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { - searchTransportService.sendExecuteFetch(node, request, listener); + searchTransportService.sendExecuteFetch(node, request, task, listener); } @Override diff --git 
a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 0bcae7502ee1c..23b744e5de194 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -54,9 +54,10 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction aliasFilter, SearchPhaseController searchPhaseController, Executor executor, SearchRequest request, ActionListener listener, - GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) { + GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, + SearchTask task) { super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor, request, listener, - shardsIts, startTime, clusterStateVersion); + shardsIts, startTime, clusterStateVersion, task); this.searchPhaseController = searchPhaseController; fetchResults = new AtomicArray<>(firstResults.length()); docIdsToLoad = new AtomicArray<>(firstResults.length()); @@ -70,7 +71,7 @@ protected String firstPhaseName() { @Override protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { - searchTransportService.sendExecuteQuery(node, request, listener); + searchTransportService.sendExecuteQuery(node, request, task, listener); } @Override @@ -97,7 +98,7 @@ protected void moveToSecondPhase() throws Exception { void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { - searchTransportService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener() { + searchTransportService.sendExecuteFetch(node, fetchSearchRequest, task, new ActionListener() { @Override public void onResponse(FetchSearchResult 
result) { result.shardTarget(shardTarget); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java index a1b1a02a97e5b..de27805b13943 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -31,6 +31,8 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; import java.util.Arrays; @@ -275,6 +277,11 @@ public boolean isSuggestOnly() { return source != null && source.isSuggestOnly(); } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return new SearchTask(id, type, action, getDescription(), parentTaskId); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java index 2bdf7dc30f94a..bf53fc719c6c3 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java @@ -44,6 +44,7 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction { private final SearchPhaseController searchPhaseController; private final SearchTransportService searchTransportService; private final SearchScrollRequest request; + private final SearchTask task; private final ActionListener listener; private final ParsedScrollId scrollId; private final DiscoveryNodes nodes; @@ -52,13 +53,14 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction { 
private final AtomicInteger successfulOps; private final AtomicInteger counter; - SearchScrollQueryAndFetchAsyncAction(Logger logger, ClusterService clusterService, - SearchTransportService searchTransportService, SearchPhaseController searchPhaseController, - SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { + SearchScrollQueryAndFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService, + SearchPhaseController searchPhaseController, SearchScrollRequest request, SearchTask task, + ParsedScrollId scrollId, ActionListener listener) { this.logger = logger; this.searchPhaseController = searchPhaseController; this.searchTransportService = searchTransportService; this.request = request; + this.task = task; this.listener = listener; this.scrollId = scrollId; this.nodes = clusterService.state().nodes(); @@ -128,7 +130,7 @@ public void start() { void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); - searchTransportService.sendExecuteFetch(node, internalRequest, new ActionListener() { + searchTransportService.sendExecuteFetch(node, internalRequest, task, new ActionListener() { @Override public void onResponse(ScrollQueryFetchSearchResult result) { queryFetchResults.set(shardIndex, result.result()); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java index 4024d3b5f394d..851e3343bc2ed 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java @@ -44,6 +44,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { private final Logger logger; + private final 
SearchTask task; private final SearchTransportService searchTransportService; private final SearchPhaseController searchPhaseController; private final SearchScrollRequest request; @@ -56,13 +57,14 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { private volatile ScoreDoc[] sortedShardDocs; private final AtomicInteger successfulOps; - SearchScrollQueryThenFetchAsyncAction(Logger logger, ClusterService clusterService, - SearchTransportService searchTransportService, SearchPhaseController searchPhaseController, - SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { + SearchScrollQueryThenFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService, + SearchPhaseController searchPhaseController, SearchScrollRequest request, SearchTask task, + ParsedScrollId scrollId, ActionListener listener) { this.logger = logger; this.searchTransportService = searchTransportService; this.searchPhaseController = searchPhaseController; this.request = request; + this.task = task; this.listener = listener; this.scrollId = scrollId; this.nodes = clusterService.state().nodes(); @@ -124,7 +126,7 @@ public void start() { private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) { InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); - searchTransportService.sendExecuteQuery(node, internalRequest, new ActionListener() { + searchTransportService.sendExecuteQuery(node, internalRequest, task, new ActionListener() { @Override public void onResponse(ScrollQuerySearchResult result) { queryResults.set(shardIndex, result.queryResult()); @@ -184,7 +186,7 @@ private void executeFetchPhase() throws Exception { ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc); DiscoveryNode node 
= nodes.get(querySearchResult.shardTarget().nodeId()); - searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener() { + searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, task, new ActionListener() { @Override public void onResponse(FetchSearchResult result) { result.shardTarget(querySearchResult.shardTarget()); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java index eff033a760021..8a171e24a1ef1 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java @@ -25,6 +25,8 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.Scroll; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; import java.util.Objects; @@ -107,6 +109,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(scroll); } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return new SearchTask(id, type, action, getDescription(), parentTaskId); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTask.java b/core/src/main/java/org/elasticsearch/action/search/SearchTask.java new file mode 100644 index 0000000000000..24f94a4331909 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTask.java @@ -0,0 +1,34 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskId; + +/** + * Task storing information about a currently running search request. + */ +public class SearchTask extends CancellableTask { + + public SearchTask(long id, String type, String action, String description, TaskId parentTaskId) { + super(id, type, action, description, parentTaskId); + } + +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 0451a8920eb9d..9b5d180ce10d3 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -23,6 +23,8 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; @@ -42,7 +44,10 @@ import org.elasticsearch.search.query.QuerySearchResult; import 
org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.query.ScrollQuerySearchResult; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TaskAwareTransportRequestHandler; +import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -77,22 +82,22 @@ public class SearchTransportService extends AbstractComponent { public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) { transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), - new ActionListenerResponseHandler<>(new ActionListener() { - @Override - public void onResponse(SearchFreeContextResponse response) { - // no need to respond if it was freed or not - } + new ActionListenerResponseHandler<>(new ActionListener() { + @Override + public void onResponse(SearchFreeContextResponse response) { + // no need to respond if it was freed or not + } - @Override - public void onFailure(Exception e) { + @Override + public void onFailure(Exception e) { - } - }, SearchFreeContextResponse::new)); + } + }, SearchFreeContextResponse::new)); } public void sendFreeContext(DiscoveryNode node, long contextId, final ActionListener listener) { transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId), - new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new)); + new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new)); } public void sendClearAllScrollContexts(DiscoveryNode node, final ActionListener listener) { @@ -100,59 +105,62 @@ public void sendClearAllScrollContexts(DiscoveryNode node, final ActionListener< new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE)); 
} - public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request, + public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request, SearchTask task, final ActionListener listener) { - transportService.sendRequest(node, DFS_ACTION_NAME, request, new ActionListenerResponseHandler<>(listener, DfsSearchResult::new)); + transportService.sendChildRequest(node, DFS_ACTION_NAME, request, task, + new ActionListenerResponseHandler<>(listener, DfsSearchResult::new)); } - public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, + public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, SearchTask task, final ActionListener listener) { - transportService.sendRequest(node, QUERY_ACTION_NAME, request, - new ActionListenerResponseHandler<>(listener, QuerySearchResult::new)); + transportService.sendChildRequest(node, QUERY_ACTION_NAME, request, task, + new ActionListenerResponseHandler<>(listener, QuerySearchResult::new)); } - public void sendExecuteQuery(DiscoveryNode node, final QuerySearchRequest request, final ActionListener listener) { - transportService.sendRequest(node, QUERY_ID_ACTION_NAME, request, + public void sendExecuteQuery(DiscoveryNode node, final QuerySearchRequest request, SearchTask task, + final ActionListener listener) { + transportService.sendChildRequest(node, QUERY_ID_ACTION_NAME, request, task, new ActionListenerResponseHandler<>(listener, QuerySearchResult::new)); } - public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, + public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task, final ActionListener listener) { - transportService.sendRequest(node, QUERY_SCROLL_ACTION_NAME, request, - new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new)); + transportService.sendChildRequest(node, QUERY_SCROLL_ACTION_NAME, request, task, + 
new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new)); } - public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, + public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, SearchTask task, final ActionListener listener) { - transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request, - new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new)); + transportService.sendChildRequest(node, QUERY_FETCH_ACTION_NAME, request, task, + new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new)); } - public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, + public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, SearchTask task, final ActionListener listener) { - transportService.sendRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request, - new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new)); + transportService.sendChildRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request, task, + new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new)); } - public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, + public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task, final ActionListener listener) { - transportService.sendRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request, - new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new)); + transportService.sendChildRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request, task, + new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new)); } - public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, + public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, SearchTask task, final ActionListener listener) { - 
sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, listener); + sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, task, listener); } - public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, + public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, SearchTask task, final ActionListener listener) { - sendExecuteFetch(node, FETCH_ID_SCROLL_ACTION_NAME, request, listener); + sendExecuteFetch(node, FETCH_ID_SCROLL_ACTION_NAME, request, task, listener); } - private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, + private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, SearchTask task, final ActionListener listener) { - transportService.sendRequest(node, action, request, new ActionListenerResponseHandler<>(listener, FetchSearchResult::new)); + transportService.sendChildRequest(node, action, request, task, + new ActionListenerResponseHandler<>(listener, FetchSearchResult::new)); } static class ScrollFreeContextRequest extends TransportRequest { @@ -252,64 +260,103 @@ public void writeTo(StreamOutput out) throws IOException { public static void registerRequestHandler(TransportService transportService, SearchService searchService) { transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest::new, ThreadPool.Names.SAME, - ((request, channel) -> { - boolean freed = searchService.freeContext(request.id()); - channel.sendResponse(new SearchFreeContextResponse(freed)); - })); + new TaskAwareTransportRequestHandler() { + @Override + public void messageReceived(ScrollFreeContextRequest request, TransportChannel channel, Task task) throws Exception { + boolean freed = searchService.freeContext(request.id()); + channel.sendResponse(new SearchFreeContextResponse(freed)); + } + }); transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, 
ThreadPool.Names.SAME, - (request, channel) -> { - boolean freed = searchService.freeContext(request.id()); - channel.sendResponse(new SearchFreeContextResponse(freed)); + new TaskAwareTransportRequestHandler() { + @Override + public void messageReceived(SearchFreeContextRequest request, TransportChannel channel, Task task) throws Exception { + boolean freed = searchService.freeContext(request.id()); + channel.sendResponse(new SearchFreeContextResponse(freed)); + } }); transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE, - ThreadPool.Names.SAME, (request, channel) -> { - searchService.freeAllScrollContexts(); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + ThreadPool.Names.SAME, + new TaskAwareTransportRequestHandler() { + @Override + public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) throws Exception { + searchService.freeAllScrollContexts(); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } }); transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, - (request, channel) -> { - DfsSearchResult result = searchService.executeDfsPhase(request); - channel.sendResponse(result); + new TaskAwareTransportRequestHandler() { + @Override + public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { + DfsSearchResult result = searchService.executeDfsPhase(request, (SearchTask)task); + channel.sendResponse(result); + + } }); transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, - (request, channel) -> { - QuerySearchResultProvider result = searchService.executeQueryPhase(request); - channel.sendResponse(result); + new TaskAwareTransportRequestHandler() { + @Override + public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task 
task) throws Exception { + QuerySearchResultProvider result = searchService.executeQueryPhase(request, (SearchTask)task); + channel.sendResponse(result); + } }); transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, - (request, channel) -> { - QuerySearchResult result = searchService.executeQueryPhase(request); - channel.sendResponse(result); + new TaskAwareTransportRequestHandler() { + @Override + public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception { + QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task); + channel.sendResponse(result); + } }); transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, - (request, channel) -> { - ScrollQuerySearchResult result = searchService.executeQueryPhase(request); - channel.sendResponse(result); + new TaskAwareTransportRequestHandler() { + @Override + public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception { + ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task); + channel.sendResponse(result); + } }); transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, - (request, channel) -> { - QueryFetchSearchResult result = searchService.executeFetchPhase(request); - channel.sendResponse(result); + new TaskAwareTransportRequestHandler() { + @Override + public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { + QueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); + channel.sendResponse(result); + } }); transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, - (request, channel) -> { - 
QueryFetchSearchResult result = searchService.executeFetchPhase(request); - channel.sendResponse(result); + new TaskAwareTransportRequestHandler() { + @Override + public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception { + QueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); + channel.sendResponse(result); + } }); transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, - (request, channel) -> { - ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request); - channel.sendResponse(result); + new TaskAwareTransportRequestHandler() { + @Override + public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception { + ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); + channel.sendResponse(result); + } }); transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH, - (request, channel) -> { - FetchSearchResult result = searchService.executeFetchPhase(request); - channel.sendResponse(result); + new TaskAwareTransportRequestHandler() { + @Override + public void messageReceived(ShardFetchRequest request, TransportChannel channel, Task task) throws Exception { + FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); + channel.sendResponse(result); + } }); transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH, - (request, channel) -> { - FetchSearchResult result = searchService.executeFetchPhase(request); - channel.sendResponse(result); + new TaskAwareTransportRequestHandler() { + @Override + public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception { + FetchSearchResult result = 
searchService.executeFetchPhase(request, (SearchTask)task); + channel.sendResponse(result); + } }); + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index d8caa72f6122c..022519d3d9dd4 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -38,6 +38,7 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -88,7 +89,7 @@ private Map buildPerIndexAliasFilter(SearchRequest request, } @Override - protected void doExecute(SearchRequest searchRequest, ActionListener listener) { + protected void doExecute(Task task, SearchRequest searchRequest, ActionListener listener) { // pure paranoia if time goes backwards we are at least positive final long startTimeInMillis = Math.max(0, System.currentTimeMillis()); ClusterState clusterState = clusterService.state(); @@ -129,12 +130,17 @@ protected void doExecute(SearchRequest searchRequest, ActionListener aliasFilter, + @Override + protected final void doExecute(SearchRequest searchRequest, ActionListener listener) { + throw new UnsupportedOperationException("the task parameter is required"); + } + + private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest, GroupShardsIterator shardIterators, + long startTime, ClusterState state, Map aliasFilter, ActionListener listener) { final Function nodesLookup = state.nodes()::get; final long clusterStateVersion = state.version(); @@ -144,22 +150,22 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchRequest searchRequest, case DFS_QUERY_THEN_FETCH: 
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, nodesLookup, aliasFilter, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime, - clusterStateVersion); + clusterStateVersion, task); break; case QUERY_THEN_FETCH: searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, nodesLookup, aliasFilter, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime, - clusterStateVersion); + clusterStateVersion, task); break; case DFS_QUERY_AND_FETCH: searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchTransportService, nodesLookup, aliasFilter, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime, - clusterStateVersion); + clusterStateVersion, task); break; case QUERY_AND_FETCH: searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchTransportService, nodesLookup, aliasFilter, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime, - clusterStateVersion); + clusterStateVersion, task); break; default: throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]"); diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java index 71f3b6239d98d..219c856617a49 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -52,19 +53,24 @@ public TransportSearchScrollAction(Settings settings, 
BigArrays bigArrays, Threa this.searchPhaseController = new SearchPhaseController(settings, bigArrays, scriptService, clusterService); } + + @Override + protected final void doExecute(SearchScrollRequest request, ActionListener listener) { + throw new UnsupportedOperationException("the task parameter is required"); + } @Override - protected void doExecute(SearchScrollRequest request, ActionListener listener) { + protected void doExecute(Task task, SearchScrollRequest request, ActionListener listener) { try { ParsedScrollId scrollId = parseScrollId(request.scrollId()); AbstractAsyncAction action; switch (scrollId.getType()) { case QUERY_THEN_FETCH_TYPE: action = new SearchScrollQueryThenFetchAsyncAction(logger, clusterService, searchTransportService, - searchPhaseController, request, scrollId, listener); + searchPhaseController, request, (SearchTask)task, scrollId, listener); break; case QUERY_AND_FETCH_TYPE: action = new SearchScrollQueryAndFetchAsyncAction(logger, clusterService, searchTransportService, - searchPhaseController, request, scrollId, listener); + searchPhaseController, request, (SearchTask)task, scrollId, listener); break; default: throw new IllegalArgumentException("Scroll id type [" + scrollId.getType() + "] unrecognized"); diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 1c6f0abcbcf3b..ac4f3931b7879 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -345,6 +345,7 @@ public void apply(Settings value, Settings current, Settings previous) { UnicastZenPing.DISCOVERY_ZEN_PING_UNICAST_CONCURRENT_CONNECTS_SETTING, SearchService.DEFAULT_KEEPALIVE_SETTING, SearchService.KEEPALIVE_INTERVAL_SETTING, + SearchService.LOW_LEVEL_CANCELLATION_SETTING, Node.WRITE_PORTS_FIELD_SETTING, Node.NODE_NAME_SETTING, Node.NODE_DATA_SETTING, 
diff --git a/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 1e36522794a99..8c4981b5541e2 100644 --- a/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -27,6 +27,7 @@ import org.apache.lucene.search.Query; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Counter; +import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseFieldMatcher; @@ -113,8 +114,11 @@ final class DefaultSearchContext extends SearchContext { private Float minimumScore; private boolean trackScores = false; // when sorting, track scores as well... private FieldDoc searchAfter; + private boolean lowLevelCancellation; // filter for sliced scroll private SliceBuilder sliceBuilder; + private SearchTask task; + /** * The original query as sent by the user without the types and aliases @@ -571,6 +575,15 @@ public SearchContext searchAfter(FieldDoc searchAfter) { return this; } + @Override + public boolean lowLevelCancellation() { + return lowLevelCancellation; + } + + public void lowLevelCancellation(boolean lowLevelCancellation) { + this.lowLevelCancellation = lowLevelCancellation; + } + @Override public FieldDoc searchAfter() { return searchAfter; @@ -792,4 +805,19 @@ public Profilers getProfilers() { public void setProfilers(Profilers profilers) { this.profilers = profilers; } + + @Override + public void setTask(SearchTask task) { + this.task = task; + } + + @Override + public SearchTask getTask() { + return task; + } + + @Override + public boolean isCancelled() { + return task.isCancelled(); + } } diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 
c12d0ff5263f3..d6ef1e0c54d4b 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -26,6 +26,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; @@ -84,6 +85,7 @@ import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Cancellable; import org.elasticsearch.threadpool.ThreadPool.Names; @@ -107,6 +109,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Setting.positiveTimeSetting("search.default_keep_alive", timeValueMinutes(5), Property.NodeScope); public static final Setting KEEPALIVE_INTERVAL_SETTING = Setting.positiveTimeSetting("search.keep_alive_interval", timeValueMinutes(1), Property.NodeScope); + /** + * Enables low-level, frequent search cancellation checks. Enabling low-level checks will make long running searches to react + * to the cancellation request faster. However, since it will produce more cancellation checks it might slow the search performance + * down. 
+ */ + public static final Setting LOW_LEVEL_CANCELLATION_SETTING = + Setting.boolSetting("search.low_level_cancellation", false, Property.Dynamic, Property.NodeScope); public static final TimeValue NO_TIMEOUT = timeValueMillis(-1); public static final Setting DEFAULT_SEARCH_TIMEOUT_SETTING = @@ -133,6 +142,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private volatile TimeValue defaultSearchTimeout; + private volatile boolean lowLevelCancellation; + private final Cancellable keepAliveReaper; private final AtomicLong idGenerator = new AtomicLong(); @@ -160,12 +171,19 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic defaultSearchTimeout = DEFAULT_SEARCH_TIMEOUT_SETTING.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(DEFAULT_SEARCH_TIMEOUT_SETTING, this::setDefaultSearchTimeout); + + lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation); } private void setDefaultSearchTimeout(TimeValue defaultSearchTimeout) { this.defaultSearchTimeout = defaultSearchTimeout; } + private void setLowLevelCancellation(Boolean lowLevelCancellation) { + this.lowLevelCancellation = lowLevelCancellation; + } + @Override public void afterIndexClosed(Index index, Settings indexSettings) { // once an index is closed we can just clean up all the pending search context information @@ -212,10 +230,11 @@ protected void doClose() { keepAliveReaper.cancel(); } - public DfsSearchResult executeDfsPhase(ShardSearchRequest request) throws IOException { + public DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask task) throws IOException { final SearchContext context = createAndPutContext(request); context.incRef(); try { + context.setTask(task); contextProcessing(context); dfsPhase.execute(context); contextProcessedSuccessfully(context); @@ 
-242,11 +261,12 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea } } - public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws IOException { + public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException { final SearchContext context = createAndPutContext(request); final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); context.incRef(); try { + context.setTask(task); operationListener.onPreQueryPhase(context); long time = System.nanoTime(); contextProcessing(context); @@ -276,11 +296,12 @@ public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) t } } - public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) { + public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request, SearchTask task) { final SearchContext context = findContext(request.id()); SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); context.incRef(); try { + context.setTask(task); operationListener.onPreQueryPhase(context); long time = System.nanoTime(); contextProcessing(context); @@ -299,8 +320,9 @@ public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest req } } - public QuerySearchResult executeQueryPhase(QuerySearchRequest request) { + public QuerySearchResult executeQueryPhase(QuerySearchRequest request, SearchTask task) { final SearchContext context = findContext(request.id()); + context.setTask(task); IndexShard indexShard = context.indexShard(); SearchOperationListener operationListener = indexShard.getSearchOperationListener(); context.incRef(); @@ -339,11 +361,12 @@ private boolean fetchPhaseShouldFreeContext(SearchContext context) { } } - public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) throws IOException { + public QueryFetchSearchResult 
executeFetchPhase(ShardSearchRequest request, SearchTask task) throws IOException { final SearchContext context = createAndPutContext(request); context.incRef(); try { contextProcessing(context); + context.setTask(task); SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); operationListener.onPreQueryPhase(context); long time = System.nanoTime(); @@ -379,10 +402,11 @@ public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) thro } } - public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) { + public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request, SearchTask task) { final SearchContext context = findContext(request.id()); context.incRef(); try { + context.setTask(task); contextProcessing(context); context.searcher().setAggregatedDfs(request.dfs()); SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); @@ -420,10 +444,11 @@ public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) { } } - public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request) { + public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request, SearchTask task) { final SearchContext context = findContext(request.id()); context.incRef(); try { + context.setTask(task); contextProcessing(context); SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); processScroll(request, context); @@ -462,11 +487,12 @@ public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchReques } } - public FetchSearchResult executeFetchPhase(ShardFetchRequest request) { + public FetchSearchResult executeFetchPhase(ShardFetchRequest request, SearchTask task) { final SearchContext context = findContext(request.id()); final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); context.incRef(); try { + 
context.setTask(task); contextProcessing(context); if (request.lastEmittedDoc() != null) { context.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); @@ -546,6 +572,7 @@ final SearchContext createContext(ShardSearchRequest request, @Nullable Engine.S keepAlive = request.scroll().keepAlive().millis(); } context.keepAlive(keepAlive); + context.lowLevelCancellation(lowLevelCancellation); } catch (Exception e) { context.close(); throw ExceptionsHelper.convertToRuntime(e); @@ -627,6 +654,7 @@ private void contextProcessedSuccessfully(SearchContext context) { private void cleanContext(SearchContext context) { try { context.clearReleasables(Lifetime.PHASE); + context.setTask(null); } finally { context.decRef(); } diff --git a/core/src/main/java/org/elasticsearch/search/dfs/DfsPhase.java b/core/src/main/java/org/elasticsearch/search/dfs/DfsPhase.java index 1359be24a1520..6be95a8bceb20 100644 --- a/core/src/main/java/org/elasticsearch/search/dfs/DfsPhase.java +++ b/core/src/main/java/org/elasticsearch/search/dfs/DfsPhase.java @@ -28,9 +28,11 @@ import org.apache.lucene.search.CollectionStatistics; import org.apache.lucene.search.TermStatistics; import org.elasticsearch.common.collect.HppcMaps; +import org.elasticsearch.search.SearchContextException; import org.elasticsearch.search.SearchPhase; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.rescore.RescoreSearchContext; +import org.elasticsearch.tasks.TaskCancelledException; import java.util.AbstractSet; import java.util.Collection; @@ -59,6 +61,9 @@ public void execute(SearchContext context) { TermStatistics[] termStatistics = new TermStatistics[terms.length]; IndexReaderContext indexReaderContext = context.searcher().getTopReaderContext(); for (int i = 0; i < terms.length; i++) { + if(context.isCancelled()) { + throw new TaskCancelledException("cancelled"); + } // LUCENE 4 UPGRADE: cache TermContext? 
TermContext termContext = TermContext.build(indexReaderContext, terms[i]); termStatistics[i] = context.searcher().termStatistics(terms[i], termContext); @@ -70,6 +75,9 @@ public void execute(SearchContext context) { if (!fieldStatistics.containsKey(term.field())) { final CollectionStatistics collectionStatistics = context.searcher().collectionStatistics(term.field()); fieldStatistics.put(term.field(), collectionStatistics); + if(context.isCancelled()) { + throw new TaskCancelledException("cancelled"); + } } } diff --git a/core/src/main/java/org/elasticsearch/search/dfs/DfsPhaseExecutionException.java b/core/src/main/java/org/elasticsearch/search/dfs/DfsPhaseExecutionException.java index f5bcd5980a48f..f493bb4d05231 100644 --- a/core/src/main/java/org/elasticsearch/search/dfs/DfsPhaseExecutionException.java +++ b/core/src/main/java/org/elasticsearch/search/dfs/DfsPhaseExecutionException.java @@ -31,7 +31,11 @@ public DfsPhaseExecutionException(SearchContext context, String msg, Throwable t super(context, "Dfs Failed [" + msg + "]", t); } + public DfsPhaseExecutionException(SearchContext context, String msg) { + super(context, "Dfs Failed [" + msg + "]"); + } + public DfsPhaseExecutionException(StreamInput in) throws IOException { super(in); } -} \ No newline at end of file +} diff --git a/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java b/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java index ef6fd979a699d..dcf55872e35c6 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java @@ -51,6 +51,7 @@ import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.lookup.SourceLookup; +import org.elasticsearch.tasks.TaskCancelledException; import java.io.IOException; import java.util.ArrayList; @@ -136,6 +137,9 @@ public void execute(SearchContext context) { 
InternalSearchHit[] hits = new InternalSearchHit[context.docIdsToLoadSize()]; FetchSubPhase.HitContext hitContext = new FetchSubPhase.HitContext(); for (int index = 0; index < context.docIdsToLoadSize(); index++) { + if(context.isCancelled()) { + throw new TaskCancelledException("cancelled"); + } int docId = context.docIdsToLoad()[context.docIdsToLoadFrom() + index]; int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves()); LeafReaderContext subReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex); diff --git a/core/src/main/java/org/elasticsearch/search/fetch/FetchPhaseExecutionException.java b/core/src/main/java/org/elasticsearch/search/fetch/FetchPhaseExecutionException.java index 20d88687b0f6e..e3fb542134eb5 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/FetchPhaseExecutionException.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/FetchPhaseExecutionException.java @@ -31,6 +31,10 @@ public FetchPhaseExecutionException(SearchContext context, String msg, Throwable super(context, "Fetch Failed [" + msg + "]", t); } + public FetchPhaseExecutionException(SearchContext context, String msg) { + super(context, "Fetch Failed [" + msg + "]"); + } + public FetchPhaseExecutionException(StreamInput in) throws IOException { super(in); } diff --git a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java index 4087eb9a01cf8..2148da57d397e 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchRequest.java @@ -22,9 +22,12 @@ import com.carrotsearch.hppc.IntArrayList; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.ScoreDoc; +import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.common.io.stream.StreamInput; import 
org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportRequest; import java.io.IOException; @@ -106,4 +109,9 @@ public void writeTo(StreamOutput out) throws IOException { Lucene.writeScoreDoc(out, lastEmittedDoc); } } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return new SearchTask(id, type, action, getDescription(), parentTaskId); + } } diff --git a/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java b/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java index 153a43fb8952d..ec5cbf145d3d3 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/FilteredSearchContext.java @@ -23,6 +23,7 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.Query; import org.apache.lucene.util.Counter; +import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.unit.TimeValue; @@ -293,6 +294,11 @@ public void terminateAfter(int terminateAfter) { in.terminateAfter(terminateAfter); } + @Override + public boolean lowLevelCancellation() { + return in.lowLevelCancellation(); + } + @Override public SearchContext minimumScore(float minimumScore) { return in.minimumScore(minimumScore); @@ -516,4 +522,19 @@ public Profilers getProfilers() { public QueryShardContext getQueryShardContext() { return in.getQueryShardContext(); } + + @Override + public void setTask(SearchTask task) { + in.setTask(task); + } + + @Override + public SearchTask getTask() { + return in.getTask(); + } + + @Override + public boolean isCancelled() { + return in.isCancelled(); + } } diff --git 
a/core/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java b/core/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java index 82a74de73d295..742269dba1b7b 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/InternalScrollSearchRequest.java @@ -20,9 +20,12 @@ package org.elasticsearch.search.internal; import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.Scroll; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportRequest; import java.io.IOException; @@ -67,4 +70,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(id); out.writeOptionalWriteable(scroll); } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return new SearchTask(id, type, action, getDescription(), parentTaskId); + } } diff --git a/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java index 17f9b602f7fc9..ce845a8416919 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/core/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -23,6 +23,7 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.Query; import org.apache.lucene.util.Counter; +import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseFieldMatcher; @@ -96,6 +97,12 @@ public ParseFieldMatcher parseFieldMatcher() { return parseFieldMatcher; } + public 
abstract void setTask(SearchTask task); + + public abstract SearchTask getTask(); + + public abstract boolean isCancelled(); + @Override public final void close() { if (closed.compareAndSet(false, true)) { // prevent double closing @@ -220,6 +227,14 @@ public InnerHitsContext innerHits() { public abstract void terminateAfter(int terminateAfter); + /** + * Indicates if the current index should perform frequent low level search cancellation check. + * + * Enabling low-level checks will make long running searches to react to the cancellation request faster. However, + * since it will produce more cancellation checks it might slow the search performance down. + */ + public abstract boolean lowLevelCancellation(); + public abstract SearchContext minimumScore(float minimumScore); public abstract Float minimumScore(); diff --git a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java index 1a92257dc3462..c443f1ab68827 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.routing.ShardRouting; @@ -33,6 +34,8 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportRequest; import java.io.IOException; @@ -158,4 +161,9 @@ public boolean isProfile() { public 
void rewrite(QueryShardContext context) throws IOException { shardSearchLocalRequest.rewrite(context); } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return new SearchTask(id, type, action, getDescription(), parentTaskId); + } } diff --git a/core/src/main/java/org/elasticsearch/search/profile/query/CollectorResult.java b/core/src/main/java/org/elasticsearch/search/profile/query/CollectorResult.java index c517c8730e481..6b4d7c0e84297 100644 --- a/core/src/main/java/org/elasticsearch/search/profile/query/CollectorResult.java +++ b/core/src/main/java/org/elasticsearch/search/profile/query/CollectorResult.java @@ -45,6 +45,7 @@ public class CollectorResult implements ToXContent, Writeable { public static final String REASON_SEARCH_MIN_SCORE = "search_min_score"; public static final String REASON_SEARCH_MULTI = "search_multi"; public static final String REASON_SEARCH_TIMEOUT = "search_timeout"; + public static final String REASON_SEARCH_CANCELLED = "search_cancelled"; public static final String REASON_AGGREGATION = "aggregation"; public static final String REASON_AGGREGATION_GLOBAL = "aggregation_global"; diff --git a/core/src/main/java/org/elasticsearch/search/query/CancellableCollector.java b/core/src/main/java/org/elasticsearch/search/query/CancellableCollector.java new file mode 100644 index 0000000000000..1c702ac0e1f8d --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/query/CancellableCollector.java @@ -0,0 +1,78 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.query; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.FilterCollector; +import org.apache.lucene.search.FilterLeafCollector; +import org.apache.lucene.search.LeafCollector; +import org.elasticsearch.common.inject.Provider; +import org.elasticsearch.tasks.TaskCancelledException; + +import java.io.IOException; + +/** + * Collector that checks if the task it is executed under is cancelled. + */ +public class CancellableCollector extends FilterCollector { + private final Provider cancelled; + private final boolean leafLevel; + + /** + * Constructor + * @param cancelled supplier of the cancellation flag, the supplier will be called for each segment if lowLevelCancellation is set + * to false and for each collected record if lowLevelCancellation is set to true. In other words this class assumes + * that the supplier is fast, with performance on the order of a volatile read. 
+ * @param lowLevelCancellation true if collector should check for cancellation for each collected record, false if check should be + * performed only once per segment + * @param in wrapped collector + */ + public CancellableCollector(Provider cancelled, boolean lowLevelCancellation, Collector in) { + super(in); + this.cancelled = cancelled; + this.leafLevel = lowLevelCancellation; + } + + @Override + public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException { + if (cancelled.get()) { + throw new TaskCancelledException("cancelled"); + } + if (leafLevel) { + return new CancellableLeafCollector(super.getLeafCollector(context)); + } else { + return super.getLeafCollector(context); + } + } + + private class CancellableLeafCollector extends FilterLeafCollector { + private CancellableLeafCollector(LeafCollector in) { + super(in); + } + + @Override + public void collect(int doc) throws IOException { + if (cancelled.get()) { + throw new TaskCancelledException("cancelled"); + } + super.collect(doc); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java index d1e90b2e9a55e..5579e55826e9e 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/core/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -362,6 +362,15 @@ public TopDocs call() throws Exception { } } + if (collector != null) { + final Collector child = collector; + collector = new CancellableCollector(searchContext.getTask()::isCancelled, searchContext.lowLevelCancellation(), collector); + if (doProfile) { + collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_CANCELLED, + Collections.singletonList((InternalProfileCollector) child)); + } + } + try { if (collector != null) { if (doProfile) { diff --git a/core/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java 
b/core/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java index 3fd8ad2e84c29..012d96262fc2d 100644 --- a/core/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/query/QuerySearchRequest.java @@ -22,10 +22,13 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.search.dfs.AggregatedDfs; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportRequest; import java.io.IOException; @@ -82,4 +85,9 @@ public void writeTo(StreamOutput out) throws IOException { dfs.writeTo(out); OriginalIndices.writeOriginalIndices(originalIndices, out); } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId) { + return new SearchTask(id, type, action, getDescription(), parentTaskId); + } } diff --git a/core/src/main/java/org/elasticsearch/tasks/TaskCancelledException.java b/core/src/main/java/org/elasticsearch/tasks/TaskCancelledException.java new file mode 100644 index 0000000000000..4da9eb05a6870 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/tasks/TaskCancelledException.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.tasks; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +/** + * A generic exception that can be thrown by a task when it's cancelled by the task manager API + */ +public class TaskCancelledException extends ElasticsearchException { + + public TaskCancelledException(String msg) { + super(msg); + } + + public TaskCancelledException(StreamInput in) throws IOException { + super(in); + } +} diff --git a/core/src/main/java/org/elasticsearch/tasks/TaskManager.java b/core/src/main/java/org/elasticsearch/tasks/TaskManager.java index 003a51c31756b..0f2165824feea 100644 --- a/core/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/core/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -459,7 +459,7 @@ public synchronized void registerChildTaskNode(String nodeId) { if (cancellationReason == null) { nodesWithChildTasks.add(nodeId); } else { - throw new IllegalStateException("cannot register child task request, the task is already cancelled"); + throw new TaskCancelledException("cannot register child task request, the task is already cancelled"); } } } diff --git a/core/src/main/java/org/elasticsearch/transport/TaskAwareTransportRequestHandler.java b/core/src/main/java/org/elasticsearch/transport/TaskAwareTransportRequestHandler.java new file mode 100644 index 0000000000000..12899d86d430d --- /dev/null +++ b/core/src/main/java/org/elasticsearch/transport/TaskAwareTransportRequestHandler.java @@ -0,0 +1,30 @@ +/* + * 
Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +/** + * Transport request handlers that use the task context + */ +public abstract class TaskAwareTransportRequestHandler<T extends TransportRequest> implements TransportRequestHandler<T> { + @Override + public final void messageReceived(T request, TransportChannel channel) throws Exception { + throw new UnsupportedOperationException("the task parameter is required"); + } +} diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index f0aa991208410..9ec3dc6b7fe18 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -46,6 +46,8 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; @@ -462,6 +464,27 @@ public final void sendRequest(final 
DiscoveryNode asyncSender.sendRequest(node, action, request, options, handler); } + public void sendChildRequest(final DiscoveryNode node, final String action, + final TransportRequest request, final Task parentTask, + final TransportResponseHandler handler) { + sendChildRequest(node, action, request, parentTask, TransportRequestOptions.EMPTY, handler); + } + + public void sendChildRequest(final DiscoveryNode node, final String action, + final TransportRequest request, final Task parentTask, + final TransportRequestOptions options, + final TransportResponseHandler handler) { + request.setParentTask(localNode.getId(), parentTask.getId()); + try { + taskManager.registerChildTask(parentTask, node.getId()); + sendRequest(node, action, request, options, handler); + } catch (TaskCancelledException ex) { + // The parent task is already cancelled - just fail the request + handler.handleException(new TransportException(ex)); + } + + } + private void sendRequestInternal(final DiscoveryNode node, final String action, final TransportRequest request, final TransportRequestOptions options, diff --git a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 0eff8d7e60c89..ae2f246639dc0 100644 --- a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -795,6 +795,7 @@ public void testIds() { ids.put(143, org.elasticsearch.script.ScriptException.class); ids.put(144, org.elasticsearch.cluster.NotMasterException.class); ids.put(145, org.elasticsearch.ElasticsearchStatusException.class); + ids.put(146, org.elasticsearch.tasks.TaskCancelledException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java 
b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java index 9027b3d372e37..be2a83af42cf4 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.threadpool.ThreadPool; @@ -168,7 +169,7 @@ protected NodeResponse nodeOperation(CancellableNodeRequest request, Task task) try { awaitBusy(() -> { if (((CancellableTask) task).isCancelled()) { - throw new RuntimeException("Cancelled"); + throw new TaskCancelledException("Cancelled"); } return false; }); diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 1aafa1d343b55..5ab35085e401b 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -85,7 +85,7 @@ public void sendFreeContext(DiscoveryNode node, long contextId, SearchRequest re Map lookup = new HashMap<>(); lookup.put(primaryNode.getId(), primaryNode); AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction(logger, transportService, lookup::get, - Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0) { + Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0, null) { TestSearchResponse response = new TestSearchResponse(); @Override diff --git a/core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java 
b/core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java new file mode 100644 index 0000000000000..181301a139c10 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java @@ -0,0 +1,297 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search; + +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollAction; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.plugins.ScriptPlugin; +import org.elasticsearch.script.AbstractSearchScript; +import org.elasticsearch.script.ExecutableScript; +import org.elasticsearch.script.NativeScriptFactory; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptService.ScriptType; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.index.query.QueryBuilders.scriptQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE) +public class SearchCancellationIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return 
Collections.singleton(ScriptedBlockPlugin.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), randomBoolean()).build(); + } + + private void indexTestData() { + for (int i = 0; i < 10; i++) { + // Make sure we have a few segments + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for(int j=0; j<10; j++) { + bulkRequestBuilder.add(client().prepareIndex("test", "type", Integer.toString(i*10 + j)).setSource("field", "value")); + } + assertNoFailures(bulkRequestBuilder.get()); + } + } + + private List initBlockFactory() { + List plugins = new ArrayList<>(); + for (PluginsService pluginsService : internalCluster().getDataNodeInstances(PluginsService.class)) { + plugins.addAll(pluginsService.filterPlugins(ScriptedBlockPlugin.class)); + } + for (ScriptedBlockPlugin plugin : plugins) { + plugin.scriptedBlockFactory.reset(); + plugin.scriptedBlockFactory.enableBlock(); + } + return plugins; + } + + private void awaitForBlock(List plugins) throws Exception { + int numberOfShards = getNumShards("test").numPrimaries; + assertBusy(() -> { + int numberOfBlockedPlugins = 0; + for (ScriptedBlockPlugin plugin : plugins) { + numberOfBlockedPlugins += plugin.scriptedBlockFactory.hits.get(); + } + logger.info("The plugin blocked on {} out of {} shards", numberOfBlockedPlugins, numberOfShards); + assertThat(numberOfBlockedPlugins, greaterThan(0)); + }); + } + + private void disableBlocks(List plugins) throws Exception { + for (ScriptedBlockPlugin plugin : plugins) { + plugin.scriptedBlockFactory.disableBlock(); + } + } + + private void cancelSearch(String action) { + ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().setActions(action).get(); + assertThat(listTasksResponse.getTasks(), hasSize(1)); + TaskInfo searchTask = listTasksResponse.getTasks().get(0); + + 
logger.info("Cancelling search"); + CancelTasksResponse cancelTasksResponse = client().admin().cluster().prepareCancelTasks().setTaskId(searchTask.getTaskId()).get(); + assertThat(cancelTasksResponse.getTasks(), hasSize(1)); + assertThat(cancelTasksResponse.getTasks().get(0).getTaskId(), equalTo(searchTask.getTaskId())); + } + + private SearchResponse ensureSearchWasCancelled(ListenableActionFuture searchResponse) { + try { + SearchResponse response = searchResponse.actionGet(); + logger.info("Search response {}", response); + assertNotEquals("At least one shard should have failed", 0, response.getFailedShards()); + return response; + } catch (SearchPhaseExecutionException ex) { + logger.info("All shards failed with", ex); + return null; + } + } + + public void testCancellationDuringQueryPhase() throws Exception { + + List plugins = initBlockFactory(); + indexTestData(); + + logger.info("Executing search"); + ListenableActionFuture searchResponse = client().prepareSearch("test").setQuery( + scriptQuery(new Script(NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, ScriptType.INLINE, "native", null))) + .execute(); + + awaitForBlock(plugins); + cancelSearch(SearchAction.NAME); + disableBlocks(plugins); + ensureSearchWasCancelled(searchResponse); + } + + public void testCancellationDuringFetchPhase() throws Exception { + + List plugins = initBlockFactory(); + indexTestData(); + + logger.info("Executing search"); + ListenableActionFuture searchResponse = client().prepareSearch("test") + .addScriptField("test_field", + new Script(NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, ScriptType.INLINE, "native", null) + ).execute(); + + awaitForBlock(plugins); + cancelSearch(SearchAction.NAME); + disableBlocks(plugins); + ensureSearchWasCancelled(searchResponse); + } + + public void testCancellationOfScrollSearches() throws Exception { + + List plugins = initBlockFactory(); + indexTestData(); + + logger.info("Executing search"); + ListenableActionFuture 
searchResponse = client().prepareSearch("test") + .setScroll(TimeValue.timeValueSeconds(10)) + .setSize(5) + .setQuery( + scriptQuery(new Script(NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, ScriptType.INLINE, "native", null))) + .execute(); + + awaitForBlock(plugins); + cancelSearch(SearchAction.NAME); + disableBlocks(plugins); + ensureSearchWasCancelled(searchResponse); + } + + + public void testCancellationOfScrollSearchesOnFollowupRequests() throws Exception { + + List plugins = initBlockFactory(); + indexTestData(); + + // Disable block so the first request would pass + disableBlocks(plugins); + + logger.info("Executing search"); + TimeValue keepAlive = TimeValue.timeValueSeconds(5); + SearchResponse searchResponse = client().prepareSearch("test") + .setScroll(keepAlive) + .setSize(2) + .setQuery( + scriptQuery(new Script(NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, ScriptType.INLINE, "native", null))) + .get(); + + assertNotNull(searchResponse.getScrollId()); + + // Enable block so the second request would block + for (ScriptedBlockPlugin plugin : plugins) { + plugin.scriptedBlockFactory.reset(); + plugin.scriptedBlockFactory.enableBlock(); + } + + String scrollId = searchResponse.getScrollId(); + logger.info("Executing scroll with id {}", scrollId); + ListenableActionFuture scrollResponse = client().prepareSearchScroll(searchResponse.getScrollId()) + .setScroll(keepAlive).execute(); + + awaitForBlock(plugins); + cancelSearch(SearchScrollAction.NAME); + disableBlocks(plugins); + + SearchResponse response = ensureSearchWasCancelled(scrollResponse); + if (response != null){ + // The response didn't fail completely - update scroll id + scrollId = response.getScrollId(); + } + logger.info("Cleaning scroll with id {}", scrollId); + client().prepareClearScroll().addScrollId(scrollId).get(); + } + + + public static class ScriptedBlockPlugin extends Plugin implements ScriptPlugin { + private NativeTestScriptedBlockFactory scriptedBlockFactory; 
+ + public ScriptedBlockPlugin() { + scriptedBlockFactory = new NativeTestScriptedBlockFactory(); + } + + @Override + public List getNativeScripts() { + return Collections.singletonList(scriptedBlockFactory); + } + } + + private static class NativeTestScriptedBlockFactory implements NativeScriptFactory { + + public static final String TEST_NATIVE_BLOCK_SCRIPT = "native_test_search_block_script"; + + private final AtomicInteger hits = new AtomicInteger(); + + private final AtomicBoolean shouldBlock = new AtomicBoolean(true); + + public NativeTestScriptedBlockFactory() { + } + + public void reset() { + hits.set(0); + } + + public void disableBlock() { + shouldBlock.set(false); + } + + public void enableBlock() { + shouldBlock.set(true); + } + + @Override + public ExecutableScript newScript(Map params) { + return new NativeTestScriptedBlock(); + } + + @Override + public boolean needsScores() { + return false; + } + + @Override + public String getName() { + return TEST_NATIVE_BLOCK_SCRIPT; + } + + public class NativeTestScriptedBlock extends AbstractSearchScript { + @Override + public Object run() { + hits.incrementAndGet(); + try { + awaitBusy(() -> shouldBlock.get() == false); + } catch (Exception e) { + throw new RuntimeException(e); + } + return true; + } + } + } + +} diff --git a/core/src/test/java/org/elasticsearch/search/SearchCancellationTests.java b/core/src/test/java/org/elasticsearch/search/SearchCancellationTests.java new file mode 100644 index 0000000000000..5026be9d52135 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/SearchCancellationTests.java @@ -0,0 +1,97 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.TotalHitCountCollector; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.TestUtil; +import org.elasticsearch.search.query.CancellableCollector; +import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.test.ESTestCase; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SearchCancellationTests extends ESTestCase { + + static Directory dir; + static IndexReader reader; + + @BeforeClass + public static void before() throws IOException { + dir = newDirectory(); + RandomIndexWriter w = new RandomIndexWriter(random(), dir); + indexRandomDocuments(w, TestUtil.nextInt(random(), 2, 20)); + w.flush(); + indexRandomDocuments(w, TestUtil.nextInt(random(), 1, 20)); + reader = w.getReader(); + w.close(); + } + + private static void indexRandomDocuments(RandomIndexWriter w, int numDocs) throws IOException { + for (int i = 0; i < numDocs; ++i) { + final int numHoles = random().nextInt(5); + for (int j = 0; j < numHoles; ++j) { + w.addDocument(new Document()); + } + Document doc = new Document(); + doc.add(new 
StringField("foo", "bar", Field.Store.NO)); + w.addDocument(doc); + } + } + + @AfterClass + public static void after() throws IOException { + IOUtils.close(reader, dir); + dir = null; + reader = null; + } + + + public void testLowLevelCancellableCollector() throws IOException { + TotalHitCountCollector collector = new TotalHitCountCollector(); + AtomicBoolean cancelled = new AtomicBoolean(); + CancellableCollector cancellableCollector = new CancellableCollector(cancelled::get, true, collector); + final LeafCollector leafCollector = cancellableCollector.getLeafCollector(reader.leaves().get(0)); + leafCollector.collect(0); + cancelled.set(true); + expectThrows(TaskCancelledException.class, () -> leafCollector.collect(1)); + } + + public void testCancellableCollector() throws IOException { + TotalHitCountCollector collector = new TotalHitCountCollector(); + AtomicBoolean cancelled = new AtomicBoolean(); + CancellableCollector cancellableCollector = new CancellableCollector(cancelled::get, false, collector); + final LeafCollector leafCollector = cancellableCollector.getLeafCollector(reader.leaves().get(0)); + leafCollector.collect(0); + cancelled.set(true); + leafCollector.collect(1); + expectThrows(TaskCancelledException.class, () -> cancellableCollector.getLeafCollector(reader.leaves().get(1))); + } + +} diff --git a/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 0def6726e0368..717d64a53e796 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/core/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; import 
org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.Strings; @@ -175,11 +176,12 @@ public void onFailure(Exception e) { try { QuerySearchResultProvider querySearchResultProvider = service.executeQueryPhase( new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, - new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY))); + new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY)), + new SearchTask(123L, "", "", "", null)); IntArrayList intCursors = new IntArrayList(1); intCursors.add(0); ShardFetchRequest req = new ShardFetchRequest(querySearchResultProvider.id(), intCursors, null /* not a scroll */); - service.executeFetchPhase(req); + service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null)); } catch (AlreadyClosedException ex) { throw ex; } catch (IllegalStateException ex) { diff --git a/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java index 5bbae82b7d7ae..65aa5f992e635 100644 --- a/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java @@ -40,6 +40,7 @@ import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.Weight; import org.apache.lucene.store.Directory; +import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.index.query.ParsedQuery; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TestSearchContext; @@ -54,6 +55,7 @@ private void countTestCase(Query query, IndexReader reader, boolean shouldCollec TestSearchContext context = new TestSearchContext(null); context.parsedQuery(new ParsedQuery(query)); context.setSize(0); + context.setTask(new SearchTask(123L, "", "", "", null)); IndexSearcher searcher = new IndexSearcher(reader); final AtomicBoolean collected = new AtomicBoolean(); @@ 
-123,6 +125,7 @@ public void testPostFilterDisablesCountOptimization() throws Exception { TestSearchContext context = new TestSearchContext(null); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(0); + context.setTask(new SearchTask(123L, "", "", "", null)); final AtomicBoolean collected = new AtomicBoolean(); IndexSearcher contextSearcher = new IndexSearcher(new MultiReader()) { @@ -146,6 +149,7 @@ public void testMinScoreDisablesCountOptimization() throws Exception { TestSearchContext context = new TestSearchContext(null); context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); context.setSize(0); + context.setTask(new SearchTask(123L, "", "", "", null)); final AtomicBoolean collected = new AtomicBoolean(); IndexSearcher contextSearcher = new IndexSearcher(new MultiReader()) { diff --git a/docs/reference/cluster/tasks.asciidoc b/docs/reference/cluster/tasks.asciidoc index 2176a432291fd..ce550a689bf4a 100644 --- a/docs/reference/cluster/tasks.asciidoc +++ b/docs/reference/cluster/tasks.asciidoc @@ -108,6 +108,7 @@ GET _cat/tasks // CONSOLE [float] +[[task-cancellation]] === Task Cancellation If a long-running task supports cancellation, it can be cancelled by the following command: diff --git a/docs/reference/search.asciidoc b/docs/reference/search.asciidoc index 61d807cc21237..7a26ee2cf6002 100644 --- a/docs/reference/search.asciidoc +++ b/docs/reference/search.asciidoc @@ -92,6 +92,19 @@ timeout. The setting key is `search.default_search_timeout` and can be set using the <> endpoints. Setting this value to `-1` resets the global search timeout to no timeout. +[float] +[[global-search-cancellation]] +== Search Cancellation + +Searches can be cancelled using standard <> +mechanism. By default, a running search only checks if it is cancelled or +not on segment boundaries, therefore the cancellation can be delayed by large +segments. 
The search cancellation responsiveness can be improved by setting +the dynamic cluster-level setting `search.low_level_cancellation` to `true`. +However, it comes with an additional overhead of more frequent cancellation +checks that can be noticeable on large fast running search queries. Changing this +setting only affects the searches that start after the change is made. + -- include::search/search.asciidoc[] diff --git a/docs/reference/search/profile.asciidoc b/docs/reference/search/profile.asciidoc index 0a03b322858e4..52b744d30e95c 100644 --- a/docs/reference/search/profile.asciidoc +++ b/docs/reference/search/profile.asciidoc @@ -118,9 +118,16 @@ This will yield the following result: "rewrite_time": 51443, "collector": [ { - "name": "SimpleTopScoreDocCollector", - "reason": "search_top_hits", - "time": "0.06989100000ms" + "name": "CancellableCollector", + "reason": "search_cancelled", + "time": "0.3043110000ms", + "children": [ + { + "name": "SimpleTopScoreDocCollector", + "reason": "search_top_hits", + "time": "0.03227300000ms" + } + ] } ] } @@ -150,7 +157,8 @@ This will yield the following result: // TESTRESPONSE[s/"build_scorer": 42602/"build_scorer": $body.profile.shards.0.searches.0.query.0.children.1.breakdown.build_scorer/] // TESTRESPONSE[s/"create_weight": 89323/"create_weight": $body.profile.shards.0.searches.0.query.0.children.1.breakdown.create_weight/] // TESTRESPONSE[s/"next_doc": 2852/"next_doc": $body.profile.shards.0.searches.0.query.0.children.1.breakdown.next_doc/] -// TESTRESPONSE[s/"time": "0.06989100000ms"/"time": $body.profile.shards.0.searches.0.collector.0.time/] +// TESTRESPONSE[s/"time": "0.3043110000ms"/"time": $body.profile.shards.0.searches.0.collector.0.time/] +// TESTRESPONSE[s/"time": "0.03227300000ms"/"time": $body.profile.shards.0.searches.0.collector.0.children.0.time/] // Sorry for this mess.... 
<1> Search results are returned, but were omitted here for brevity @@ -390,21 +398,30 @@ Looking at the previous example: [source,js] -------------------------------------------------- "collector": [ - { - "name": "SimpleTopScoreDocCollector", - "reason": "search_top_hits", - "time": "0.06989100000ms" - } + { + "name": "CancellableCollector", + "reason": "search_cancelled", + "time": "0.3043110000ms", + "children": [ + { + "name": "SimpleTopScoreDocCollector", + "reason": "search_top_hits", + "time": "0.03227300000ms" + } + ] + } ] -------------------------------------------------- // TESTRESPONSE[s/^/{\n"took": $body.took,\n"timed_out": $body.timed_out,\n"_shards": $body._shards,\n"hits": $body.hits,\n"profile": {\n"shards": [ {\n"id": "$body.profile.shards.0.id",\n"searches": [{\n"query": $body.profile.shards.0.searches.0.query,\n"rewrite_time": $body.profile.shards.0.searches.0.rewrite_time,/] // TESTRESPONSE[s/]$/]}], "aggregations": []}]}}/] -// TESTRESPONSE[s/"time": "0.06989100000ms"/"time": $body.profile.shards.0.searches.0.collector.0.time/] +// TESTRESPONSE[s/"time": "0.3043110000ms"/"time": $body.profile.shards.0.searches.0.collector.0.time/] +// TESTRESPONSE[s/"time": "0.03227300000ms"/"time": $body.profile.shards.0.searches.0.collector.0.children.0.time/] -We see a single collector named `SimpleTopScoreDocCollector`. This is the default "scoring and sorting" Collector -used by Elasticsearch. The `"reason"` field attempts to give a plain english description of the class name. The +We see a single collector named `SimpleTopScoreDocCollector` wrapped into `CancellableCollector`. `SimpleTopScoreDocCollector` is the default "scoring and sorting" +`Collector` used by Elasticsearch. The `"reason"` field attempts to give a plain english description of the class name. The `"time"` is similar to the time in the Query tree: a wall-clock time inclusive of all children. Similarly, `children` lists -all sub-collectors. +all sub-collectors. 
The `CancellableCollector` that wraps `SimpleTopScoreDocCollector` is used by elasticsearch to detect if the current +search was cancelled and stop collecting documents as soon as it occurs. It should be noted that Collector times are **independent** from the Query times. They are calculated, combined and normalized independently! Due to the nature of Lucene's execution, it is impossible to "merge" the times diff --git a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java index aa041a1219577..e4c0e1d5abc2c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TestSearchContext.java @@ -22,6 +22,7 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.Query; import org.apache.lucene.util.Counter; +import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.unit.TimeValue; @@ -80,6 +81,7 @@ public class TestSearchContext extends SearchContext { ParsedQuery postFilter; Query query; Float minScore; + SearchTask task; ContextIndexSearcher searcher; int size; @@ -324,6 +326,11 @@ public void terminateAfter(int terminateAfter) { this.terminateAfter = terminateAfter; } + @Override + public boolean lowLevelCancellation() { + return false; + } + @Override public SearchContext minimumScore(float minimumScore) { this.minScore = minimumScore; @@ -571,4 +578,18 @@ public QueryShardContext getQueryShardContext() { return queryShardContext; } + @Override + public void setTask(SearchTask task) { + this.task = task; + } + + @Override + public SearchTask getTask() { + return task; + } + + @Override + public boolean isCancelled() { + return task.isCancelled(); + } }