Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,9 @@ enum ElasticsearchExceptionHandle {
ShardStateAction.NoLongerPrimaryShardException::new, 142),
SCRIPT_EXCEPTION(org.elasticsearch.script.ScriptException.class, org.elasticsearch.script.ScriptException::new, 143),
NOT_MASTER_EXCEPTION(org.elasticsearch.cluster.NotMasterException.class, org.elasticsearch.cluster.NotMasterException::new, 144),
STATUS_EXCEPTION(org.elasticsearch.ElasticsearchStatusException.class, org.elasticsearch.ElasticsearchStatusException::new, 145);
STATUS_EXCEPTION(org.elasticsearch.ElasticsearchStatusException.class, org.elasticsearch.ElasticsearchStatusException::new, 145),
TASK_CANCELLED_EXCEPTION(org.elasticsearch.tasks.TaskCancelledException.class,
org.elasticsearch.tasks.TaskCancelledException::new, 146);

final Class<? extends ElasticsearchException> exceptionClass;
final FunctionThatThrowsIOException<StreamInput, ? extends ElasticsearchException> constructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
protected final SearchRequest request;
/** Used by subclasses to resolve node ids to DiscoveryNodes. **/
protected final Function<String, DiscoveryNode> nodeIdToDiscoveryNode;
protected final SearchTask task;
protected final int expectedSuccessfulOps;
private final int expectedTotalOps;
protected final AtomicInteger successfulOps = new AtomicInteger();
Expand All @@ -74,12 +75,13 @@ protected AbstractSearchAsyncAction(Logger logger, SearchTransportService search
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
Map<String, AliasFilter> aliasFilter, Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, long startTime,
long clusterStateVersion) {
long clusterStateVersion, SearchTask task) {
super(startTime);
this.logger = logger;
this.searchTransportService = searchTransportService;
this.executor = executor;
this.request = request;
this.task = task;
this.listener = listener;
this.nodeIdToDiscoveryNode = nodeIdToDiscoveryNode;
this.clusterStateVersion = clusterStateVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
Map<String, AliasFilter> aliasFilter, SearchPhaseController searchPhaseController,
Executor executor, SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor,
request, listener, shardsIts, startTime, clusterStateVersion);
request, listener, shardsIts, startTime, clusterStateVersion, task);
this.searchPhaseController = searchPhaseController;
queryFetchResults = new AtomicArray<>(firstResults.length());
}
Expand All @@ -64,7 +64,7 @@ protected String firstPhaseName() {
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<DfsSearchResult> listener) {
searchTransportService.sendExecuteDfs(node, request, listener);
searchTransportService.sendExecuteDfs(node, request, task, listener);
}

