@@ -38,7 +38,6 @@
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.fetch.FetchSearchResult;
-import org.elasticsearch.search.fetch.FetchSearchResultProvider;
 import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.search.internal.InternalSearchResponse;
@@ -51,7 +50,6 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
@@ -137,7 +135,6 @@ void performInitialPhase(final int shardIndex, final ShardIterator shardIt, fina
         final Transport.Connection connection = nodeIdToConnection.apply(shard.currentNodeId());
         AliasFilter filter = this.aliasFilter.get(shard.index().getUUID());
         assert filter != null;
-
         float indexBoost = concreteIndexBoosts.getOrDefault(shard.index().getUUID(), DEFAULT_INDEX_BOOST);
         ShardSearchTransportRequest transportRequest = new ShardSearchTransportRequest(request, shardIt.shardId(), shardsIts.size(),
             filter, indexBoost, startTime());
@@ -440,39 +437,45 @@ final class FetchPhase implements CheckedRunnable<Exception> {
         public void run() throws Exception {
             final boolean isScrollRequest = request.scroll() != null;
             ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults);
-            final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs);
-            final IntConsumer finishPhase = successOpts
-                -> sendResponseAsync("fetch", searchPhaseController, sortedShardDocs, queryResults, fetchResults);
-            if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return
-                queryResults.asList().stream()
-                    .map(e -> e.value.queryResult())
-                    .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
-                finishPhase.accept(successfulOps.get());
+            if (queryResults.length() == 1) {
+                assert queryResults.get(0) == null || queryResults.get(0).fetchResult() != null;
+                // query AND fetch optimization
+                sendResponseAsync("fetch", searchPhaseController, sortedShardDocs, queryResults, queryResults);
             } else {
-                final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ?
-                    searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length())
-                    : null;
-                final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(fetchResults,
-                    docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
-                    finishPhase);
-                for (int i = 0; i < docIdsToLoad.length; i++) {
-                    IntArrayList entry = docIdsToLoad[i];
-                    QuerySearchResultProvider queryResult = queryResults.get(i);
-                    if (entry == null) { // no results for this shard ID
-                        if (queryResult != null) {
-                            // if we got some hits from this shard we have to release the context there
-                            // we do this as we go since it will free up resources and passing on the request on the
-                            // transport layer is cheap.
-                            releaseIrrelevantSearchContext(queryResult.queryResult());
-                        }
-                        // in any case we count down this result since we don't talk to this shard anymore
-                        counter.countDown();
-                    } else {
-                        Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
-                        ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), i, entry,
-                            lastEmittedDocPerShard);
-                        executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(),
-                            connection);
-                    }
-                }
-            }
+                final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs);
+                final IntConsumer finishPhase = successOpts
+                    -> sendResponseAsync("fetch", searchPhaseController, sortedShardDocs, queryResults, fetchResults);
+                if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return
+                    queryResults.asList().stream()
+                        .map(e -> e.value.queryResult())
+                        .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
+                    finishPhase.accept(successfulOps.get());
+                } else {
+                    final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ?
+                        searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, queryResults.length())
+                        : null;
+                    final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(fetchResults,
+                        docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
+                        finishPhase);
+                    for (int i = 0; i < docIdsToLoad.length; i++) {
+                        IntArrayList entry = docIdsToLoad[i];
+                        QuerySearchResultProvider queryResult = queryResults.get(i);
+                        if (entry == null) { // no results for this shard ID
+                            if (queryResult != null) {
+                                // if we got some hits from this shard we have to release the context there
+                                // we do this as we go since it will free up resources and passing on the request on the
+                                // transport layer is cheap.
+                                releaseIrrelevantSearchContext(queryResult.queryResult());
+                            }
+                            // in any case we count down this result since we don't talk to this shard anymore
+                            counter.countDown();
+                        } else {
+                            Transport.Connection connection = nodeIdToConnection.apply(queryResult.shardTarget().getNodeId());
+                            ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), i, entry,
+                                lastEmittedDocPerShard);
+                            executeFetch(i, queryResult.shardTarget(), counter, fetchSearchRequest, queryResult.queryResult(),
+                                connection);
+                        }
+                    }
+                }
+            }
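
The new queryResults.length() == 1 branch above short-circuits the fetch round-trip: with a single shard, the query phase already executed fetch on the same request, so the same array serves as both the query side and the fetch side of the merge. The multi-shard branch still counts down once per shard before finishing; a minimal, self-contained sketch of that countdown pattern (a simplified stand-in, not the actual CountedCollector class):

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.IntConsumer;

    // Every shard decrements the counter exactly once -- whether it returned
    // hits, was skipped, or failed -- and the last decrement fires the finish
    // callback with the number of successful shards.
    final class SimpleCountedCollector {
        private final AtomicInteger outstanding;
        private final AtomicInteger successes = new AtomicInteger();
        private final IntConsumer onFinish;

        SimpleCountedCollector(int numShards, IntConsumer onFinish) {
            this.outstanding = new AtomicInteger(numShards);
            this.onFinish = onFinish;
        }

        void onResult() {   // a shard delivered a fetch result
            successes.incrementAndGet();
            countDown();
        }

        void countDown() {  // a shard was skipped or failed; it still counts
            if (outstanding.decrementAndGet() == 0) {
                onFinish.accept(successes.get());
            }
        }
    }
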
@@ -529,16 +532,14 @@ private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) {
      */
     final void sendResponseAsync(String phase, SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
                                  AtomicArray<? extends QuerySearchResultProvider> queryResultsArr,
-                                 AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr) {
+                                 AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
         getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
             @Override
             public void doRun() throws IOException {
                 final boolean isScrollRequest = request.scroll() != null;
-                final ScoreDoc[] theScoreDocs = sortedDocs == null ? searchPhaseController.sortDocs(isScrollRequest, queryResultsArr)
-                    : sortedDocs;
-                final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, theScoreDocs, queryResultsArr,
+                final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedDocs, queryResultsArr,
                     fetchResultsArr);
-                String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), queryResultsArr) : null;
+                String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(queryResultsArr) : null;
                 listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
                     buildTookInMillis(), buildShardFailures()));
             }
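
With FetchSearchResultProvider deleted, both array parameters now share the type AtomicArray<? extends QuerySearchResultProvider>; that widening is what lets the fetch phase above pass queryResults for both arguments in the single-shard case. A hypothetical sketch of the unified shape this assumes (illustrative interface name, not the real class hierarchy):

    // Hypothetical illustration: one provider type answers for both phases.
    interface ShardResultView {
        QuerySearchResult queryResult();  // always present after the query phase
        FetchSearchResult fetchResult();  // null unless fetch ran with the query
    }

The hunks that follow apply the same type change inside SearchPhaseController's merge path.
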
@@ -31,15 +31,11 @@
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.TopFieldDocs;
 import org.apache.lucene.search.grouping.CollapseTopFieldDocs;
-import org.apache.lucene.util.ArrayUtil;
-import org.apache.lucene.util.IntsRef;
 import org.elasticsearch.common.collect.HppcMaps;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.ArrayUtils;
 import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.IntArray;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -49,7 +45,6 @@
 import org.elasticsearch.search.dfs.AggregatedDfs;
 import org.elasticsearch.search.dfs.DfsSearchResult;
 import org.elasticsearch.search.fetch.FetchSearchResult;
-import org.elasticsearch.search.fetch.FetchSearchResultProvider;
 import org.elasticsearch.search.internal.InternalSearchHit;
 import org.elasticsearch.search.internal.InternalSearchHits;
 import org.elasticsearch.search.internal.InternalSearchResponse;
@@ -68,7 +63,6 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
@@ -391,10 +385,10 @@ public IntArrayList[] fillDocIdsToLoad(int numShards, ScoreDoc[] shardDocs) {
      */
     public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
                                         AtomicArray<? extends QuerySearchResultProvider> queryResultsArr,
-                                        AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr) {
+                                        AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
 
         List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults = queryResultsArr.asList();
-        List<? extends AtomicArray.Entry<? extends FetchSearchResultProvider>> fetchResults = fetchResultsArr.asList();
+        List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> fetchResults = fetchResultsArr.asList();
 
         if (queryResults.isEmpty()) {
             return InternalSearchResponse.empty();
@@ -448,7 +442,7 @@ public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
         }
 
         // clean the fetch counter
-        for (AtomicArray.Entry<? extends FetchSearchResultProvider> entry : fetchResults) {
+        for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : fetchResults) {
             entry.value.fetchResult().initCounter();
         }
         int from = ignoreFrom ? 0 : firstResult.queryResult().from();
@@ -460,7 +454,7 @@ public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
         if (!fetchResults.isEmpty()) {
             for (int i = 0; i < numSearchHits; i++) {
                 ScoreDoc shardDoc = sortedDocs[i];
-                FetchSearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
+                QuerySearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
                 if (fetchResultProvider == null) {
                     continue;
                 }
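
Each ScoreDoc in the globally sorted array remembers which shard produced it (shardIndex), and each shard's fetch result hands out its hits through an incrementing counter, so hits are consumed in global sort order. A compact sketch of the same merge pattern with simplified types (illustrative only, not the Elasticsearch API):

    // shardIndexPerDoc[i] says which shard produced the i-th globally sorted
    // doc; each shard's hits arrive pre-sorted, so a per-shard cursor yields
    // them in exactly the order the merged result needs.
    static String[] mergeHits(int[] shardIndexPerDoc, String[][] hitsPerShard) {
        int[] cursor = new int[hitsPerShard.length];
        String[] merged = new String[shardIndexPerDoc.length];
        for (int i = 0; i < shardIndexPerDoc.length; i++) {
            int shard = shardIndexPerDoc[i];
            merged[i] = hitsPerShard[shard][cursor[shard]++];
        }
        return merged;
    }
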
@@ -503,11 +497,11 @@ public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
                 final List<CompletionSuggestion.Entry.Option> suggestionOptions = suggestion.getOptions();
                 for (int scoreDocIndex = currentOffset; scoreDocIndex < currentOffset + suggestionOptions.size(); scoreDocIndex++) {
                     ScoreDoc shardDoc = sortedDocs[scoreDocIndex];
-                    FetchSearchResultProvider fetchSearchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
-                    if (fetchSearchResultProvider == null) {
+                    QuerySearchResultProvider searchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
+                    if (searchResultProvider == null) {
                         continue;
                     }
-                    FetchSearchResult fetchResult = fetchSearchResultProvider.fetchResult();
+                    FetchSearchResult fetchResult = searchResultProvider.fetchResult();
                     int fetchResultIndex = fetchResult.counterGetAndIncrement();
                     if (fetchResultIndex < fetchResult.hits().internalHits().length) {
                         InternalSearchHit hit = fetchResult.hits().internalHits()[fetchResultIndex];
@@ -569,7 +563,7 @@ public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
     private static int topN(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults) {
         QuerySearchResultProvider firstResult = queryResults.get(0).value;
         int topN = firstResult.queryResult().size();
-        if (firstResult.includeFetch()) {
+        if (firstResult.fetchResult() != null) {
             // if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them...
             // this is also important since we shortcut and fetch only docs from "from" and up to "size"
             topN *= queryResults.size();
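
includeFetch() is replaced by a null check: a non-null fetchResult() now signals that query and fetch ran together on the shard. In that mode every shard has already fetched up to size full documents, so the merge must consider size * numShards candidates rather than a global top size; for example, 3 shards with size = 10 give a top-N of 30. As a hedged sketch of the arithmetic:

    // If query and fetch ran together, each of the N shards may already hold
    // `size` fully fetched docs, so the merge window widens to size * N.
    static int topN(int size, int numShards, boolean queryAndFetchTogether) {
        return queryAndFetchTogether ? size * numShards : size;
    }
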

This file was deleted. (From the imports removed above, evidently the FetchSearchResultProvider interface that the rest of this diff retires.)

@@ -44,6 +44,7 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Qu
                                       SearchTask task) {
         super(logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, searchPhaseController, executor,
             request, listener, shardsIts, startTime, clusterStateVersion, task);
+
     }
 
     @Override