Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public Scorer scorer(LeafReaderContext context) throws IOException {
throw new IOException("search sort :[" + sort.getSort() + "] does not match the index sort:[" + segmentSort + "]");
}
final int afterDoc = after.doc - context.docBase;
TopComparator comparator= getTopComparator(fieldComparators, reverseMuls, context, afterDoc);
TopComparator comparator = getTopComparator(fieldComparators, reverseMuls, context, afterDoc);
final int maxDoc = context.reader().maxDoc();
final int firstDoc = searchAfterDoc(comparator, 0, context.reader().maxDoc());
if (firstDoc >= maxDoc) {
Expand Down Expand Up @@ -143,7 +143,7 @@ static TopComparator getTopComparator(FieldComparator<?>[] fieldComparators,
}
}

if (topDoc <= doc) {
if (doc <= topDoc) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,10 @@

package org.elasticsearch.search.query;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.EarlyTerminatingSortingCollector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MultiCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.Weight;
import org.elasticsearch.common.lucene.MinimumScoreCollector;
import org.elasticsearch.common.lucene.search.FilteredCollector;
Expand All @@ -40,14 +35,12 @@
import java.util.Collections;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_CANCELLED;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_MIN_SCORE;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_MULTI;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_POST_FILTER;
import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_TERMINATE_AFTER_COUNT;
import static org.elasticsearch.search.query.TopDocsCollectorContext.shortcutTotalHitCount;

abstract class QueryCollectorContext {
private String profilerName;
Expand All @@ -70,22 +63,12 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i
return new InternalProfileCollector(collector, profilerName, in != null ? Collections.singletonList(in) : Collections.emptyList());
}

/**
* A value of <code>false</code> indicates that the underlying collector can infer
* its results directly from the context (search is not needed).
* Default to true (search is needed).
*/
boolean shouldCollect() {
return true;
}

/**
* Post-process <code>result</code> after search execution.
*
* @param result The query search result to populate
* @param hasCollected True if search was executed
*/
void postProcess(QuerySearchResult result, boolean hasCollected) throws IOException {}
void postProcess(QuerySearchResult result) throws IOException {}

/**
* Creates the collector tree from the provided <code>collectors</code>
Expand Down Expand Up @@ -175,11 +158,6 @@ static QueryCollectorContext createCancellableCollectorContext(BooleanSupplier c
Collector create(Collector in) throws IOException {
return new CancellableCollector(cancelled, in);
}

@Override
boolean shouldCollect() {
return false;
}
};
}

Expand All @@ -198,52 +176,11 @@ Collector create(Collector in) throws IOException {
}

@Override
void postProcess(QuerySearchResult result, boolean hasCollected) throws IOException {
if (hasCollected && collector.terminatedEarly()) {
void postProcess(QuerySearchResult result) throws IOException {
if (collector.terminatedEarly()) {
result.terminatedEarly(true);
}
}
};
}

/**
* Creates a sorting termination collector limiting the collection to the first <code>numHits</code> per segment.
* The total hit count matching the query is also computed if <code>trackTotalHits</code> is true.
*/
static QueryCollectorContext createEarlySortingTerminationCollectorContext(IndexReader reader,
Query query,
Sort indexSort,
int numHits,
boolean trackTotalHits,
boolean shouldCollect) {
return new QueryCollectorContext(REASON_SEARCH_TERMINATE_AFTER_COUNT) {
private IntSupplier countSupplier = null;

@Override
Collector create(Collector in) throws IOException {
EarlyTerminatingSortingCollector sortingCollector = new EarlyTerminatingSortingCollector(in, indexSort, numHits);
Collector collector = sortingCollector;
if (trackTotalHits) {
int count = shouldCollect ? -1 : shortcutTotalHitCount(reader, query);
if (count == -1) {
TotalHitCountCollector countCollector = new TotalHitCountCollector();
collector = MultiCollector.wrap(sortingCollector, countCollector);
this.countSupplier = countCollector::getTotalHits;
} else {
this.countSupplier = () -> count;
}
}
return collector;
}

@Override
void postProcess(QuerySearchResult result, boolean hasCollected) throws IOException {
if (countSupplier != null) {
final TopDocs topDocs = result.topDocs();
topDocs.totalHits = countSupplier.getAsInt();
result.topDocs(topDocs, result.sortValueFormats());
}
}
};
}
}
66 changes: 33 additions & 33 deletions core/src/main/java/org/elasticsearch/search/query/QueryPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.search.query;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.queries.MinDocQuery;
import org.apache.lucene.queries.SearchAfterSortedDocQuery;
import org.apache.lucene.search.BooleanClause;
Expand All @@ -36,7 +37,6 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.Counter;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
Expand All @@ -61,7 +61,6 @@
import java.util.function.Consumer;

import static org.elasticsearch.search.query.QueryCollectorContext.createCancellableCollectorContext;
import static org.elasticsearch.search.query.QueryCollectorContext.createEarlySortingTerminationCollectorContext;
import static org.elasticsearch.search.query.QueryCollectorContext.createEarlyTerminationCollectorContext;
import static org.elasticsearch.search.query.QueryCollectorContext.createFilteredCollectorContext;
import static org.elasticsearch.search.query.QueryCollectorContext.createMinScoreCollectorContext;
Expand Down Expand Up @@ -104,10 +103,8 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep
// request, preProcess is called on the DFS phase phase, this is why we pre-process them
// here to make sure it happens during the QUERY phase
aggregationPhase.preProcess(searchContext);
Sort indexSort = searchContext.mapperService().getIndexSettings().getIndexSortConfig()
.buildIndexSort(searchContext.mapperService()::fullName, searchContext::getForField);
final ContextIndexSearcher searcher = searchContext.searcher();
boolean rescore = execute(searchContext, searchContext.searcher(), searcher::setCheckCancelled, indexSort);
boolean rescore = execute(searchContext, searchContext.searcher(), searcher::setCheckCancelled);

if (rescore) { // only if we do a regular search
rescorePhase.execute(searchContext);
Expand All @@ -127,11 +124,12 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep
* wire everything (mapperService, etc.)
* @return whether the rescoring phase should be executed
*/
static boolean execute(SearchContext searchContext, final IndexSearcher searcher,
Consumer<Runnable> checkCancellationSetter, @Nullable Sort indexSort) throws QueryPhaseExecutionException {
static boolean execute(SearchContext searchContext,
final IndexSearcher searcher,
Consumer<Runnable> checkCancellationSetter) throws QueryPhaseExecutionException {
final IndexReader reader = searcher.getIndexReader();
QuerySearchResult queryResult = searchContext.queryResult();
queryResult.searchTimedOut(false);

try {
queryResult.from(searchContext.from());
queryResult.size(searchContext.size());
Expand Down Expand Up @@ -161,7 +159,7 @@ static boolean execute(SearchContext searchContext, final IndexSearcher searcher
// ... and stop collecting after ${size} matches
searchContext.terminateAfter(searchContext.size());
searchContext.trackTotalHits(false);
} else if (canEarlyTerminate(indexSort, searchContext)) {
} else if (canEarlyTerminate(reader, searchContext.sort())) {
// now this gets interesting: since the search sort is a prefix of the index sort, we can directly
// skip to the desired doc
if (after != null) {
Expand All @@ -177,10 +175,14 @@ static boolean execute(SearchContext searchContext, final IndexSearcher searcher
}

final LinkedList<QueryCollectorContext> collectors = new LinkedList<>();
// whether the chain contains a collector that filters documents
boolean hasFilterCollector = false;
if (searchContext.parsedPostFilter() != null) {
// add post filters before aggregations
// it will only be applied to top hits
collectors.add(createFilteredCollectorContext(searcher, searchContext.parsedPostFilter().query()));
// this collector can filter documents during the collection
hasFilterCollector = true;
}
if (searchContext.queryCollectors().isEmpty() == false) {
// plug in additional collectors, like aggregations
Expand All @@ -189,10 +191,14 @@ static boolean execute(SearchContext searchContext, final IndexSearcher searcher
if (searchContext.minimumScore() != null) {
// apply the minimum score after multi collector so we filter aggs as well
collectors.add(createMinScoreCollectorContext(searchContext.minimumScore()));
// this collector can filter documents during the collection
hasFilterCollector = true;
}
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
// apply terminate after after all filters collectors
collectors.add(createEarlyTerminationCollectorContext(searchContext.terminateAfter()));
// this collector can filter documents during the collection
hasFilterCollector = true;
}

boolean timeoutSet = scrollContext == null && searchContext.timeout() != null &&
Expand Down Expand Up @@ -240,21 +246,9 @@ static boolean execute(SearchContext searchContext, final IndexSearcher searcher
// searchContext.lowLevelCancellation()
collectors.add(createCancellableCollectorContext(searchContext.getTask()::isCancelled));

final IndexReader reader = searcher.getIndexReader();
final boolean doProfile = searchContext.getProfilers() != null;
// create the top docs collector last when the other collectors are known
final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, reader,
collectors.stream().anyMatch(QueryCollectorContext::shouldCollect));
final boolean shouldCollect = topDocsFactory.shouldCollect();

if (topDocsFactory.numHits() > 0 &&
(scrollContext == null || scrollContext.totalHits != -1) &&
canEarlyTerminate(indexSort, searchContext)) {
// top docs collection can be early terminated based on index sort
// add the collector context first so we don't early terminate aggs but only top docs
collectors.addFirst(createEarlySortingTerminationCollectorContext(reader, searchContext.query(), indexSort,
topDocsFactory.numHits(), searchContext.trackTotalHits(), shouldCollect));
}
final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, reader, hasFilterCollector);
// add the top docs collector, the first collector context in the chain
collectors.addFirst(topDocsFactory);

Expand All @@ -268,9 +262,7 @@ static boolean execute(SearchContext searchContext, final IndexSearcher searcher
}

try {
if (shouldCollect) {
searcher.search(query, queryCollector);
}
searcher.search(query, queryCollector);
} catch (TimeExceededException e) {
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
queryResult.searchTimedOut(true);
Expand All @@ -280,7 +272,7 @@ static boolean execute(SearchContext searchContext, final IndexSearcher searcher

final QuerySearchResult result = searchContext.queryResult();
for (QueryCollectorContext ctx : collectors) {
ctx.postProcess(result, shouldCollect);
ctx.postProcess(result);
}
EsThreadPoolExecutor executor = (EsThreadPoolExecutor)
searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
Expand Down Expand Up @@ -317,13 +309,21 @@ static boolean returnsDocsInOrder(Query query, SortAndFormats sf) {
}

/**
* Returns true if the provided <code>searchContext</code> can early terminate based on <code>indexSort</code>
* @param indexSort The index sort specification
* @param context The search context for the request
*/
static boolean canEarlyTerminate(Sort indexSort, SearchContext context) {
final Sort sort = context.sort() == null ? Sort.RELEVANCE : context.sort().sort;
return indexSort != null && EarlyTerminatingSortingCollector.canEarlyTerminate(sort, indexSort);
* Returns whether collection within the provided <code>reader</code> can be early-terminated if it sorts
* with <code>sortAndFormats</code>.
**/
static boolean canEarlyTerminate(IndexReader reader, SortAndFormats sortAndFormats) {
if (sortAndFormats == null || sortAndFormats.sort == null) {
return false;
}
final Sort sort = sortAndFormats.sort;
for (LeafReaderContext ctx : reader.leaves()) {
Sort indexSort = ctx.reader().getMetaData().getSort();
if (indexSort == null || EarlyTerminatingSortingCollector.canEarlyTerminate(sort, indexSort) == false) {
return false;
}
}
return true;
}

private static class TimeExceededException extends RuntimeException {}
Expand Down
Loading