@Override
Expand All @@ -82,7 +82,7 @@ protected void moveToSecondPhase() {

void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
final DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
searchTransportService.sendExecuteFetch(node, querySearchRequest, new ActionListener<QueryFetchSearchResult>() {
searchTransportService.sendExecuteFetch(node, querySearchRequest, task, new ActionListener<QueryFetchSearchResult>() {
@Override
public void onResponse(QueryFetchSearchResult result) {
result.shardTarget(dfsResult.shardTarget());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
Map<String, AliasFilter> aliasFilter, SearchPhaseController searchPhaseController,
Executor executor, SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion,
SearchTask task) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor,
request, listener, shardsIts, startTime, clusterStateVersion);
request, listener, shardsIts, startTime, clusterStateVersion, task);
this.searchPhaseController = searchPhaseController;
queryResults = new AtomicArray<>(firstResults.length());
fetchResults = new AtomicArray<>(firstResults.length());
Expand All @@ -74,7 +75,7 @@ protected String firstPhaseName() {
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<DfsSearchResult> listener) {
searchTransportService.sendExecuteDfs(node, request, listener);
searchTransportService.sendExecuteDfs(node, request, task, listener);
}

@Override
Expand All @@ -91,7 +92,7 @@ protected void moveToSecondPhase() {

void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
final QuerySearchRequest querySearchRequest, final DiscoveryNode node) {
searchTransportService.sendExecuteQuery(node, querySearchRequest, new ActionListener<QuerySearchResult>() {
searchTransportService.sendExecuteQuery(node, querySearchRequest, task, new ActionListener<QuerySearchResult>() {
@Override
public void onResponse(QuerySearchResult result) {
result.shardTarget(dfsResult.shardTarget());
Expand Down Expand Up @@ -162,7 +163,7 @@ void innerExecuteFetchPhase() throws Exception {

void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, task, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
result.shardTarget(shardTarget);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetc
Map<String, AliasFilter> aliasFilter,
SearchPhaseController searchPhaseController, Executor executor,
SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion,
SearchTask task) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor,
request, listener, shardsIts, startTime, clusterStateVersion);
request, listener, shardsIts, startTime, clusterStateVersion, task);
this.searchPhaseController = searchPhaseController;

}
Expand All @@ -58,7 +59,7 @@ protected String firstPhaseName() {
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<QueryFetchSearchResult> listener) {
searchTransportService.sendExecuteFetch(node, request, listener);
searchTransportService.sendExecuteFetch(node, request, task, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
AliasFilter> aliasFilter,
SearchPhaseController searchPhaseController, Executor executor,
SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion,
SearchTask task) {
super(logger, searchTransportService, nodeIdToDiscoveryNode, aliasFilter, executor, request, listener,
shardsIts, startTime, clusterStateVersion);
shardsIts, startTime, clusterStateVersion, task);
this.searchPhaseController = searchPhaseController;
fetchResults = new AtomicArray<>(firstResults.length());
docIdsToLoad = new AtomicArray<>(firstResults.length());
Expand All @@ -70,7 +71,7 @@ protected String firstPhaseName() {
@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<QuerySearchResultProvider> listener) {
searchTransportService.sendExecuteQuery(node, request, listener);
searchTransportService.sendExecuteQuery(node, request, task, listener);
}

@Override
Expand All @@ -97,7 +98,7 @@ protected void moveToSecondPhase() throws Exception {

void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, task, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
result.shardTarget(shardTarget);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -275,6 +277,11 @@ public boolean isSuggestOnly() {
return source != null && source.isSuggestOnly();
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
private final SearchPhaseController searchPhaseController;
private final SearchTransportService searchTransportService;
private final SearchScrollRequest request;
private final SearchTask task;
private final ActionListener<SearchResponse> listener;
private final ParsedScrollId scrollId;
private final DiscoveryNodes nodes;
Expand All @@ -52,13 +53,14 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
private final AtomicInteger successfulOps;
private final AtomicInteger counter;

SearchScrollQueryAndFetchAsyncAction(Logger logger, ClusterService clusterService,
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController,
SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
SearchScrollQueryAndFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService,
SearchPhaseController searchPhaseController, SearchScrollRequest request, SearchTask task,
ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
this.logger = logger;
this.searchPhaseController = searchPhaseController;
this.searchTransportService = searchTransportService;
this.request = request;
this.task = task;
this.listener = listener;
this.scrollId = scrollId;
this.nodes = clusterService.state().nodes();
Expand Down Expand Up @@ -128,7 +130,7 @@ public void start() {

void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
searchTransportService.sendExecuteFetch(node, internalRequest, new ActionListener<ScrollQueryFetchSearchResult>() {
searchTransportService.sendExecuteFetch(node, internalRequest, task, new ActionListener<ScrollQueryFetchSearchResult>() {
@Override
public void onResponse(ScrollQueryFetchSearchResult result) {
queryFetchResults.set(shardIndex, result.result());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {

private final Logger logger;
private final SearchTask task;
private final SearchTransportService searchTransportService;
private final SearchPhaseController searchPhaseController;
private final SearchScrollRequest request;
Expand All @@ -56,13 +57,14 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private volatile ScoreDoc[] sortedShardDocs;
private final AtomicInteger successfulOps;

SearchScrollQueryThenFetchAsyncAction(Logger logger, ClusterService clusterService,
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController,
SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
SearchScrollQueryThenFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService,
SearchPhaseController searchPhaseController, SearchScrollRequest request, SearchTask task,
ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
this.logger = logger;
this.searchTransportService = searchTransportService;
this.searchPhaseController = searchPhaseController;
this.request = request;
this.task = task;
this.listener = listener;
this.scrollId = scrollId;
this.nodes = clusterService.state().nodes();
Expand Down Expand Up @@ -124,7 +126,7 @@ public void start() {

private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) {
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
searchTransportService.sendExecuteQuery(node, internalRequest, new ActionListener<ScrollQuerySearchResult>() {
searchTransportService.sendExecuteQuery(node, internalRequest, task, new ActionListener<ScrollQuerySearchResult>() {
@Override
public void onResponse(ScrollQuerySearchResult result) {
queryResults.set(shardIndex, result.queryResult());
Expand Down Expand Up @@ -184,7 +186,7 @@ private void executeFetchPhase() throws Exception {
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc);
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener<FetchSearchResult>() {
searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, task, new ActionListener<FetchSearchResult>() {
@Override
public void onResponse(FetchSearchResult result) {
result.shardTarget(querySearchResult.shardTarget());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Objects;
Expand Down Expand Up @@ -107,6 +109,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(scroll);
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
34 changes: 34 additions & 0 deletions core/src/main/java/org/elasticsearch/action/search/SearchTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.search;

import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;

/**
* Task storing information about a currently running search request.
*/
public class SearchTask extends CancellableTask {

public SearchTask(long id, String type, String action, String description, TaskId parentTaskId) {
super(id, type, action, description, parentTaskId);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be cool to be able to list the phase and/or which shards you are waiting for. You could put all kinds of cool stuff in here one day! But for now this seems like the right thing to do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's the plan, but I didn't want to clump everything into a single PR.

}
Loading