@@ -90,7 +90,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

private final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
protected final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
protected final GroupShardsIterator<SearchShardIterator> shardsIts;
private final int expectedTotalOps;
private final AtomicInteger totalOps = new AtomicInteger();
@@ -375,6 +375,11 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
// we do make sure to clean it on a successful response from a shard
SearchShardTarget shardTarget = shardIt.newSearchShardTarget(nodeId);
onShardFailure(shardIndex, shardTarget, e);
final ShardRouting nextShard = shardIt.nextOrNull();
final boolean lastShard = nextShard == null;
if (lastShard) {
onShardGroupFailure(shardIndex, shardTarget, e);
}

if (totalOps.incrementAndGet() == expectedTotalOps) {
if (logger.isDebugEnabled()) {
@@ -385,11 +390,8 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
logger.trace(new ParameterizedMessage("{}: Failed to execute [{}]", shard, request), e);
}
}
onShardGroupFailure(shardIndex, e);
onPhaseDone();
} else {
final ShardRouting nextShard = shardIt.nextOrNull();
final boolean lastShard = nextShard == null;
// trace log this exception
logger.trace(() -> new ParameterizedMessage(
"{}: Failed to execute [{}] lastShard [{}]",
@@ -405,18 +407,18 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e);
}
}
onShardGroupFailure(shardIndex, e);
}
}
}

/**
* Executed once for every {@link ShardId} that failed on all available shard routing.
*
* @param shardIndex the shard target that failed
* @param exc the final failure reason
* @param shardIndex the shard index that failed
* @param shardTarget the last shard target for this failure
* @param exc the last failure reason
*/
protected void onShardGroupFailure(int shardIndex, Exception exc) {}
protected void onShardGroupFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {}

/**
* Executed once for every failed shard level request. This method is invoked before the next replica is tried for the given
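For reference, the widened onShardGroupFailure hook above is what SearchQueryThenFetchAsyncAction (later in this diff) overrides to surface per-shard-group failures to its progress listener. A minimal sketch of such an override, assuming the surrounding subclass and its progressListener field:

@Override
protected void onShardGroupFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
    // Invoked once per shard id, only after every routing copy of that shard has failed;
    // shardTarget identifies the last copy that was attempted.
    progressListener.notifyQueryFailure(shardIndex, shardTarget, exc);
}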
@@ -22,7 +22,6 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.query.QuerySearchRequest;
@@ -72,8 +71,6 @@ public void run() throws IOException {
final CountedCollector<SearchPhaseResult> counter = new CountedCollector<>(queryResult::consumeResult,
resultList.size(),
() -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)), context);
final SearchSourceBuilder sourceBuilder = context.getRequest().source();
progressListener.notifyListShards(progressListener.searchShards(resultList), sourceBuilder == null || sourceBuilder.size() != 0);
for (final DfsSearchResult dfsResult : resultList) {
final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
@@ -97,7 +94,7 @@ public void onFailure(Exception exception) {
try {
context.getLogger().debug(() -> new ParameterizedMessage("[{}] Failed to execute query phase",
querySearchRequest.contextId()), exception);
progressListener.notifyQueryFailure(shardIndex, exception);
progressListener.notifyQueryFailure(shardIndex, searchShardTarget, exception);
counter.onFailure(shardIndex, searchShardTarget, exception);
} finally {
// the query might not have been executed at all (for example because thread pool rejected
@@ -24,6 +24,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;
@@ -51,6 +52,10 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
shardsIts, timeProvider, clusterState, task, new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(), clusters);
this.searchPhaseController = searchPhaseController;
SearchProgressListener progressListener = task.getProgressListener();
SearchSourceBuilder sourceBuilder = request.source();
progressListener.notifyListShards(progressListener.searchShards(this.shardsIts),
progressListener.searchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
}

@Override
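The fetchPhase flag passed to notifyListShards above is derived from the request source: a size(0) request has no fetch phase. A hypothetical request illustrating the flag (index and aggregation names are made up):

SearchRequest request = new SearchRequest("logs")
    .source(new SearchSourceBuilder()
        .size(0)
        .aggregation(AggregationBuilders.terms("by_host").field("host.name")));
// In the constructor above, sourceBuilder.size() != 0 is false, so listeners are told fetchPhase == false.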
@@ -664,9 +664,9 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
}
numReducePhases++;
index = 1;
if (hasAggs) {
if (hasAggs || hasTopDocs) {
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0] : null, numReducePhases);
}
}
final int i = index++;
@@ -696,7 +696,7 @@ public ReducedQueryPhase reduce() {
ReducedQueryPhase reducePhase = controller.reducedQueryPhase(results.asList(),
getRemainingAggs(), getRemainingTopDocs(), topDocsStats, numReducePhases, false, performFinalReduce);
progressListener.notifyReduce(progressListener.searchShards(results.asList()),
reducePhase.totalHits, reducePhase.aggregations);
reducePhase.totalHits, reducePhase.aggregations, reducePhase.numReducePhases);
return reducePhase;
}

@@ -751,7 +751,8 @@ ReducedQueryPhase reduce() {
List<SearchPhaseResult> resultList = results.asList();
final ReducedQueryPhase reducePhase =
reducedQueryPhase(resultList, isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce());
listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits, reducePhase.aggregations);
listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits,
reducePhase.aggregations, reducePhase.numReducePhases);
return reducePhase;
}
};
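Since consumeInternal now fires a partial reduce when either aggregations or top docs are buffered, the aggs argument handed to the listener can be null. A hedged fragment of a listener override that accounts for this:

@Override
public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
    // aggs is null when this partial reduce was triggered by buffered top docs alone
    if (aggs != null) {
        // inspect the partially reduced aggregations
    }
    // totalHits and reducePhase are always populated
}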
@@ -23,6 +23,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchResponse.Clusters;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
@@ -49,24 +50,27 @@ abstract class SearchProgressListener {
* Executed when shards are ready to be queried.
*
* @param shards The list of shards to query.
* @param skippedShards The list of skipped shards.
* @param clusters The statistics for remote clusters included in the search.
* @param fetchPhase <code>true</code> if the search needs a fetch phase, <code>false</code> otherwise.
**/
public void onListShards(List<SearchShard> shards, boolean fetchPhase) {}
public void onListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {}

/**
* Executed when a shard returns a query result.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)} )}.
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards}.
*/
public void onQueryResult(int shardIndex) {}

/**
* Executed when a shard reports a query failure.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}.
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards}.
* @param shardTarget The last shard target that threw an exception.
* @param exc The cause of the failure.
*/
public void onQueryFailure(int shardIndex, Exception exc) {}
public void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {}

/**
* Executed when a partial reduce is created. The number of partial reduce can be controlled via
@@ -75,38 +79,39 @@ public void onQueryFailure(int shardIndex, Exception exc) {}
* @param shards The list of shards that are part of this reduce.
* @param totalHits The total number of hits in this reduce.
* @param aggs The partial result for aggregations.
* @param version The version number for this reduce.
* @param reducePhase The version number for this reduce.
*/
public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int version) {}
public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}

/**
* Executed once when the final reduce is created.
*
* @param shards The list of shards that are part of this reduce.
* @param totalHits The total number of hits in this reduce.
* @param aggs The final result for aggregations.
* @param reducePhase The version number for this reduce.
*/
public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs) {}
public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {}

/**
* Executed when a shard returns a fetch result.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}.
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards}.
*/
public void onFetchResult(int shardIndex) {}

/**
* Executed when a shard reports a fetch failure.
*
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards(List, boolean)})}.
* @param shardIndex The index of the shard in the list provided by {@link SearchProgressListener#onListShards}.
* @param exc The cause of the failure.
*/
public void onFetchFailure(int shardIndex, Exception exc) {}

final void notifyListShards(List<SearchShard> shards, boolean fetchPhase) {
final void notifyListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {
this.shards = shards;
try {
onListShards(shards, fetchPhase);
onListShards(shards, skippedShards, clusters, fetchPhase);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on list shards"), e);
}
@@ -121,26 +126,26 @@ final void notifyQueryResult(int shardIndex) {
}
}

final void notifyQueryFailure(int shardIndex, Exception exc) {
final void notifyQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
try {
onQueryFailure(shardIndex, exc);
onQueryFailure(shardIndex, shardTarget, exc);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] Failed to execute progress listener on query failure",
shards.get(shardIndex)), e);
}
}

final void notifyPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int version) {
final void notifyPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
try {
onPartialReduce(shards, totalHits, aggs, version);
onPartialReduce(shards, totalHits, aggs, reducePhase);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on partial reduce"), e);
}
}

final void notifyReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs) {
final void notifyReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
try {
onReduce(shards, totalHits, aggs);
onReduce(shards, totalHits, aggs, reducePhase);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute progress listener on reduce"), e);
}
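Taken together, the new callbacks let a single listener follow a search from shard listing to final reduce. A minimal sketch of a consumer, assuming it sits in the org.elasticsearch.action.search package (the listener class is package-private) and that the class and field names below are illustrative:

package org.elasticsearch.action.search;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchResponse.Clusters;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregations;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative listener that tracks progress as it is reported.
class CountingProgressListener extends SearchProgressListener {

    private final AtomicInteger queryFailures = new AtomicInteger();
    private volatile int queriedShards;
    private volatile int skippedShardCount;

    @Override
    public void onListShards(List<SearchShard> shards, List<SearchShard> skippedShards, Clusters clusters, boolean fetchPhase) {
        this.queriedShards = shards.size();
        this.skippedShardCount = skippedShards.size();
    }

    @Override
    public void onQueryFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
        // shardTarget is the last shard copy that was attempted for this shard index
        queryFailures.incrementAndGet();
    }

    @Override
    public void onPartialReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
        // aggs may be null when the request buffers top docs but carries no aggregations
    }

    @Override
    public void onReduce(List<SearchShard> shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
        // reducePhase is the total number of reduce rounds performed, including this final one
    }
}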
@@ -25,6 +25,7 @@
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;
@@ -57,7 +58,7 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
final SearchProgressListener progressListener = task.getProgressListener();
final SearchSourceBuilder sourceBuilder = request.source();
progressListener.notifyListShards(progressListener.searchShards(this.shardsIts),
sourceBuilder == null || sourceBuilder.size() != 0);
progressListener.searchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
}

protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
@@ -67,8 +68,8 @@ protected void executePhaseOnShard(final SearchShardIterator shardIt, final Shar
}

@Override
protected void onShardGroupFailure(int shardIndex, Exception exc) {
progressListener.notifyQueryFailure(shardIndex, exc);
protected void onShardGroupFailure(int shardIndex, SearchShardTarget shardTarget, Exception exc) {
progressListener.notifyQueryFailure(shardIndex, shardTarget, exc);
}

@Override
@@ -33,7 +33,6 @@
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
@@ -56,9 +55,9 @@
* @see org.elasticsearch.client.Client#search(SearchRequest)
* @see SearchResponse
*/
public final class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {
public class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {

private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));
public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;
@@ -597,7 +596,7 @@ public boolean isSuggestOnly() {
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
// generating description in a lazy way since source can be quite big
return new SearchTask(id, type, action, null, parentTaskId, headers) {
@Override
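With final dropped from SearchRequest and createTask narrowed to return SearchTask, a request can now be subclassed and still hand back a specialised task. A hedged sketch, with a made-up subclass name and a fixed description in place of the lazy default:

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.tasks.TaskId;

import java.util.Map;

// Illustrative subclass: tags the task with a fixed description.
public class TrackedSearchRequest extends SearchRequest {

    @Override
    public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
        return new SearchTask(id, type, action, "tracked search", parentTaskId, headers);
    }
}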
@@ -224,7 +224,7 @@ public SearchRequestBuilder setVersion(boolean version) {
sourceBuilder().version(version);
return this;
}

/**
* Should each {@link org.elasticsearch.search.SearchHit} be returned with the
* sequence number and primary term of the last modification of the document.
@@ -118,6 +118,10 @@ public RestStatus status() {
return RestStatus.status(successfulShards, totalShards, shardFailures);
}

public SearchResponseSections getInternalResponse() {
return internalResponse;
}

/**
* The search hits.
*/
@@ -34,7 +34,7 @@ public class SearchShard implements Comparable<SearchShard> {
private final String clusterAlias;
private final ShardId shardId;

SearchShard(@Nullable String clusterAlias, ShardId shardId) {
public SearchShard(@Nullable String clusterAlias, ShardId shardId) {
this.clusterAlias = clusterAlias;
this.shardId = shardId;
}