diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractAsyncAction.java index 3ce14d8dacd87..96db19d547269 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractAsyncAction.java @@ -26,8 +26,10 @@ abstract class AbstractAsyncAction { private final long startTime; - protected AbstractAsyncAction() { - this.startTime = System.currentTimeMillis(); + protected AbstractAsyncAction() { this(System.currentTimeMillis());} + + protected AbstractAsyncAction(long startTime) { + this.startTime = startTime; } /** diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 6cb68b8e9bef5..b9f4120844c30 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -27,15 +27,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.support.TransportActions; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchPhaseResult; @@ -45,12 +40,12 @@ import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResultProvider; -import org.elasticsearch.threadpool.ThreadPool; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import static org.elasticsearch.action.search.TransportSearchHelper.internalSearchRequest; @@ -58,73 +53,45 @@ abstract class AbstractSearchAsyncAction protected final Logger logger; protected final SearchTransportService searchTransportService; - private final IndexNameExpressionResolver indexNameExpressionResolver; - protected final SearchPhaseController searchPhaseController; - protected final ThreadPool threadPool; + private final Executor executor; protected final ActionListener listener; - protected final GroupShardsIterator shardsIts; + private final GroupShardsIterator shardsIts; protected final SearchRequest request; - protected final ClusterState clusterState; - protected final DiscoveryNodes nodes; + /** Used by subclasses to resolve node ids to DiscoveryNodes. **/ + protected final Function nodeIdToDiscoveryNode; protected final int expectedSuccessfulOps; private final int expectedTotalOps; protected final AtomicInteger successfulOps = new AtomicInteger(); private final AtomicInteger totalOps = new AtomicInteger(); protected final AtomicArray firstResults; + private final Map perIndexFilteringAliases; + private final long clusterStateVersion; private volatile AtomicArray shardFailures; private final Object shardFailuresMutex = new Object(); protected volatile ScoreDoc[] sortedShardDocs; - protected AbstractSearchAsyncAction(Logger logger, SearchTransportService searchTransportService, ClusterService clusterService, - IndexNameExpressionResolver indexNameExpressionResolver, - SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchRequest request, - ActionListener listener) { + protected AbstractSearchAsyncAction(Logger logger, SearchTransportService searchTransportService, + Function nodeIdToDiscoveryNode, + Map perIndexFilteringAliases, Executor executor, SearchRequest request, + ActionListener listener, GroupShardsIterator shardsIts, long startTime, + long clusterStateVersion) { + super(startTime); this.logger = logger; this.searchTransportService = searchTransportService; - this.indexNameExpressionResolver = indexNameExpressionResolver; - this.searchPhaseController = searchPhaseController; - this.threadPool = threadPool; + this.executor = executor; this.request = request; this.listener = listener; - - this.clusterState = clusterService.state(); - nodes = clusterState.nodes(); - - clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); - - // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name - // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead - // of just for the _search api - String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, request.indicesOptions(), - startTime(), request.indices()); - - for (String index : concreteIndices) { - clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index); - } - - Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, request.routing(), - request.indices()); - - shardsIts = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, request.preference()); - final int shardCount = shardsIts.size(); - failIfOverShardCountLimit(clusterService, shardCount); - expectedSuccessfulOps = shardCount; + this.perIndexFilteringAliases = perIndexFilteringAliases; + this.nodeIdToDiscoveryNode = nodeIdToDiscoveryNode; + this.clusterStateVersion = clusterStateVersion; + this.shardsIts = shardsIts; + expectedSuccessfulOps = shardsIts.size(); // we need to add 1 for non active partition, since we count it in the total! expectedTotalOps = shardsIts.totalSizeWith1ForEmpty(); - firstResults = new AtomicArray<>(shardsIts.size()); } - private void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) { - final long shardCountLimit = clusterService.getClusterSettings().get(TransportSearchAction.SHARD_COUNT_LIMIT_SETTING); - if (shardCount > shardCountLimit) { - throw new IllegalArgumentException("Trying to query " + shardCount + " shards, which is over the limit of " - + shardCountLimit + ". This limit exists because querying many shards at the same time can make the " - + "job of the coordinating node very CPU and/or memory intensive. It is usually a better idea to " - + "have a smaller number of larger shards. Update [" + TransportSearchAction.SHARD_COUNT_LIMIT_SETTING.getKey() - + "] to a greater value if you really want to query that many shards at the same time."); - } - } + public void start() { if (expectedSuccessfulOps == 0) { @@ -152,12 +119,11 @@ void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final // no more active shards... (we should not really get here, but just for safety) onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); } else { - final DiscoveryNode node = nodes.get(shard.currentNodeId()); + final DiscoveryNode node = nodeIdToDiscoveryNode.apply(shard.currentNodeId()); if (node == null) { onFirstPhaseResult(shardIndex, shard, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); } else { - String[] filteringAliases = indexNameExpressionResolver.filteringAliases(clusterState, - shard.index().getName(), request.indices()); + String[] filteringAliases = perIndexFilteringAliases.get(shard.index().getName()); sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime()), new ActionListener() { @Override @@ -319,7 +285,7 @@ protected final void addShardFailure(final int shardIndex, @Nullable SearchShard private void raiseEarlyFailure(Exception e) { for (AtomicArray.Entry entry : firstResults.asList()) { try { - DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId()); + DiscoveryNode node = nodeIdToDiscoveryNode.apply(entry.value.shardTarget().nodeId()); sendReleaseSearchContext(entry.value.id(), node); } catch (Exception inner) { inner.addSuppressed(e); @@ -344,7 +310,7 @@ protected void releaseIrrelevantSearchContexts(AtomicArray { private final AtomicArray queryFetchResults; - + private final SearchPhaseController searchPhaseController; SearchDfsQueryAndFetchAsyncAction(Logger logger, SearchTransportService searchTransportService, - ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, - SearchPhaseController searchPhaseController, ThreadPool threadPool, - SearchRequest request, ActionListener listener) { - super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, - request, listener); + Function nodeIdToDiscoveryNode, + Map perIndexFilteringAliases, SearchPhaseController searchPhaseController, + Executor executor, SearchRequest request, ActionListener listener, + GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) { + super(logger, searchTransportService, nodeIdToDiscoveryNode, perIndexFilteringAliases, executor, + request, listener, shardsIts, startTime, clusterStateVersion); + this.searchPhaseController = searchPhaseController; queryFetchResults = new AtomicArray<>(firstResults.length()); } @@ -70,7 +73,7 @@ protected void moveToSecondPhase() { for (final AtomicArray.Entry entry : firstResults.asList()) { DfsSearchResult dfsResult = entry.value; - DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); + DiscoveryNode node = nodeIdToDiscoveryNode.apply(dfsResult.shardTarget().nodeId()); QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); } @@ -115,7 +118,7 @@ void onSecondPhaseFailure(Exception e, QuerySearchRequest querySearchRequest, in } private void finishHim() { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + getExecutor().execute(new ActionRunnable(listener) { @Override public void doRun() throws IOException { sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index ccd646ae129c9..a0a5035335f2f 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -26,9 +26,8 @@ import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.dfs.AggregatedDfs; @@ -39,23 +38,28 @@ import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { final AtomicArray queryResults; final AtomicArray fetchResults; final AtomicArray docIdsToLoad; + private final SearchPhaseController searchPhaseController; SearchDfsQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService, - ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, - SearchPhaseController searchPhaseController, ThreadPool threadPool, - SearchRequest request, ActionListener listener) { - super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, - request, listener); + Function nodeIdToDiscoveryNode, + Map perIndexFilteringAliases, SearchPhaseController searchPhaseController, + Executor executor, SearchRequest request, ActionListener listener, + GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) { + super(logger, searchTransportService, nodeIdToDiscoveryNode, perIndexFilteringAliases, executor, + request, listener, shardsIts, startTime, clusterStateVersion); + this.searchPhaseController = searchPhaseController; queryResults = new AtomicArray<>(firstResults.length()); fetchResults = new AtomicArray<>(firstResults.length()); docIdsToLoad = new AtomicArray<>(firstResults.length()); @@ -78,7 +82,7 @@ protected void moveToSecondPhase() { final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); for (final AtomicArray.Entry entry : firstResults.asList()) { DfsSearchResult dfsResult = entry.value; - DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); + DiscoveryNode node = nodeIdToDiscoveryNode.apply(dfsResult.shardTarget().nodeId()); QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); } @@ -149,7 +153,7 @@ void innerExecuteFetchPhase() throws Exception { final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { QuerySearchResult queryResult = queryResults.get(entry.index); - DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); + DiscoveryNode node = nodeIdToDiscoveryNode.apply(queryResult.shardTarget().nodeId()); ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } @@ -192,7 +196,7 @@ void onFetchFailure(Exception e, ShardFetchSearchRequest fetchSearchRequest, int } private void finishHim() { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + getExecutor().execute(new ActionRunnable(listener) { @Override public void doRun() throws IOException { final boolean isScrollRequest = request.scroll() != null; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java index d799bc267649f..3137283814297 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java @@ -22,24 +22,29 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.function.Function; class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction { + private final SearchPhaseController searchPhaseController; + SearchQueryAndFetchAsyncAction(Logger logger, SearchTransportService searchTransportService, - ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, - SearchPhaseController searchPhaseController, ThreadPool threadPool, - SearchRequest request, ActionListener listener) { - super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, - request, listener); + Function nodeIdToDiscoveryNode, Map perIndexFilteringAliases, + SearchPhaseController searchPhaseController, Executor executor, + SearchRequest request, ActionListener listener, + GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) { + super(logger, searchTransportService, nodeIdToDiscoveryNode, perIndexFilteringAliases, executor, + request, listener, shardsIts, startTime, clusterStateVersion); + this.searchPhaseController = searchPhaseController; } @Override @@ -55,7 +60,7 @@ protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportReq @Override protected void moveToSecondPhase() throws Exception { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + getExecutor().execute(new ActionRunnable(listener) { @Override public void doRun() throws IOException { final boolean isScrollRequest = request.scroll() != null; diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 6df2bb3f87e44..edf651e1f2aaa 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -26,9 +26,8 @@ import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.fetch.FetchSearchResult; @@ -36,21 +35,27 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchResultProvider; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { final AtomicArray fetchResults; final AtomicArray docIdsToLoad; - - SearchQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchService, - ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, - SearchPhaseController searchPhaseController, ThreadPool threadPool, - SearchRequest request, ActionListener listener) { - super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener); + private final SearchPhaseController searchPhaseController; + + SearchQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService, + Function nodeIdToDiscoveryNode, Map perIndexFilteringAliases, + SearchPhaseController searchPhaseController, Executor executor, + SearchRequest request, ActionListener listener, + GroupShardsIterator shardsIts, long startTime, long clusterStateVersion) { + super(logger, searchTransportService, nodeIdToDiscoveryNode, perIndexFilteringAliases, executor, request, listener, + shardsIts, startTime, clusterStateVersion); + this.searchPhaseController = searchPhaseController; fetchResults = new AtomicArray<>(firstResults.length()); docIdsToLoad = new AtomicArray<>(firstResults.length()); } @@ -82,7 +87,7 @@ protected void moveToSecondPhase() throws Exception { final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); for (AtomicArray.Entry entry : docIdsToLoad.asList()) { QuerySearchResultProvider queryResult = firstResults.get(entry.index); - DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); + DiscoveryNode node = nodeIdToDiscoveryNode.apply(queryResult.shardTarget().nodeId()); ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard); executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } @@ -125,7 +130,7 @@ void onFetchFailure(Exception e, ShardFetchSearchRequest fetchSearchRequest, int } private void finishHim() { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { + getExecutor().execute(new ActionRunnable(listener) { @Override public void doRun() throws IOException { final boolean isScrollRequest = request.scroll() != null; diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 9b9ca48fc33f7..54105dc82dc18 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -23,7 +23,11 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Setting; @@ -37,8 +41,12 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; +import java.util.function.Function; import static org.elasticsearch.action.search.SearchType.QUERY_AND_FETCH; import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH; @@ -67,14 +75,33 @@ public TransportSearchAction(Settings settings, ThreadPool threadPool, BigArrays @Override protected void doExecute(SearchRequest searchRequest, ActionListener listener) { + // pure paranoia if time goes backwards we are at least positive + final long startTimeInMillis = Math.max(0, System.currentTimeMillis()); + ClusterState clusterState = clusterService.state(); + clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); + + // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name + // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead + // of just for the _search api + String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, searchRequest.indicesOptions(), + startTimeInMillis, searchRequest.indices()); + Map filteringAliasLookup = new HashMap<>(); + + for (String index : concreteIndices) { + clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, index); + filteringAliasLookup.put(index, indexNameExpressionResolver.filteringAliases(clusterState, + index, searchRequest.indices())); + } + + Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), + searchRequest.indices()); + GroupShardsIterator shardIterators = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, + searchRequest.preference()); + failIfOverShardCountLimit(clusterService, shardIterators.size()); + // optimize search type for cases where there is only one shard group to search on try { - ClusterState clusterState = clusterService.state(); - String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, searchRequest); - Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, - searchRequest.routing(), searchRequest.indices()); - int shardCount = clusterService.operationRouting().searchShardsCount(clusterState, concreteIndices, routingMap); - if (shardCount == 1) { + if (shardIterators.size() == 1) { // if we only have one group, then we always want Q_A_F, no need for DFS, and no need to do THEN since we hit one shard searchRequest.searchType(QUERY_AND_FETCH); } @@ -95,27 +122,37 @@ protected void doExecute(SearchRequest searchRequest, ActionListener listener) { + private AbstractSearchAsyncAction searchAsyncAction(SearchRequest searchRequest, GroupShardsIterator shardIterators, long startTime, + ClusterState state, Map filteringAliasLookup, + ActionListener listener) { + final Function nodesLookup = state.nodes()::get; + final long clusterStateVersion = state.version(); + Executor executor = threadPool.executor(ThreadPool.Names.SEARCH); AbstractSearchAsyncAction searchAsyncAction; switch(searchRequest.searchType()) { case DFS_QUERY_THEN_FETCH: - searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, clusterService, - indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener); + searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, nodesLookup, + filteringAliasLookup, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime, + clusterStateVersion); break; case QUERY_THEN_FETCH: - searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, clusterService, - indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener); + searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, nodesLookup, + filteringAliasLookup, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime, + clusterStateVersion); break; case DFS_QUERY_AND_FETCH: - searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchTransportService, clusterService, - indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener); + searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchTransportService, nodesLookup, + filteringAliasLookup, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime, + clusterStateVersion); break; case QUERY_AND_FETCH: - searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchTransportService, clusterService, - indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener); + searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchTransportService, nodesLookup, + filteringAliasLookup, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime, + clusterStateVersion); break; default: throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]"); @@ -123,4 +160,15 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchRequest searchRequest, return searchAsyncAction; } + private void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) { + final long shardCountLimit = clusterService.getClusterSettings().get(SHARD_COUNT_LIMIT_SETTING); + if (shardCount > shardCountLimit) { + throw new IllegalArgumentException("Trying to query " + shardCount + " shards, which is over the limit of " + + shardCountLimit + ". This limit exists because querying many shards at the same time can make the " + + "job of the coordinating node very CPU and/or memory intensive. It is usually a better idea to " + + "have a smaller number of larger shards. Update [" + SHARD_COUNT_LIMIT_SETTING.getKey() + + "] to a greater value if you really want to query that many shards at the same time."); + } + } + } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index 10a29963b6322..94cb4b8c8e86c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -68,11 +68,6 @@ public ShardIterator getShards(ClusterState clusterState, String index, int shar return preferenceActiveShardIterator(indexShard, clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference); } - public int searchShardsCount(ClusterState clusterState, String[] concreteIndices, @Nullable Map> routing) { - final Set shards = computeTargetedShards(clusterState, concreteIndices, routing); - return shards.size(); - } - public GroupShardsIterator searchShards(ClusterState clusterState, String[] concreteIndices, @Nullable Map> routing, @Nullable String preference) { final Set shards = computeTargetedShards(clusterState, concreteIndices, routing); final Set set = new HashSet<>(shards.size()); diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java new file mode 100644 index 0000000000000..0cd8015a37482 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -0,0 +1,211 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.search; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.PlainShardIterator; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.internal.ShardSearchTransportRequest; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class SearchAsyncActionTests extends ESTestCase { + + public void testFanOutAndCollect() throws InterruptedException { + SearchRequest request = new SearchRequest(); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference response = new AtomicReference<>(); + ActionListener responseListener = new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + response.set((TestSearchResponse) searchResponse); + } + + @Override + public void onFailure(Exception e) { + logger.warn("test failed", e); + fail(e.getMessage()); + } + }; + DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode replicaNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + + Map> nodeToContextMap = new HashMap<>(); + AtomicInteger contextIdGenerator = new AtomicInteger(0); + GroupShardsIterator shardsIter = getShardsIter("idx", randomIntBetween(1, 10), randomBoolean(), primaryNode, replicaNode); + AtomicInteger numFreedContext = new AtomicInteger(); + SearchTransportService transportService = new SearchTransportService(Settings.EMPTY, null) { + @Override + public void sendFreeContext(DiscoveryNode node, long contextId, SearchRequest request) { + numFreedContext.incrementAndGet(); + assertTrue(nodeToContextMap.containsKey(node)); + assertTrue(nodeToContextMap.get(node).remove(contextId)); + } + }; + Map lookup = new HashMap<>(); + lookup.put(primaryNode.getId(), primaryNode); + AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction(logger, transportService, lookup::get, + Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0) { + TestSearchResponse response = new TestSearchResponse(); + + @Override + protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener listener) { + assertTrue("shard: " + request.shardId() + " has been queried twice", response.queried.add(request.shardId())); + TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(), node); + Set ids = nodeToContextMap.computeIfAbsent(node, (n) -> new HashSet<>()); + ids.add(testSearchPhaseResult.id); + if (randomBoolean()) { + listener.onResponse(testSearchPhaseResult); + } else { + new Thread(() -> listener.onResponse(testSearchPhaseResult)).start(); + } + } + + @Override + protected void moveToSecondPhase() throws Exception { + for (int i = 0; i < firstResults.length(); i++) { + TestSearchPhaseResult result = firstResults.get(i); + assertEquals(result.node.getId(), result.shardTarget().getNodeId()); + sendReleaseSearchContext(result.id(), result.node); + } + responseListener.onResponse(response); + latch.countDown(); + } + + @Override + protected String firstPhaseName() { + return "test"; + } + + @Override + protected Executor getExecutor() { + fail("no executor in this class"); + return null; + } + }; + asyncAction.start(); + latch.await(); + assertNotNull(response.get()); + assertFalse(nodeToContextMap.isEmpty()); + assertTrue(nodeToContextMap.containsKey(primaryNode)); + assertEquals(shardsIter.size(), numFreedContext.get()); + assertTrue(nodeToContextMap.get(primaryNode).toString(), nodeToContextMap.get(primaryNode).isEmpty()); + + } + + private GroupShardsIterator getShardsIter(String index, int numShards, boolean doReplicas, DiscoveryNode primaryNode, + DiscoveryNode replicaNode) { + ArrayList list = new ArrayList<>(); + for (int i = 0; i < numShards; i++) { + ArrayList started = new ArrayList<>(); + ArrayList initializing = new ArrayList<>(); + ArrayList unassigned = new ArrayList<>(); + + ShardRouting routing = ShardRouting.newUnassigned(new ShardId(new Index(index, "_na_"), i), true, + RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + routing = routing.initialize(primaryNode.getId(), i + "p", 0); + routing.started(); + started.add(routing); + if (doReplicas) { + routing = ShardRouting.newUnassigned(new ShardId(new Index(index, "_na_"), i), false, + RecoverySource.PeerRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar")); + if (replicaNode != null) { + routing = routing.initialize(replicaNode.getId(), i + "r", 0); + if (randomBoolean()) { + routing.started(); + started.add(routing); + } else { + initializing.add(routing); + } + } else { + unassigned.add(routing); // unused yet + } + } + Collections.shuffle(started, random()); + started.addAll(initializing); + list.add(new PlainShardIterator(new ShardId(new Index(index, "_na_"), i), started)); + } + return new GroupShardsIterator(list); + } + + public static class TestSearchResponse extends SearchResponse { + public final Set queried = new HashSet<>(); + } + + public static class TestSearchPhaseResult implements SearchPhaseResult { + final long id; + final DiscoveryNode node; + SearchShardTarget shardTarget; + + public TestSearchPhaseResult(long id, DiscoveryNode node) { + this.id = id; + this.node = node; + } + + @Override + public long id() { + return id; + } + + @Override + public SearchShardTarget shardTarget() { + return this.shardTarget; + } + + @Override + public void shardTarget(SearchShardTarget shardTarget) { + this.shardTarget = shardTarget; + + } + + @Override + public void readFrom(StreamInput in) throws IOException { + + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + + } + } +}