From 826e2fc0455878d1f50f1d741b56087cc4b60a39 Mon Sep 17 00:00:00 2001 From: jimczi Date: Tue, 24 Nov 2020 18:20:50 +0100 Subject: [PATCH 1/3] Automatic tie-breaking for sorted queries within a PIT This change generates a tiebreaker automatically for sorted queries that are executed under a PIT (point in time reader). This allows paginating consistently over the matching documents without requiring a sort criterion that is unique per document. The tiebreaker is automatically added as the last sort value of each search hit in the response. It is then used by `search_after` to ensure that pagination will not miss any documents and that each document will appear only once. This commit also allows queries sorted by internal Lucene id (`_doc`) to be optimized if they are executed under a PIT, in the same way as scroll queries. Closes #56828 --- .../paginate-search-results.asciidoc | 27 ++++---- .../search/AbstractSearchAsyncAction.java | 28 +++++---- .../action/search/SearchPhaseController.java | 43 ++++++++++--- .../action/search/SearchShardIterator.java | 13 ++++ .../elasticsearch/search/SearchService.java | 11 +++- .../search/internal/ShardSearchRequest.java | 40 +++++++++--- .../search/query/QueryPhase.java | 42 ++++++------- .../search/query/QuerySearchResult.java | 4 ++ .../searchafter/SearchAfterBuilder.java | 42 +++++++++++++ .../AbstractSearchAsyncActionTests.java | 3 + .../search/query/QueryPhaseTests.java | 63 ++++++++++++------- .../searchafter/SearchAfterBuilderTests.java | 42 +++++++++++++ .../xpack/core/search/PointInTimeIT.java | 56 +++++++++++++++++ 13 files changed, 324 insertions(+), 90 deletions(-) diff --git a/docs/reference/search/search-your-data/paginate-search-results.asciidoc b/docs/reference/search/search-your-data/paginate-search-results.asciidoc index f036d04fb00be..f4705470e01be 100644 --- a/docs/reference/search/search-your-data/paginate-search-results.asciidoc +++ 
b/docs/reference/search/search-your-data/paginate-search-results.asciidoc @@ -71,9 +71,9 @@ To get the first page of results, submit a search request with a `sort` argument. If using a PIT, specify the PIT ID in the `pit.id` parameter and omit the target data stream or index from the request path. -IMPORTANT: We recommend you include a tiebreaker field in your `sort`. This -tiebreaker field should contain a unique value for each document. If you don't -include a tiebreaker field, your paged results could miss or duplicate hits. +NOTE: `search_after` requests have optimizations that make them faster when the sort +order is `_doc` and total hits are not tracked. If you want to iterate over all documents regardless of the +order, this is the most efficient option. [source,console] ---- @@ -90,8 +90,7 @@ GET /_search "keep_alive": "1m" }, "sort": [ <2> - {"@timestamp": "asc"}, - {"tie_breaker_id": "asc"} + {"@timestamp": "asc"} ] } ---- @@ -101,7 +100,9 @@ GET /_search <2> Sorts hits for the search. The search response includes an array of `sort` values for each hit. If you used -a PIT, the response's `pit_id` parameter contains an updated PIT ID. +a PIT, the response's `pit_id` parameter contains an updated PIT ID and a tiebreaker +is included as the last `sort` value for each hit. This tiebreaker is a unique value for each document that allows +consistent pagination within a `pit_id`. [source,console-result] ---- @@ -122,7 +123,7 @@ a PIT, the response's `pit_id` parameter contains an updated PIT ID. "_source" : ..., "sort" : [ <2> 4098435132000, - "FaslK3QBySSL_rrj9zM5" + 4294967298 <3> ] } ] @@ -133,9 +134,10 @@ a PIT, the response's `pit_id` parameter contains an updated PIT ID. <1> Updated `id` for the point in time. <2> Sort values for the last returned hit. +<3> The tiebreaker value, unique per document within the `pit_id`. To get the next page of results, rerun the previous search using the last hit's -sort values as the `search_after` argument. 
If using a PIT, use the latest PIT +sort values (including the tiebreaker) as the `search_after` argument. If using a PIT, use the latest PIT ID in the `pit.id` parameter. The search's `query` and `sort` arguments must remain unchanged. If provided, the `from` argument must be `0` (default) or `-1`. @@ -154,19 +156,20 @@ GET /_search "keep_alive": "1m" }, "sort": [ - {"@timestamp": "asc"}, - {"tie_breaker_id": "asc"} + {"@timestamp": "asc"} ], "search_after": [ <2> 4098435132000, - "FaslK3QBySSL_rrj9zM5" - ] + 4294967298 + ], + "track_total_hits": false <3> } ---- // TEST[catch:missing] <1> PIT ID returned by the previous search. <2> Sort values from the previous search's last hit. +<3> Disable the tracking of total hits to speed up pagination. You can repeat this process to get additional pages of results. If using a PIT, you can extend the PIT's retention period using the diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 1adc37ac6c1b5..b74ef2a8e0038 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -212,8 +212,9 @@ public final void run() { } for (int index = 0; index < shardsIts.size(); index++) { final SearchShardIterator shardRoutings = shardsIts.get(index); + shardRoutings.setShardIndex(index); assert shardRoutings.skip() == false; - performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); + performPhaseOnShard(shardRoutings, shardRoutings.nextOrNull()); } } } @@ -225,7 +226,7 @@ void skipShard(SearchShardIterator iterator) { successfulShardExecution(iterator); } - private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) { + private void performPhaseOnShard(final SearchShardIterator shardIt, final 
SearchShardTarget shard) { /* * We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the * same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread. If we @@ -236,7 +237,7 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator if (shard == null) { SearchShardTarget unassignedShard = new SearchShardTarget(null, shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()); - fork(() -> onShardFailure(shardIndex, unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId()))); + fork(() -> onShardFailure(unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId()))); } else { final PendingExecutions pendingExecutions = throttleConcurrentRequests ? pendingExecutionsPerNode.computeIfAbsent(shard.getNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode)) @@ -245,7 +246,7 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator final Thread thread = Thread.currentThread(); try { executePhaseOnShard(shardIt, shard, - new SearchActionListener(shard, shardIndex) { + new SearchActionListener(shard, shardIt.getShardIndex()) { @Override public void innerOnResponse(Result result) { try { @@ -258,7 +259,7 @@ public void innerOnResponse(Result result) { @Override public void onFailure(Exception t) { try { - onShardFailure(shardIndex, shard, shardIt, t); + onShardFailure(shard, shardIt, t); } finally { executeNext(pendingExecutions, thread); } @@ -270,7 +271,7 @@ public void onFailure(Exception t) { * It is possible to run into connection exceptions here because we are getting the connection early and might * run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy. 
*/ - fork(() -> onShardFailure(shardIndex, shard, shardIt, e)); + fork(() -> onShardFailure(shard, shardIt, e)); } finally { executeNext(pendingExecutions, thread); } @@ -387,10 +388,10 @@ ShardSearchFailure[] buildShardFailures() { return failures; } - private void onShardFailure(final int shardIndex, SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) { + private void onShardFailure(SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) { // we always add the shard failure for a specific shard instance // we do make sure to clean it on a successful response from a shard - onShardFailure(shardIndex, shard, e); + onShardFailure(shardIt.getShardIndex(), shard, e); final SearchShardTarget nextShard = shardIt.nextOrNull(); final boolean lastShard = nextShard == null; logger.debug(() -> new ParameterizedMessage("{}: Failed to execute [{}] lastShard [{}]", shard, request, lastShard), e); @@ -404,7 +405,7 @@ private void onShardFailure(final int shardIndex, SearchShardTarget shard, final } } } - onShardGroupFailure(shardIndex, shard, e); + onShardGroupFailure(shardIt.getShardIndex(), shard, e); } final int totalOps = this.totalOps.incrementAndGet(); if (totalOps == expectedTotalOps) { @@ -414,7 +415,7 @@ private void onShardFailure(final int shardIndex, SearchShardTarget shard, final new SearchPhaseExecutionException(getName(), "Shard failures", null, buildShardFailures())); } else { if (lastShard == false) { - performPhaseOnShard(shardIndex, shardIt, nextShard); + performPhaseOnShard(shardIt, nextShard); } } } @@ -487,6 +488,7 @@ private static boolean isTaskCancelledException(Exception e) { */ protected void onShardResult(Result result, SearchShardIterator shardIt) { assert result.getShardIndex() != -1 : "shard index is not set"; + assert result.getShardIndex() == shardIt.getShardIndex() : "shard index is different"; assert result.getSearchShardTarget() != null : "search shard target must not be null"; hasShardResponse.set(true); 
if (logger.isTraceEnabled()) { @@ -656,19 +658,19 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar String indexName = shardIt.shardId().getIndex().getName(); final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet()) .toArray(new String[0]); - ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(), - filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings, + ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), + getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings, shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive()); // if we already received a search result we can inform the shard that it // can return a null response if the request rewrites to match none rather // than creating an empty response in the search thread pool. // Note that, we have to disable this shortcut for queries that create a context (scroll and search context). shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get() && shardRequest.scroll() == null); + shardRequest.setShardIndex(shardIt.getShardIndex()); return shardRequest; } /** - * Returns the next phase based on the results of the initial search phase * @param results the results of the initial search phase. 
Each non null element in the result array represent a successfully * executed shard request * @param context the search context for the next phase diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index a6c5964c9470c..a7e64d27a7159 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -55,6 +55,7 @@ import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.profile.SearchProfileShardResults; import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.searchafter.SearchAfterBuilder; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.Suggest.Suggestion; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; @@ -349,7 +350,11 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr searchHit.shard(fetchResult.getSearchShardTarget()); if (sortedTopDocs.isSortedByField) { FieldDoc fieldDoc = (FieldDoc) shardDoc; - searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.sortValueFormats); + if (reducedQueryPhase.hasPIT) { + addSortValuesTie(searchHit, fieldDoc, reducedQueryPhase.sortValueFormats); + } else { + searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.sortValueFormats); + } if (sortScoreIndex != -1) { searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue()); } @@ -363,6 +368,16 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr reducedQueryPhase.maxScore, sortedTopDocs.sortFields, sortedTopDocs.collapseField, sortedTopDocs.collapseValues); } + private void addSortValuesTie(SearchHit searchHit, FieldDoc fieldDoc, DocValueFormat[] sortValueFormats) { + Object[] newFields = new Object[fieldDoc.fields.length+1]; + 
DocValueFormat[] dvFormats = new DocValueFormat[newFields.length]; + System.arraycopy(fieldDoc.fields, 0, newFields, 0, fieldDoc.fields.length); + System.arraycopy(sortValueFormats, 0, dvFormats, 0, fieldDoc.fields.length); + newFields[newFields.length-1] = SearchAfterBuilder.createTiebreaker(fieldDoc); + dvFormats[newFields.length-1] = DocValueFormat.RAW; + searchHit.sortValues(newFields, dvFormats); + } + /** * Reduces the given query results and consumes all aggregations and profile results. * @param queryResults a list of non-null query shard results @@ -416,7 +431,7 @@ ReducedQueryPhase reducedQueryPhase(Collection quer if (queryResults.isEmpty()) { // early terminate we have nothing to reduce final TotalHits totalHits = topDocsStats.getTotalHits(); return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(), - false, null, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true); + false, null, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true, false); } int total = queryResults.size(); queryResults = queryResults.stream() @@ -477,7 +492,7 @@ ReducedQueryPhase reducedQueryPhase(Collection quer final TotalHits totalHits = topDocsStats.getTotalHits(); return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(), topDocsStats.timedOut, topDocsStats.terminatedEarly, reducedSuggest, aggregations, shardResults, sortedTopDocs, - firstResult.sortValueFormats(), numReducePhases, size, from, false); + firstResult.sortValueFormats(), numReducePhases, size, from, false, firstResult.hasPIT()); } private static InternalAggregations reduceAggs(InternalAggregation.ReduceContextBuilder aggReduceContextBuilder, @@ -558,10 +573,23 @@ public static final class ReducedQueryPhase { final int from; // sort value formats used to sort / format the result final DocValueFormat[] sortValueFormats; - - ReducedQueryPhase(TotalHits totalHits, long fetchHits, float maxScore, boolean 
timedOut, Boolean terminatedEarly, Suggest suggest, - InternalAggregations aggregations, SearchProfileShardResults shardResults, SortedTopDocs sortedTopDocs, - DocValueFormat[] sortValueFormats, int numReducePhases, int size, int from, boolean isEmptyResult) { + // true if the search request uses a point in time reader + final boolean hasPIT; + + ReducedQueryPhase(TotalHits totalHits, + long fetchHits, + float maxScore, + boolean timedOut, + Boolean terminatedEarly, + Suggest suggest, + InternalAggregations aggregations, + SearchProfileShardResults shardResults, + SortedTopDocs sortedTopDocs, + DocValueFormat[] sortValueFormats, + int numReducePhases, + int size, int from, + boolean isEmptyResult, + boolean hasPIT) { if (numReducePhases <= 0) { throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases); } @@ -579,6 +607,7 @@ public static final class ReducedQueryPhase { this.from = from; this.isEmptyResult = isEmptyResult; this.sortValueFormats = sortValueFormats; + this.hasPIT = hasPIT; } /** diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java b/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java index 36a9b75199402..a45f7340b8f10 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java @@ -52,6 +52,8 @@ public final class SearchShardIterator implements Comparable targetNodesIterator; + private int searchShardIndex; + /** * Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards * this the a given shardId. 
@@ -78,6 +80,17 @@ public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, assert searchContextKeepAlive == null || searchContextId != null; } + void setShardIndex(int shardIndex) { + this.searchShardIndex = shardIndex; + } + + /** + * Returns the shard index that is used to tiebreak identical sort values coming from different shards. + */ + int getShardIndex() { + return searchShardIndex; + } + /** * Returns the original indices associated with this shard iterator, specifically with the cluster that this shard belongs to. */ diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index f976a05ba510e..998287e14286a 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -728,7 +728,7 @@ final SearchContext createContext(ReaderContext readerContext, if (request.scroll() != null) { context.scrollContext().scroll = request.scroll(); } - parseSource(context, request.source(), includeAggregations); + parseSource(context, request, includeAggregations); // if the from and size are still not set, default them if (context.from() == -1) { @@ -878,7 +878,8 @@ private void processFailure(ReaderContext context, Exception exc) { } } - private void parseSource(DefaultSearchContext context, SearchSourceBuilder source, boolean includeAggregations) { + private void parseSource(DefaultSearchContext context, ShardSearchRequest request, boolean includeAggregations) { + final SearchSourceBuilder source = request.source(); // nothing to parse... 
if (source == null) { return; @@ -1022,7 +1023,11 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (context.from() > 0) { throw new SearchException(shardTarget, "`from` parameter must be set to 0 when `search_after` is used."); } - FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc(context.sort(), source.searchAfter()); + + + FieldDoc fieldDoc = source.pointInTimeBuilder() == null ? + SearchAfterBuilder.buildFieldDoc(context.sort(), source.searchAfter()) : + SearchAfterBuilder.buildFieldDocWithPIT(request.getShardIndex(), context.sort(), source.searchAfter()); context.searchAfter(fieldDoc); } diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index af2e46a65dffb..5b380a17081b5 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -83,14 +83,17 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque private final String preference; private final OriginalIndices originalIndices; - private boolean canReturnNullResponseIfMatchNoDocs; - private SearchSortValuesAndFormats bottomSortValues; + private final ShardSearchContextId readerId; + private final TimeValue keepAlive; //these are the only mutable fields, as they are subject to rewriting private AliasFilter aliasFilter; private SearchSourceBuilder source; - private final ShardSearchContextId readerId; - private final TimeValue keepAlive; + + private boolean canReturnNullResponseIfMatchNoDocs; + private SearchSortValuesAndFormats bottomSortValues; + + private int shardIndex = -1; public ShardSearchRequest(OriginalIndices originalIndices, SearchRequest searchRequest, @@ -213,6 +216,7 @@ public ShardSearchRequest(StreamInput in) throws IOException { readerId = null; keepAlive = null; } + shardIndex = 
in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readVInt() : -1; originalIndices = OriginalIndices.readOriginalIndices(in); assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive; } @@ -236,6 +240,7 @@ public ShardSearchRequest(ShardSearchRequest clone) { this.originalIndices = clone.originalIndices; this.readerId = clone.readerId; this.keepAlive = clone.keepAlive; + this.shardIndex = clone.shardIndex; } @Override @@ -268,12 +273,15 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce if (asKey == false) { out.writeStringArray(indexRoutings); out.writeOptionalString(preference); - } - if (asKey == false && out.getVersion().onOrAfter(Version.V_7_7_0)) { - out.writeBoolean(canReturnNullResponseIfMatchNoDocs); - out.writeOptionalWriteable(bottomSortValues); - out.writeOptionalWriteable(readerId); - out.writeOptionalTimeValue(keepAlive); + if (out.getVersion().onOrAfter(Version.V_7_7_0)) { + out.writeBoolean(canReturnNullResponseIfMatchNoDocs); + out.writeOptionalWriteable(bottomSortValues); + out.writeOptionalWriteable(readerId); + out.writeOptionalTimeValue(keepAlive); + } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeVInt(shardIndex); + } } } @@ -293,6 +301,18 @@ public IndicesOptions indicesOptions() { return originalIndices.indicesOptions(); } + public void setShardIndex(int shardIndex) { + this.shardIndex = shardIndex; + } + + /** + * Returns the shard index that is used to tiebreak identical sort values coming from different shards + * or -1 if unknown. 
+ */ + public int getShardIndex() { + return shardIndex; + } + public ShardId shardId() { return shardId; } diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index e226aed411e9f..6a62c5de59b87 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -179,35 +179,29 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe assert query == searcher.rewrite(query); // already rewritten final ScrollContext scrollContext = searchContext.scrollContext(); - if (scrollContext != null) { - if (scrollContext.totalHits == null) { - // first round - assert scrollContext.lastEmittedDoc == null; - // there is not much that we can optimize here since we want to collect all - // documents in order to get the total number of hits + final ScoreDoc after = scrollContext != null ? scrollContext.lastEmittedDoc : searchContext.searchAfter(); + if (searchContext.trackTotalHitsUpTo() == SearchContext.TRACK_TOTAL_HITS_DISABLED + || (scrollContext != null && scrollContext.lastEmittedDoc != null)) { - } else { - final ScoreDoc after = scrollContext.lastEmittedDoc; - if (returnsDocsInOrder(query, searchContext.sort())) { + if (returnsDocsInOrder(query, searchContext.sort())) { + if (after != null) { // now this gets interesting: since we sort in index-order, we can directly // skip to the desired doc - if (after != null) { - query = new BooleanQuery.Builder() - .add(query, BooleanClause.Occur.MUST) - .add(new MinDocQuery(after.doc + 1), BooleanClause.Occur.FILTER) - .build(); - } - // ... 
and stop collecting after ${size} matches - searchContext.terminateAfter(searchContext.size()); - } else if (canEarlyTerminate(reader, searchContext.sort())) { + query = new BooleanQuery.Builder() + .add(query, BooleanClause.Occur.MUST) + .add(new MinDocQuery(after.doc + 1), BooleanClause.Occur.FILTER) + .build(); + } + // ... and stop collecting after ${size} matches + searchContext.terminateAfter(searchContext.size()); + } else if (canEarlyTerminate(reader, searchContext.sort())) { + if (after != null) { // now this gets interesting: since the search sort is a prefix of the index sort, we can directly // skip to the desired doc - if (after != null) { - query = new BooleanQuery.Builder() - .add(query, BooleanClause.Occur.MUST) - .add(new SearchAfterSortedDocQuery(searchContext.sort().sort, (FieldDoc) after), BooleanClause.Occur.FILTER) - .build(); - } + query = new BooleanQuery.Builder() + .add(query, BooleanClause.Occur.MUST) + .add(new SearchAfterSortedDocQuery(searchContext.sort().sort, (FieldDoc) after), BooleanClause.Occur.FILTER) + .build(); } } } diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index ca1db258d08cf..6967e583c5a4b 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -124,6 +124,10 @@ public QuerySearchResult queryResult() { return this; } + public boolean hasPIT() { + return getShardSearchRequest() != null ? 
getShardSearchRequest().readerId() != null : false; + } + public void searchTimedOut(boolean searchTimedOut) { this.searchTimedOut = searchTimedOut; } diff --git a/server/src/main/java/org/elasticsearch/search/searchafter/SearchAfterBuilder.java b/server/src/main/java/org/elasticsearch/search/searchafter/SearchAfterBuilder.java index 533d87e618113..4033835ddea86 100644 --- a/server/src/main/java/org/elasticsearch/search/searchafter/SearchAfterBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/searchafter/SearchAfterBuilder.java @@ -20,6 +20,8 @@ package org.elasticsearch.search.searchafter; import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedNumericSortField; import org.apache.lucene.search.SortedSetSortField; @@ -104,6 +106,46 @@ public Object[] getSortValues() { return Arrays.copyOf(sortValues, sortValues.length); } + /** + * Returns a value that can be used to tiebreak documents within a point in time reader. 
+ */ + public static long createTiebreaker(ScoreDoc fieldDoc) { + return (((long) fieldDoc.shardIndex) << 32) | (fieldDoc.doc & 0xFFFFFFFFL); + } + + private static int decodeShardIndex(long value) { + return (int) (value >> 32); + } + + private static int decodeDocID(long value) { + return (int) value; + } + + public static FieldDoc buildFieldDocWithPIT(int shardIndex, SortAndFormats sort, Object[] values) { + if (sort == null || sort.sort.getSort() == null || sort.sort.getSort().length == 0) { + throw new IllegalArgumentException("Sort must contain at least one field."); + } + SortField[] sortFields = sort.sort.getSort(); + if (shardIndex >= 0 && sortFields.length == values.length-1) { + Object[] sortValues = new Object[values.length - 1]; + System.arraycopy(values, 0, sortValues, 0, values.length - 1); + FieldDoc fieldDoc = buildFieldDoc(sort, sortValues); + final long tiebreaker = (long) convertValueFromSortType("_tie_breaker", SortField.Type.LONG, + values[values.length - 1], DocValueFormat.RAW); + final int fieldDocShard = decodeShardIndex(tiebreaker); + if (Sort.RELEVANCE.equals(sort) /* NOTE(review): `sort` is the SortAndFormats parameter, not a Lucene Sort, so this equals() looks like it can never be true; confirm whether `sort.sort` was intended */ || shardIndex == fieldDocShard) { + fieldDoc.doc = decodeDocID(tiebreaker); + } else if (shardIndex < fieldDocShard) { + fieldDoc.doc = Integer.MAX_VALUE; + } else { + fieldDoc.doc = -1; + } + return fieldDoc; + } else { + return buildFieldDoc(sort, values); + } + } + public static FieldDoc buildFieldDoc(SortAndFormats sort, Object[] values) { if (sort == null || sort.sort.getSort() == null || sort.sort.getSort().length == 0) { throw new IllegalArgumentException("Sort must contain at least one field."); diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 46b1e057c41a2..f807c45153dec 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ 
b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -149,6 +149,8 @@ public void testBuildShardSearchTransportRequest() { String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); SearchShardIterator iterator = new SearchShardIterator(clusterAlias, new ShardId(new Index("name", "foo"), 1), Collections.emptyList(), new OriginalIndices(new String[] {"name", "name1"}, IndicesOptions.strictExpand())); + iterator.setShardIndex(10); + ShardSearchRequest shardSearchTransportRequest = action.buildShardSearchRequest(iterator); assertEquals(IndicesOptions.strictExpand(), shardSearchTransportRequest.indicesOptions()); assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices()); @@ -158,6 +160,7 @@ public void testBuildShardSearchTransportRequest() { assertArrayEquals(new String[] {"bar", "baz"}, shardSearchTransportRequest.indexRoutings()); assertEquals("_shards:1,3", shardSearchTransportRequest.preference()); assertEquals(clusterAlias, shardSearchTransportRequest.getClusterAlias()); + assertEquals(10, shardSearchTransportRequest.getShardIndex()); } public void testSendSearchResponseDisallowPartialFailures() { diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java index f0782ac0cc6d6..8262cd526a135 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java @@ -308,7 +308,7 @@ public void testQueryCapturesThreadPoolStats() throws Exception { dir.close(); } - public void testInOrderScrollOptimization() throws Exception { + public void testInOrderOptimization() throws Exception { Directory dir = newDirectory(); final Sort sort = new Sort(new SortField("rank", SortField.Type.INT)); IndexWriterConfig iwc = newIndexWriterConfig() @@ -320,28 +320,49 @@ public void 
testInOrderScrollOptimization() throws Exception { } w.close(); IndexReader reader = DirectoryReader.open(dir); - ScrollContext scrollContext = new ScrollContext(); - TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader), scrollContext); - context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); - scrollContext.lastEmittedDoc = null; - scrollContext.maxScore = Float.NaN; - scrollContext.totalHits = null; - context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); - int size = randomIntBetween(2, 5); - context.setSize(size); - QueryPhase.executeInternal(context); - assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo((long) numDocs)); - assertNull(context.queryResult().terminatedEarly()); - assertThat(context.terminateAfter(), equalTo(0)); - assertThat(context.queryResult().getTotalHits().value, equalTo((long) numDocs)); + // Inside a scroll + { + ScrollContext scrollContext = new ScrollContext(); + TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader), scrollContext); + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + scrollContext.lastEmittedDoc = null; + scrollContext.maxScore = Float.NaN; + scrollContext.totalHits = null; + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); + int size = randomIntBetween(2, 5); + context.setSize(size); + + QueryPhase.executeInternal(context); + assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo((long) numDocs)); + assertNull(context.queryResult().terminatedEarly()); + assertThat(context.terminateAfter(), equalTo(0)); + assertThat(context.queryResult().getTotalHits().value, equalTo((long) numDocs)); + + context.setSearcher(newEarlyTerminationContextSearcher(reader, size)); + QueryPhase.executeInternal(context); + assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo((long) numDocs)); + 
assertThat(context.terminateAfter(), equalTo(size)); + assertThat(context.queryResult().getTotalHits().value, equalTo((long) numDocs)); + assertThat(context.queryResult().topDocs().topDocs.scoreDocs[0].doc, greaterThanOrEqualTo(size)); + } + + // When total hits are not tracked + { + TestSearchContext context = new TestSearchContext(null, indexShard, newContextSearcher(reader), null); + context.parsedQuery(new ParsedQuery(new MatchAllDocsQuery())); + context.trackTotalHitsUpTo(SearchContext.TRACK_TOTAL_HITS_DISABLED); + context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap())); + int size = randomIntBetween(2, 5); + context.setSize(size); + + context.setSearcher(newEarlyTerminationContextSearcher(reader, size)); + QueryPhase.executeInternal(context); + assertThat(context.queryResult().getTotalHits().value, equalTo(0L)); + assertThat(context.queryResult().topDocs().topDocs.scoreDocs.length, equalTo(size)); + assertThat(context.terminateAfter(), equalTo(size)); + } - context.setSearcher(newEarlyTerminationContextSearcher(reader, size)); - QueryPhase.executeInternal(context); - assertThat(context.queryResult().topDocs().topDocs.totalHits.value, equalTo((long) numDocs)); - assertThat(context.terminateAfter(), equalTo(size)); - assertThat(context.queryResult().getTotalHits().value, equalTo((long) numDocs)); - assertThat(context.queryResult().topDocs().topDocs.scoreDocs[0].doc, greaterThanOrEqualTo(size)); reader.close(); dir.close(); } diff --git a/server/src/test/java/org/elasticsearch/search/searchafter/SearchAfterBuilderTests.java b/server/src/test/java/org/elasticsearch/search/searchafter/SearchAfterBuilderTests.java index 45da49e5f4c48..9b2d8a77235d6 100644 --- a/server/src/test/java/org/elasticsearch/search/searchafter/SearchAfterBuilderTests.java +++ b/server/src/test/java/org/elasticsearch/search/searchafter/SearchAfterBuilderTests.java @@ -21,6 +21,9 @@ import org.apache.lucene.document.LatLonDocValuesField; import 
org.apache.lucene.search.FieldComparator; +import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedNumericSortField; import org.apache.lucene.search.SortedSetSortField; @@ -38,6 +41,7 @@ import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.MultiValueMode; import org.elasticsearch.search.sort.BucketedSort; +import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESTestCase; @@ -289,4 +293,42 @@ public BucketedSort newBucketedSort(BigArrays bigArrays, SortOrder sortOrder, Do type = extractSortType(new SortedSetSortField("field", false)); assertThat(type, equalTo(SortField.Type.STRING)); } + + public void testFieldDocWithPIT() { + int shardIndex = randomIntBetween(1, 100); + int docID = randomIntBetween(0, 10000); + ScoreDoc scoreDoc = new ScoreDoc(docID, Float.NaN); + scoreDoc.shardIndex = shardIndex; + long tiebreaker = SearchAfterBuilder.createTiebreaker(scoreDoc); + Sort sort = new Sort(new SortField("field", SortField.Type.INT)); + SortAndFormats sortAndFormats = new SortAndFormats(sort, new DocValueFormat[] { DocValueFormat.RAW }); + final Object[] values = new Object[2]; + values[0] = randomInt(); + values[1] = tiebreaker; + + FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDocWithPIT(shardIndex, sortAndFormats, values); + assertThat(fieldDoc.doc, equalTo(docID)); + assertThat(fieldDoc.fields.length, equalTo(1)); + assertThat(fieldDoc.fields[0], equalTo(values[0])); + + fieldDoc = SearchAfterBuilder.buildFieldDocWithPIT(shardIndex-1, sortAndFormats, values); + assertThat(fieldDoc.doc, equalTo(Integer.MAX_VALUE)); + assertThat(fieldDoc.fields.length, equalTo(1)); + assertThat(fieldDoc.fields[0], equalTo(values[0])); + + fieldDoc = SearchAfterBuilder.buildFieldDocWithPIT(shardIndex+1, sortAndFormats, values); + 
assertThat(fieldDoc.doc, equalTo(-1)); + assertThat(fieldDoc.fields.length, equalTo(1)); + assertThat(fieldDoc.fields[0], equalTo(values[0])); + + Exception exc = expectThrows(Exception.class, () -> SearchAfterBuilder.buildFieldDocWithPIT(-1, sortAndFormats, values)); + assertThat(exc.getMessage(), containsString("search_after")); + + Object[] newValues = new Object[1]; + newValues[0] = values[0]; + fieldDoc = SearchAfterBuilder.buildFieldDocWithPIT(-1, sortAndFormats, newValues); + assertThat(fieldDoc.doc, equalTo(Integer.MAX_VALUE)); + assertThat(fieldDoc.fields.length, equalTo(1)); + assertThat(fieldDoc.fields[0], equalTo(values[0])); + } } diff --git a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/PointInTimeIT.java b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/PointInTimeIT.java index ad9e746be8d44..927d6399de49a 100644 --- a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/PointInTimeIT.java +++ b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/search/PointInTimeIT.java @@ -25,8 +25,10 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.SearchContextMissingException; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.PointInTimeBuilder; +import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; @@ -37,6 +39,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -50,6 +53,7 @@ import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.equalTo; 
import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; @@ -372,6 +376,58 @@ public void testPartialResults() throws Exception { } } + public void testTiebreakWithPIT() { + assertAcked(client().admin().indices().prepareDelete("index-*").get()); + int numIndex = randomIntBetween(2, 10); + int expectedNumDocs = 0; + for (int i = 0; i < numIndex; i++) { + String index = "index-" + i; + createIndex(index, Settings.builder().put("index.number_of_shards", 1).build()); + int numDocs = randomIntBetween(3, 20); + for (int j = 0; j < numDocs; j++) { + client().prepareIndex(index).setSource("value", 0).get(); + expectedNumDocs ++; + } + } + refresh("index-*"); + String pit = openPointInTime(new String[] { "index-*" }, TimeValue.timeValueHours(1)); + try { + for (String sortField : new String[] {"value", "_doc"}) { + for (int i = 1; i <= numIndex; i++) { + Comparable lastSortValue = null; + Set seen = new HashSet<>(); + SearchResponse response = client().prepareSearch() + .setSize(i) + .addSort(SortBuilders.fieldSort(sortField)) + .setPointInTime(new PointInTimeBuilder(pit)) + .get(); + while (response.getHits().getHits().length > 0) { + for (SearchHit hit : response.getHits().getHits()) { + assertTrue(seen.add(hit.getIndex() + hit.getId())); + } + int len = response.getHits().getHits().length; + SearchHit last = response.getHits().getHits()[len - 1]; + Comparable sortValue = (Comparable) last.getSortValues()[0]; + if (lastSortValue != null) { + assertThat(sortValue.compareTo(lastSortValue), greaterThanOrEqualTo(0)); + } + lastSortValue = sortValue; + assertThat(last.getSortValues().length, equalTo(2)); + response = client().prepareSearch() + .setSize(i) + .addSort(SortBuilders.fieldSort(sortField)) + .searchAfter(last.getSortValues()) + .setPointInTime(new PointInTimeBuilder(pit)) + .get(); + } + 
assertThat(seen.size(), equalTo(expectedNumDocs)); + } + } + } finally { + closePointInTime(pit); + } + } + private String openPointInTime(String[] indices, TimeValue keepAlive) { OpenPointInTimeRequest request = new OpenPointInTimeRequest( indices, From b5cac787fe7566ba3fe9b4d75b6fc942259ab4cf Mon Sep 17 00:00:00 2001 From: jimczi Date: Wed, 25 Nov 2020 10:23:50 +0100 Subject: [PATCH 2/3] Fix yml tests --- .../test/data_stream/10_data_stream_resolvability.yml | 9 ++++++--- .../rest-api-spec/test/search/point_in_time.yml | 9 ++++++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_data_stream_resolvability.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_data_stream_resolvability.yml index 80b9e0001b29c..3853800023f75 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_data_stream_resolvability.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_stream/10_data_stream_resolvability.yml @@ -662,7 +662,8 @@ - length: {hits.hits: 1 } - match: {hits.hits.0._index: .ds-simple-data-stream1-000001 } - match: {hits.hits.0._id: "123" } - - match: {hits.hits.0.sort: [22, 123] } + - match: {hits.hits.0.sort.0: 22 } + - match: {hits.hits.0.sort.1: 123 } - do: search: @@ -680,7 +681,8 @@ - length: {hits.hits: 1 } - match: {hits.hits.0._index: .ds-simple-data-stream1-000001 } - match: {hits.hits.0._id: "5" } - - match: {hits.hits.0.sort: [18, 5] } + - match: {hits.hits.0.sort.0: 18 } + - match: {hits.hits.0.sort.1: 5 } - do: search: @@ -699,7 +701,8 @@ - length: {hits.hits: 1 } - match: {hits.hits.0._index: .ds-simple-data-stream1-000001 } - match: {hits.hits.0._id: "1" } - - match: {hits.hits.0.sort: [18, 1] } + - match: {hits.hits.0.sort.0: 18 } + - match: {hits.hits.0.sort.1: 1 } - do: search: diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/search/point_in_time.yml
b/x-pack/plugin/src/test/resources/rest-api-spec/test/search/point_in_time.yml index 294a10a165da3..4fd6a8b15d7f3 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/search/point_in_time.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/search/point_in_time.yml @@ -61,7 +61,8 @@ setup: - length: {hits.hits: 1 } - match: {hits.hits.0._index: test } - match: {hits.hits.0._id: "172" } - - match: {hits.hits.0.sort: [24, 172] } + - match: {hits.hits.0.sort.0: 24 + - match: {hits.hits.0.sort.1: 172 } - do: index: @@ -89,7 +90,8 @@ setup: - length: {hits.hits: 1 } - match: {hits.hits.0._index: test } - match: {hits.hits.0._id: "42" } - - match: {hits.hits.0.sort: [18, 42] } + - match: {hits.hits.0.sort.0: 18 } + - match: {hits.hits.0.sort.1: 42 } - do: search: @@ -107,7 +109,8 @@ setup: - length: {hits.hits: 1 } - match: {hits.hits.0._index: test } - match: {hits.hits.0._id: "1" } - - match: {hits.hits.0.sort: [18, 1] } + - match: {hits.hits.0.sort.0: 18 } + - match: {hits.hits.0.sort.1: 1 } - do: search: From e248522393a46415bdd5aa9829b532a2233468ff Mon Sep 17 00:00:00 2001 From: jimczi Date: Mon, 30 Nov 2020 11:21:14 +0100 Subject: [PATCH 3/3] yml test --- .../test/resources/rest-api-spec/test/search/point_in_time.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/search/point_in_time.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/search/point_in_time.yml index 4fd6a8b15d7f3..678f3e1dc2844 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/search/point_in_time.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/search/point_in_time.yml @@ -61,7 +61,7 @@ setup: - length: {hits.hits: 1 } - match: {hits.hits.0._index: test } - match: {hits.hits.0._id: "172" } - - match: {hits.hits.0.sort.0: 24 + - match: {hits.hits.0.sort.0: 24 } - match: {hits.hits.0.sort.1: 172 } - do: