AbstractAsyncAction.java
@@ -26,8 +26,10 @@ abstract class AbstractAsyncAction {
 
     private final long startTime;
 
-    protected AbstractAsyncAction() {
-        this.startTime = System.currentTimeMillis();
+    protected AbstractAsyncAction() { this(System.currentTimeMillis());}
+
+    protected AbstractAsyncAction(long startTime) {
+        this.startTime = startTime;
     }
 
     /**
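
The hunk above turns the single no-argument constructor into a telescoping pair: the no-argument form now samples the clock and delegates to a new overload that accepts an explicit start time, so a caller can pin one timestamp for the whole action. A self-contained sketch of the same pattern (the class name and usage are illustrative, not part of this change):

    // Telescoping constructors: the convenience form samples the clock once;
    // the explicit form lets a caller (or a test) supply a fixed value so that
    // everything derived from startTime() sees the same instant.
    abstract class TimedAction {
        private final long startTime;

        protected TimedAction() {
            this(System.currentTimeMillis());
        }

        protected TimedAction(long startTime) {
            this.startTime = startTime;
        }

        protected final long startTime() {
            return startTime;
        }
    }
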
AbstractSearchAsyncAction.java
@@ -27,15 +27,10 @@
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.NoShardAvailableActionException;
 import org.elasticsearch.action.support.TransportActions;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.block.ClusterBlockLevel;
-import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.search.SearchPhaseResult;
@@ -45,86 +40,58 @@
 import org.elasticsearch.search.internal.ShardSearchTransportRequest;
 import org.elasticsearch.search.query.QuerySearchResult;
 import org.elasticsearch.search.query.QuerySearchResultProvider;
-import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 
 import static org.elasticsearch.action.search.TransportSearchHelper.internalSearchRequest;
 
 abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
 
     protected final Logger logger;
     protected final SearchTransportService searchTransportService;
-    private final IndexNameExpressionResolver indexNameExpressionResolver;
-    protected final SearchPhaseController searchPhaseController;
-    protected final ThreadPool threadPool;
+    private final Executor executor;
     protected final ActionListener<SearchResponse> listener;
-    protected final GroupShardsIterator shardsIts;
+    private final GroupShardsIterator shardsIts;
     protected final SearchRequest request;
-    protected final ClusterState clusterState;
-    protected final DiscoveryNodes nodes;
+    /** Used by subclasses to resolve node ids to DiscoveryNodes. **/
+    protected final Function<String, DiscoveryNode> nodeIdToDiscoveryNode;
     protected final int expectedSuccessfulOps;
     private final int expectedTotalOps;
     protected final AtomicInteger successfulOps = new AtomicInteger();
     private final AtomicInteger totalOps = new AtomicInteger();
     protected final AtomicArray<FirstResult> firstResults;
+    private final Map<String, String[]> perIndexFilteringAliases;
+    private final long clusterStateVersion;
     private volatile AtomicArray<ShardSearchFailure> shardFailures;
     private final Object shardFailuresMutex = new Object();
     protected volatile ScoreDoc[] sortedShardDocs;
 
-    protected AbstractSearchAsyncAction(Logger logger, SearchTransportService searchTransportService, ClusterService clusterService,
-                                        IndexNameExpressionResolver indexNameExpressionResolver,
-                                        SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchRequest request,
-                                        ActionListener<SearchResponse> listener) {
+    protected AbstractSearchAsyncAction(Logger logger, SearchTransportService searchTransportService,
+                                        Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
+                                        Map<String, String[]> perIndexFilteringAliases, Executor executor, SearchRequest request,
+                                        ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, long startTime,
+                                        long clusterStateVersion) {
+        super(startTime);
         this.logger = logger;
         this.searchTransportService = searchTransportService;
-        this.indexNameExpressionResolver = indexNameExpressionResolver;
-        this.searchPhaseController = searchPhaseController;
-        this.threadPool = threadPool;
+        this.executor = executor;
        this.request = request;
        this.listener = listener;
 
-        this.clusterState = clusterService.state();
-        nodes = clusterState.nodes();
-
-        clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
-
-        // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
-        // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
-        // of just for the _search api
-        String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, request.indicesOptions(),
-            startTime(), request.indices());
-
-        for (String index : concreteIndices) {
-            clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index);
-        }
-
-        Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, request.routing(),
-            request.indices());
-
-        shardsIts = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, request.preference());
-        final int shardCount = shardsIts.size();
-        failIfOverShardCountLimit(clusterService, shardCount);
-        expectedSuccessfulOps = shardCount;
+        this.perIndexFilteringAliases = perIndexFilteringAliases;
+        this.nodeIdToDiscoveryNode = nodeIdToDiscoveryNode;
+        this.clusterStateVersion = clusterStateVersion;
+        this.shardsIts = shardsIts;
+        expectedSuccessfulOps = shardsIts.size();
         // we need to add 1 for non active partition, since we count it in the total!
         expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
 
         firstResults = new AtomicArray<>(shardsIts.size());
     }
 
-    private void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) {
-        final long shardCountLimit = clusterService.getClusterSettings().get(TransportSearchAction.SHARD_COUNT_LIMIT_SETTING);
-        if (shardCount > shardCountLimit) {
-            throw new IllegalArgumentException("Trying to query " + shardCount + " shards, which is over the limit of "
-                + shardCountLimit + ". This limit exists because querying many shards at the same time can make the "
-                + "job of the coordinating node very CPU and/or memory intensive. It is usually a better idea to "
-                + "have a smaller number of larger shards. Update [" + TransportSearchAction.SHARD_COUNT_LIMIT_SETTING.getKey()
-                + "] to a greater value if you really want to query that many shards at the same time.");
-        }
-    }
-
     public void start() {
         if (expectedSuccessfulOps == 0) {
@@ -152,12 +119,11 @@ void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final
             // no more active shards... (we should not really get here, but just for safety)
             onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
         } else {
-            final DiscoveryNode node = nodes.get(shard.currentNodeId());
+            final DiscoveryNode node = nodeIdToDiscoveryNode.apply(shard.currentNodeId());
             if (node == null) {
                 onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
             } else {
-                String[] filteringAliases = indexNameExpressionResolver.filteringAliases(clusterState,
-                    shard.index().getName(), request.indices());
+                String[] filteringAliases = perIndexFilteringAliases.get(shard.index().getName());
                 sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases,
                     startTime()), new ActionListener<FirstResult>() {
                     @Override
@@ -319,7 +285,7 @@ protected final void addShardFailure(final int shardIndex, @Nullable SearchShard
     private void raiseEarlyFailure(Exception e) {
         for (AtomicArray.Entry<FirstResult> entry : firstResults.asList()) {
             try {
-                DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId());
+                DiscoveryNode node = nodeIdToDiscoveryNode.apply(entry.value.shardTarget().nodeId());
                 sendReleaseSearchContext(entry.value.id(), node);
             } catch (Exception inner) {
                 inner.addSuppressed(e);
@@ -344,7 +310,7 @@ protected void releaseIrrelevantSearchContexts(AtomicArray<? extends QuerySearch
                 if (queryResult.hasHits()
                     && docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs
                     try {
-                        DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId());
+                        DiscoveryNode node = nodeIdToDiscoveryNode.apply(entry.value.queryResult().shardTarget().nodeId());
                         sendReleaseSearchContext(entry.value.queryResult().id(), node);
                     } catch (Exception e) {
                         logger.trace("failed to release context", e);
@@ -402,12 +368,17 @@ final void innerMoveToSecondPhase() throws Exception {
                 sb.append(result.shardTarget());
             }
 
-            logger.trace("Moving to second phase, based on results from: {} (cluster state version: {})", sb, clusterState.version());
+            logger.trace("Moving to second phase, based on results from: {} (cluster state version: {})", sb, clusterStateVersion);
         }
         moveToSecondPhase();
     }
 
     protected abstract void moveToSecondPhase() throws Exception;
 
     protected abstract String firstPhaseName();
+
+    protected Executor getExecutor() {
+        return executor;
+    }
+
 }
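
Net effect of the changes in this file: AbstractSearchAsyncAction no longer reads ClusterState through ClusterService. Cluster-block checks, index-name resolution, routing, shard iteration, and the shard-count limit all move to the caller, and the action receives plain values plus a node-resolution function. A minimal sketch of what the calling side might now look like (variable names are assumptions; the real wiring lives in TransportSearchAction, which is not part of this diff):

    // Sketch only: the caller resolves everything against one ClusterState
    // snapshot up front, then hands the action plain values and functions.
    ClusterState clusterState = clusterService.state();
    DiscoveryNodes nodes = clusterState.nodes();

    new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService,
        nodes::get,                      // Function<String, DiscoveryNode>
        perIndexFilteringAliases,        // Map<String, String[]> built by the caller
        searchPhaseController,
        threadPool.executor(ThreadPool.Names.SEARCH),
        request, listener, shardsIts,
        System.currentTimeMillis(),      // startTime
        clusterState.version()).start();

Passing nodes::get instead of the whole ClusterService is what makes the class testable with a lambda- or map-backed resolver and an arbitrary Executor.
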
SearchDfsQueryAndFetchAsyncAction.java
@@ -24,31 +24,34 @@
 import org.apache.logging.log4j.util.Supplier;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
-import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.search.dfs.AggregatedDfs;
 import org.elasticsearch.search.dfs.DfsSearchResult;
 import org.elasticsearch.search.fetch.QueryFetchSearchResult;
 import org.elasticsearch.search.internal.InternalSearchResponse;
 import org.elasticsearch.search.internal.ShardSearchTransportRequest;
 import org.elasticsearch.search.query.QuerySearchRequest;
-import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 
 class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {
 
     private final AtomicArray<QueryFetchSearchResult> queryFetchResults;
+    private final SearchPhaseController searchPhaseController;
 
     SearchDfsQueryAndFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
-                                      ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
-                                      SearchPhaseController searchPhaseController, ThreadPool threadPool,
-                                      SearchRequest request, ActionListener<SearchResponse> listener) {
-        super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool,
-            request, listener);
+                                      Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
+                                      Map<String, String[]> perIndexFilteringAliases, SearchPhaseController searchPhaseController,
+                                      Executor executor, SearchRequest request, ActionListener<SearchResponse> listener,
+                                      GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
+        super(logger, searchTransportService, nodeIdToDiscoveryNode, perIndexFilteringAliases, executor,
+            request, listener, shardsIts, startTime, clusterStateVersion);
+        this.searchPhaseController = searchPhaseController;
         queryFetchResults = new AtomicArray<>(firstResults.length());
     }
 
@@ -70,7 +73,7 @@ protected void moveToSecondPhase() {
 
         for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
             DfsSearchResult dfsResult = entry.value;
-            DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
+            DiscoveryNode node = nodeIdToDiscoveryNode.apply(dfsResult.shardTarget().nodeId());
             QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
             executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
         }
@@ -115,7 +118,7 @@ void onSecondPhaseFailure(Exception e, QuerySearchRequest querySearchRequest, in
     }
 
     private void finishHim() {
-        threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
+        getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
             @Override
             public void doRun() throws IOException {
                 sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults);
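
In finishHim() above, the hard-wired threadPool.executor(ThreadPool.Names.SEARCH) lookup becomes a call through the injected executor returned by getExecutor(). One consequence (our reading of the intent, not stated in the diff): since java.util.concurrent.Executor is a single-method interface, a test can run the final merge on the calling thread:

    // A direct executor runs the ActionRunnable synchronously on the calling
    // thread, which the hard-coded thread-pool lookup did not allow.
    Executor directExecutor = Runnable::run;
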
SearchDfsQueryThenFetchAsyncAction.java
@@ -26,9 +26,8 @@
 import org.apache.lucene.search.ScoreDoc;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
-import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.dfs.AggregatedDfs;
@@ -39,23 +38,28 @@
 import org.elasticsearch.search.internal.ShardSearchTransportRequest;
 import org.elasticsearch.search.query.QuerySearchRequest;
 import org.elasticsearch.search.query.QuerySearchResult;
-import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 
 class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {
 
     final AtomicArray<QuerySearchResult> queryResults;
     final AtomicArray<FetchSearchResult> fetchResults;
     final AtomicArray<IntArrayList> docIdsToLoad;
+    private final SearchPhaseController searchPhaseController;
 
     SearchDfsQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
-                                       ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
-                                       SearchPhaseController searchPhaseController, ThreadPool threadPool,
-                                       SearchRequest request, ActionListener<SearchResponse> listener) {
-        super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool,
-            request, listener);
+                                       Function<String, DiscoveryNode> nodeIdToDiscoveryNode,
+                                       Map<String, String[]> perIndexFilteringAliases, SearchPhaseController searchPhaseController,
+                                       Executor executor, SearchRequest request, ActionListener<SearchResponse> listener,
+                                       GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) {
+        super(logger, searchTransportService, nodeIdToDiscoveryNode, perIndexFilteringAliases, executor,
+            request, listener, shardsIts, startTime, clusterStateVersion);
+        this.searchPhaseController = searchPhaseController;
         queryResults = new AtomicArray<>(firstResults.length());
         fetchResults = new AtomicArray<>(firstResults.length());
         docIdsToLoad = new AtomicArray<>(firstResults.length());
Expand All @@ -78,7 +82,7 @@ protected void moveToSecondPhase() {
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
DiscoveryNode node = nodeIdToDiscoveryNode.apply(dfsResult.shardTarget().nodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeQuery(entry.index, dfsResult, counter, querySearchRequest, node);
}
@@ -149,7 +153,7 @@ void innerExecuteFetchPhase() throws Exception {
         final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
         for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
             QuerySearchResult queryResult = queryResults.get(entry.index);
-            DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
+            DiscoveryNode node = nodeIdToDiscoveryNode.apply(queryResult.shardTarget().nodeId());
             ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
             executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
         }
@@ -192,7 +196,7 @@ void onFetchFailure(Exception e, ShardFetchSearchRequest fetchSearchRequest, int
     }
 
     private void finishHim() {
-        threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
+        getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
             @Override
             public void doRun() throws IOException {
                 final boolean isScrollRequest = request.scroll() != null;