From f89b863a387565203d215f6535682cc7a4d83aa0 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 9 Jan 2021 09:43:59 -0500 Subject: [PATCH 1/4] Retry point in time on other copy when possible (#66713) Relates #61062 --- .../search/point-in-time-api.asciidoc | 4 +- .../paginate-search-results.asciidoc | 12 +-- .../search/AbstractSearchAsyncAction.java | 43 +++++---- .../action/search/DfsQueryPhase.java | 2 +- .../action/search/FetchSearchPhase.java | 2 +- .../action/search/SearchContextId.java | 12 ++- .../action/search/SearchPhaseContext.java | 7 ++ .../action/search/SearchTransportService.java | 5 + .../action/search/TransportSearchAction.java | 25 ++++- .../rest/action/search/RestSearchAction.java | 2 +- .../elasticsearch/search/SearchService.java | 96 +++++++++++++------ .../search/builder/PointInTimeBuilder.java | 43 ++++++--- .../search/internal/LegacyReaderContext.java | 1 + .../search/internal/ShardSearchContextId.java | 25 ++++- .../action/search/MockSearchPhaseContext.java | 5 + .../search/TransportSearchActionTests.java | 63 ++++++++++++ .../search/SearchServiceTests.java | 3 +- .../index/engine/FrozenIndexIT.java | 66 +++++++++++++ .../index/engine/FrozenIndexTests.java | 24 +++++ .../RetrySearchIntegTests.java | 89 +++++++++++++++++ 20 files changed, 447 insertions(+), 82 deletions(-) diff --git a/docs/reference/search/point-in-time-api.asciidoc b/docs/reference/search/point-in-time-api.asciidoc index f6b962a721f87..1c4db581a30b9 100644 --- a/docs/reference/search/point-in-time-api.asciidoc +++ b/docs/reference/search/point-in-time-api.asciidoc @@ -38,7 +38,7 @@ POST /_search <1> } }, "pit": { - "id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <2> + "id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <2> "keep_alive": "1m" <3> } } @@ -99,7 +99,7 @@ as soon as they are no longer used in search requests. --------------------------------------- DELETE /_pit { - "id" : "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWIBBXV1aWQyAAA=" + "id" : "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==" } --------------------------------------- // TEST[catch:missing] diff --git a/docs/reference/search/search-your-data/paginate-search-results.asciidoc b/docs/reference/search/search-your-data/paginate-search-results.asciidoc index f036d04fb00be..066e9a80daee7 100644 --- a/docs/reference/search/search-your-data/paginate-search-results.asciidoc +++ b/docs/reference/search/search-your-data/paginate-search-results.asciidoc @@ -62,10 +62,10 @@ The API returns a PIT ID. [source,console-result] ---- { - "id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==" + "id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==" } ---- -// TESTRESPONSE[s/"id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="/"id": $body.id/] +// TESTRESPONSE[s/"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="/"id": $body.id/] To get the first page of results, submit a search request with a `sort` argument. If using a PIT, specify the PIT ID in the `pit.id` parameter and omit @@ -86,7 +86,7 @@ GET /_search } }, "pit": { - "id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1> + "id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1> "keep_alive": "1m" }, "sort": [ <2> @@ -106,7 +106,7 @@ a PIT, the response's `pit_id` parameter contains an updated PIT ID. [source,console-result] ---- { - "pit_id" : "46ToAwEPbXktaW5kZXgtMDAwMDAxFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAFldicVdzOFFtVHZTZDFoWWowTGkwS0EAAAAAAAAAAAQURzZzcUszUUJ5U1NMX3Jyak5ET0wBFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAAA==", <1> + "pit_id" : "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1> "took" : 17, "timed_out" : false, "_shards" : ..., @@ -150,7 +150,7 @@ GET /_search } }, "pit": { - "id": "46ToAwEPbXktaW5kZXgtMDAwMDAxFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAFldicVdzOFFtVHZTZDFoWWowTGkwS0EAAAAAAAAAAAQURzZzcUszUUJ5U1NMX3Jyak5ET0wBFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAAA==", <1> + "id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1> "keep_alive": "1m" }, "sort": [ @@ -178,7 +178,7 @@ When you're finished, you should delete your PIT. ---- DELETE /_pit { - "id" : "46ToAwEPbXktaW5kZXgtMDAwMDAxFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAFldicVdzOFFtVHZTZDFoWWowTGkwS0EAAAAAAAAAAAQURzZzcUszUUJ5U1NMX3Jyak5ET0wBFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAAA==" + "id" : "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==" } ---- // TEST[catch:missing] diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 57d9f8298b08a..79e6375302a1b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -38,11 +38,14 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.transport.Transport; @@ -478,7 +481,7 @@ public final void onShardFailure(final int shardIndex, SearchShardTarget shardTa } else { // the failure is already present, try and not override it with an exception that is less meaningless // for example, getting illegal shard state - if (TransportActions.isReadOverrideException(e)) { + if (TransportActions.isReadOverrideException(e) && (e instanceof SearchContextMissingException == false)) { shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget)); } } @@ -567,6 +570,16 @@ public final SearchRequest getRequest() { return request; } + @Override + public boolean isPartOfPointInTime(ShardSearchContextId contextId) { + final PointInTimeBuilder pointInTimeBuilder = request.pointInTimeBuilder(); + if (pointInTimeBuilder != null) { + return request.pointInTimeBuilder().getSearchContextId(searchTransportService.getNamedWriteableRegistry()).contains(contextId); + } else { + return false; + } + } + protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, ShardSearchFailure[] failures, String scrollId, String searchContextId) { int numSuccess = successfulOps.get(); @@ -598,7 +611,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At searchContextId = SearchContextId.encode(queryResults.asList(), aliasFilter, minNodeVersion); } else { if (request.source() != null && request.source().pointInTimeBuilder() != null) { - searchContextId = request.source().pointInTimeBuilder().getId(); + searchContextId = request.source().pointInTimeBuilder().getEncodedId(); } else { searchContextId = null; } @@ -619,21 +632,19 @@ public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) * @param exception the exception explaining or causing the phase failure */ private void raisePhaseFailure(SearchPhaseExecutionException exception) { - // we don't release persistent readers (point in time). - if (request.pointInTimeBuilder() == null) { - results.getSuccessfulResults().forEach((entry) -> { - if (entry.getContextId() != null) { - try { - SearchShardTarget searchShardTarget = entry.getSearchShardTarget(); - Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); - sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices()); - } catch (Exception inner) { - inner.addSuppressed(exception); - logger.trace("failed to release context", inner); - } + results.getSuccessfulResults().forEach((entry) -> { + // Do not release search contexts that are part of the point in time + if (entry.getContextId() != null && isPartOfPointInTime(entry.getContextId()) == false) { + try { + SearchShardTarget searchShardTarget = entry.getSearchShardTarget(); + Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); + sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices()); + } catch (Exception inner) { + inner.addSuppressed(exception); + logger.trace("failed to release context", inner); } - }); - } + } + }); Releasables.close(releasables); listener.onFailure(exception); } diff --git a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java index e0fe285b730ec..40923d90b5273 100644 --- a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java @@ -100,7 +100,7 @@ public void onFailure(Exception exception) { progressListener.notifyQueryFailure(shardIndex, searchShardTarget, exception); counter.onFailure(shardIndex, searchShardTarget, exception); } finally { - if (context.getRequest().pointInTimeBuilder() == null) { + if (context.isPartOfPointInTime(querySearchRequest.contextId()) == false) { // the query might not have been executed at all (for example because thread pool rejected // execution) and the search context that was created in dfs phase might not be released. // release it again to be in the safe side diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index 94baf2a379e83..a287ee160ba27 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -210,7 +210,7 @@ private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) { // or using a PIT and if it has at least one hit that didn't make it to the global topDocs if (queryResult.hasSearchContext() && context.getRequest().scroll() == null - && context.getRequest().pointInTimeBuilder() == null) { + && (context.isPartOfPointInTime(queryResult.getContextId()) == false)) { try { SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget(); Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java index 7a037375d7dd8..8269acca6ff2c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java @@ -32,6 +32,7 @@ import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.transport.RemoteClusterAware; import java.io.IOException; @@ -43,14 +44,17 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; -public class SearchContextId { +public final class SearchContextId { private final Map shards; private final Map aliasFilter; + private transient Set contextIds; - private SearchContextId(Map shards, Map aliasFilter) { + SearchContextId(Map shards, Map aliasFilter) { this.shards = shards; this.aliasFilter = aliasFilter; + this.contextIds = shards.values().stream().map(SearchContextIdForNode::getSearchContextId).collect(Collectors.toSet()); } public Map shards() { @@ -61,6 +65,10 @@ public Map aliasFilter() { return aliasFilter; } + public boolean contains(ShardSearchContextId contextId) { + return contextIds.contains(contextId); + } + public static String encode(List searchPhaseResults, Map aliasFilter, Version version) { final Map shards = new HashMap<>(); for (SearchPhaseResult searchPhaseResult : searchPhaseResults) { diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java index 93463921d7e99..f27faf493ee83 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java @@ -58,6 +58,12 @@ interface SearchPhaseContext extends Executor { */ SearchRequest getRequest(); + /** + * Checks if the given context id is part of the point in time of this search (if exists). + * We should not release search contexts that belong to the point in time during or after searches. + */ + boolean isPartOfPointInTime(ShardSearchContextId contextId); + /** * Builds and sends the final search response back to the user. * @@ -108,6 +114,7 @@ interface SearchPhaseContext extends Executor { default void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection, OriginalIndices originalIndices) { + assert isPartOfPointInTime(contextId) == false : "Must not release point in time context [" + contextId + "]"; if (connection != null) { getSearchTransport().sendFreeContext(connection, contextId, originalIndices); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index b069ad5d54664..b9b92e1316a29 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -32,6 +32,7 @@ import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -438,4 +439,8 @@ public void cancelSearchTask(SearchTask task, String reason) { // force the origin to execute the cancellation as a system user new OriginSettingClient(client, GetTaskAction.TASKS_ORIGIN).admin().cluster().cancelTasks(req, ActionListener.wrap(() -> {})); } + + public NamedWriteableRegistry getNamedWriteableRegistry() { + return client.getNamedWriteableRegistry(); + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 4d969d7f892fa..c9e1ba7a5d505 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -284,7 +285,7 @@ private void executeRequest(Task task, SearchRequest searchRequest, final SearchContextId searchContext; final Map remoteClusterIndices; if (searchRequest.pointInTimeBuilder() != null) { - searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId()); + searchContext = searchRequest.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry); remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions()); } else { searchContext = null; @@ -581,7 +582,15 @@ static List getRemoteShardsIteratorFromPointInTime(Map {}; - try (Releasable ignored = markAsUsed) { - final IndexService indexService; - final Engine.Searcher canMatchSearcher; + Releasable releasable = null; + try { + IndexService indexService; final boolean hasRefreshPending; - if (readerContext != null) { - indexService = readerContext.indexService(); - canMatchSearcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE); + final Engine.Searcher canMatchSearcher; + if (request.readerId() != null) { hasRefreshPending = false; + ReaderContext readerContext; + Engine.Searcher searcher; + try { + readerContext = findReaderContext(request.readerId(), request); + releasable = readerContext.markAsUsed(getKeepAlive(request)); + indexService = readerContext.indexService(); + searcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE); + } catch (SearchContextMissingException e) { + final String searcherId = request.readerId().getSearcherId(); + if (searcherId == null) { + throw e; + } + indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + IndexShard indexShard = indexService.getShard(request.shardId().getId()); + final Engine.SearcherSupplier searcherSupplier = indexShard.acquireSearcherSupplier(); + if (searcherId.equals(searcherSupplier.getSearcherId()) == false) { + searcherSupplier.close(); + throw e; + } + releasable = searcherSupplier; + searcher = searcherSupplier.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE); + } + canMatchSearcher = searcher; } else { indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexShard indexShard = indexService.getShard(request.shardId().getId()); hasRefreshPending = indexShard.hasRefreshPending() && checkRefreshPending; canMatchSearcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE); } - - try (Releasable ignored2 = canMatchSearcher) { + try (Releasable ignored = canMatchSearcher) { QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), 0, canMatchSearcher, request::nowInMillis, request.getClusterAlias(), request.getRuntimeMappings()); final boolean canMatch = queryStillMatchesAfterRewrite(request, context); @@ -1199,6 +1235,8 @@ private CanMatchResponse canMatch(ShardSearchRequest request, boolean checkRefre } return new CanMatchResponse(canMatch || hasRefreshPending, minMax); } + } finally { + Releasables.close(releasable); } } diff --git a/server/src/main/java/org/elasticsearch/search/builder/PointInTimeBuilder.java b/server/src/main/java/org/elasticsearch/search/builder/PointInTimeBuilder.java index dfa8f45e23750..9fc0661dbf7af 100644 --- a/server/src/main/java/org/elasticsearch/search/builder/PointInTimeBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/builder/PointInTimeBuilder.java @@ -19,8 +19,10 @@ package org.elasticsearch.search.builder; +import org.elasticsearch.action.search.SearchContextId; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -44,39 +46,40 @@ public final class PointInTimeBuilder implements Writeable, ToXContentObject { static { PARSER = new ObjectParser<>(SearchSourceBuilder.POINT_IN_TIME.getPreferredName(), XContentParams::new); - PARSER.declareString((params, id) -> params.id = id, ID_FIELD); + PARSER.declareString((params, id) -> params.encodedId = id, ID_FIELD); PARSER.declareField((params, keepAlive) -> params.keepAlive = keepAlive, (p, c) -> TimeValue.parseTimeValue(p.text(), KEEP_ALIVE_FIELD.getPreferredName()), KEEP_ALIVE_FIELD, ObjectParser.ValueType.STRING); } private static final class XContentParams { - private String id; + private String encodedId; private TimeValue keepAlive; } - private final String id; + private final String encodedId; + private transient SearchContextId searchContextId; // lazily decoded from the encodedId private TimeValue keepAlive; - public PointInTimeBuilder(String id) { - this.id = Objects.requireNonNull(id); + public PointInTimeBuilder(String encodedId) { + this.encodedId = Objects.requireNonNull(encodedId); } public PointInTimeBuilder(StreamInput in) throws IOException { - id = in.readString(); + encodedId = in.readString(); keepAlive = in.readOptionalTimeValue(); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(id); + out.writeString(encodedId); out.writeOptionalTimeValue(keepAlive); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(SearchSourceBuilder.POINT_IN_TIME.getPreferredName()); - builder.field(ID_FIELD.getPreferredName(), id); + builder.field(ID_FIELD.getPreferredName(), encodedId); if (keepAlive != null) { builder.field(KEEP_ALIVE_FIELD.getPreferredName(), keepAlive); } @@ -86,17 +89,27 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public static PointInTimeBuilder fromXContent(XContentParser parser) throws IOException { final XContentParams params = PARSER.parse(parser, null); - if (params.id == null) { + if (params.encodedId == null) { throw new IllegalArgumentException("point int time id is not provided"); } - return new PointInTimeBuilder(params.id).setKeepAlive(params.keepAlive); + return new PointInTimeBuilder(params.encodedId).setKeepAlive(params.keepAlive); } /** - * Returns the id of this point in time + * Returns the encoded id of this point in time */ - public String getId() { - return id; + public String getEncodedId() { + return encodedId; + } + + /** + * Returns the search context of this point in time from its encoded id. + */ + public SearchContextId getSearchContextId(NamedWriteableRegistry namedWriteableRegistry) { + if (searchContextId == null) { + searchContextId = SearchContextId.decode(namedWriteableRegistry, encodedId); + } + return searchContextId; } /** @@ -118,11 +131,11 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final PointInTimeBuilder that = (PointInTimeBuilder) o; - return Objects.equals(id, that.id) && Objects.equals(keepAlive, that.keepAlive); + return Objects.equals(encodedId, that.encodedId) && Objects.equals(keepAlive, that.keepAlive); } @Override public int hashCode() { - return Objects.hash(id, keepAlive); + return Objects.hash(encodedId, keepAlive); } } diff --git a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java index d40bdcfbed86d..23c90135deb86 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java @@ -40,6 +40,7 @@ public LegacyReaderContext(ShardSearchContextId id, IndexService indexService, I super(id, indexService, indexShard, reader, keepAliveInMillis, false); assert shardSearchRequest.readerId() == null; assert shardSearchRequest.keepAlive() == null; + assert id.getSearcherId() == null : "Legacy reader context must not have searcher id"; this.shardSearchRequest = Objects.requireNonNull(shardSearchRequest); if (shardSearchRequest.scroll() != null) { // Search scroll requests are special, they don't hold indices names so we have diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchContextId.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchContextId.java index f961d21033439..3baacacf1cd3f 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchContextId.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchContextId.java @@ -30,10 +30,17 @@ public final class ShardSearchContextId implements Writeable { private final String sessionId; private final long id; + private final String searcherId; + // TODO: Remove this constructor public ShardSearchContextId(String sessionId, long id) { + this(sessionId, id, null); + } + + public ShardSearchContextId(String sessionId, long id, String searcherId) { this.sessionId = Objects.requireNonNull(sessionId); this.id = id; + this.searcherId = searcherId; } public ShardSearchContextId(StreamInput in) throws IOException { @@ -43,6 +50,11 @@ public ShardSearchContextId(StreamInput in) throws IOException { } else { this.sessionId = ""; } + if (in.getVersion().onOrAfter(Version.V_7_12_0)) { + this.searcherId = in.readOptionalString(); + } else { + this.searcherId = null; + } } @Override @@ -51,6 +63,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_7_0)) { out.writeString(sessionId); } + if (out.getVersion().onOrAfter(Version.V_7_12_0)) { + out.writeOptionalString(searcherId); + } } public String getSessionId() { @@ -61,21 +76,25 @@ public long getId() { return id; } + public String getSearcherId() { + return searcherId; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ShardSearchContextId other = (ShardSearchContextId) o; - return id == other.id && sessionId.equals(other.sessionId); + return id == other.id && sessionId.equals(other.sessionId) && Objects.equals(searcherId, other.searcherId); } @Override public int hashCode() { - return Objects.hash(sessionId, id); + return Objects.hash(sessionId, id, searcherId); } @Override public String toString() { - return "[" + sessionId + "][" + id + "]"; + return "[" + sessionId + "][" + id + "] searcherId [" + searcherId + "]"; } } diff --git a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index d37dcf56ba06b..f278c15453bef 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -151,4 +151,9 @@ public void onFailure(Exception e) { public void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection, OriginalIndices originalIndices) { releasedSearchContexts.add(contextId); } + + @Override + public boolean isPartOfPointInTime(ShardSearchContextId contextId) { + return false; + } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 0b7925913abcd..7272c86ef862d 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -36,13 +37,16 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.GroupShardsIteratorTests; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.InnerHitBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; @@ -61,6 +65,7 @@ import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; @@ -82,6 +87,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -94,12 +100,16 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.stream.Collectors; import static org.elasticsearch.test.InternalAggregationTestCase.emptyReduceContextBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch; import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.startsWith; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; public class TransportSearchActionTests extends ESTestCase { @@ -936,4 +946,57 @@ public void testShouldPreFilterSearchShardsWithReadOnly() { indices, randomIntBetween(127, 10000))); } } + + public void testLocalShardIteratorFromPointInTime() { + final int numberOfShards = randomIntBetween(1, 5); + final int numberOfReplicas = randomIntBetween(0, 2); + final String[] indices = {"test-1", "test-2"}; + final ClusterState clusterState = + ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(indices, numberOfShards, numberOfReplicas); + final IndexMetadata indexMetadata = clusterState.metadata().index("test-1"); + Map contexts = new HashMap<>(); + Set relocatedContexts = new HashSet<>(); + for (int shardId = 0; shardId < numberOfShards; shardId++) { + final String targetNode; + if (randomBoolean()) { + final IndexRoutingTable routingTable = clusterState.routingTable().index(indexMetadata.getIndex()); + targetNode = randomFrom(routingTable.shard(shardId).assignedShards()).currentNodeId(); + } else { + // relocated or no longer assigned + relocatedContexts.add(new ShardId(indexMetadata.getIndex(), shardId)); + targetNode = UUIDs.randomBase64UUID(); + } + contexts.put(new ShardId(indexMetadata.getIndex(), shardId), + new SearchContextIdForNode(null, targetNode, + new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong(), null))); + } + TimeValue keepAlive = randomBoolean() ? null : TimeValue.timeValueSeconds(between(30, 3600)); + final List shardIterators = TransportSearchAction.getLocalLocalShardsIteratorFromPointInTime( + clusterState, + OriginalIndices.NONE, + null, + new SearchContextId(contexts, Map.of()), + keepAlive + ); + shardIterators.sort(Comparator.comparing(SearchShardIterator::shardId)); + assertThat(shardIterators, hasSize(numberOfShards)); + for (int id = 0; id < numberOfShards; id++) { + final ShardId shardId = new ShardId(indexMetadata.getIndex(), id); + final SearchShardIterator shardIterator = shardIterators.get(id); + final SearchContextIdForNode context = contexts.get(shardId); + if (context.getSearchContextId().getSearcherId() == null) { + assertThat(shardIterator.getTargetNodeIds(), hasSize(1)); + } else { + final List targetNodes = clusterState.routingTable().index(indexMetadata.getIndex()).shard(id).assignedShards() + .stream().map(ShardRouting::currentNodeId).collect(Collectors.toList()); + if (relocatedContexts.contains(shardId)) { + targetNodes.add(context.getNode()); + } + assertThat(shardIterator.getTargetNodeIds(), containsInAnyOrder(targetNodes.toArray(new String[0]))); + } + assertThat(shardIterator.getTargetNodeIds().get(0), equalTo(context.getNode())); + assertThat(shardIterator.getSearchContextId(), equalTo(context.getSearchContextId())); + assertThat(shardIterator.getSearchContextKeepAlive(), equalTo(keepAlive)); + } + } } diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 702ea562542c8..746b739b0af4f 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -1073,8 +1073,7 @@ OriginalIndices.NONE, new SearchRequest().allowPartialSearchResults(true), assertThat(searchService.getActiveContexts(), equalTo(contextIds.size())); while (contextIds.isEmpty() == false) { final ShardSearchContextId contextId = randomFrom(contextIds); - expectThrows(SearchContextMissingException.class, - () -> searchService.freeReaderContext(new ShardSearchContextId(UUIDs.randomBase64UUID(), contextId.getId()))); + assertFalse(searchService.freeReaderContext(new ShardSearchContextId(UUIDs.randomBase64UUID(), contextId.getId()))); assertThat(searchService.getActiveContexts(), equalTo(contextIds.size())); if (randomBoolean()) { assertTrue(searchService.freeReaderContext(contextId)); diff --git a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java index 5c97e5dec60da..f768f614a79b0 100644 --- a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java +++ b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java @@ -7,7 +7,10 @@ package org.elasticsearch.index.engine; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.bootstrap.JavaVersion; import org.elasticsearch.cluster.metadata.DataStream; @@ -15,27 +18,38 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.shard.IndexLongFieldRange; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.protocol.xpack.frozen.FreezeRequest; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; +import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction; +import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest; import org.elasticsearch.xpack.frozen.FrozenIndices; import org.joda.time.Instant; import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.sameInstance; @@ -172,4 +186,56 @@ public void testTimestampFieldTypeExposedByAllIndicesServices() throws Exception } } + public void testRetryPointInTime() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(1); + final List dataNodes = + StreamSupport.stream(internalCluster().clusterService().state().nodes().getDataNodes().spliterator(), false) + .map(e -> e.value.getName()) + .collect(Collectors.toList()); + final String assignedNode = randomFrom(dataNodes); + final String indexName = "test"; + assertAcked( + client().admin().indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.routing.allocation.require._name", assignedNode) + .build()) + .addMapping("_doc", "created_date", "type=date,format=yyyy-MM-dd")); + int numDocs = randomIntBetween(1, 100); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex(indexName, "_doc").setSource("created_date", "2011-02-02").get(); + } + assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest(indexName)).actionGet()); + final String pitId = client().execute(OpenPointInTimeAction.INSTANCE, + new OpenPointInTimeRequest(new String[]{indexName}, IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED, + TimeValue.timeValueMinutes(2), null, null)).actionGet().getSearchContextId(); + try { + SearchResponse resp = client().prepareSearch() + .setIndices(indexName) + .setPreference(null) + .setPointInTime(new PointInTimeBuilder(pitId)) + .get(); + assertNoFailures(resp); + assertThat(resp.pointInTimeId(), equalTo(pitId)); + assertHitCount(resp, numDocs); + internalCluster().restartNode(assignedNode); + ensureGreen(indexName); + resp = client().prepareSearch() + .setIndices(indexName) + .setQuery(new RangeQueryBuilder("created_date").gte("2011-01-01").lte("2011-12-12")) + .setSearchType(SearchType.QUERY_THEN_FETCH) + .setPreference(null) + .setPreFilterShardSize(between(1, 10)) + .setAllowPartialSearchResults(true) + .setPointInTime(new PointInTimeBuilder(pitId)) + .get(); + assertNoFailures(resp); + assertThat(resp.pointInTimeId(), equalTo(pitId)); + assertHitCount(resp, numDocs); + } finally { + assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest(indexName).setFreeze(false)).actionGet()); + client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet(); + } + } } diff --git a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java index 358b964f9d396..7896c683e4100 100644 --- a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -37,10 +38,12 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.protocol.xpack.frozen.FreezeRequest; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; @@ -347,6 +350,27 @@ public void testCanMatch() throws IOException, ExecutionException, InterruptedEx IndicesStatsResponse response = client().admin().indices().prepareStats("index").clear().setRefresh(true).get(); assertEquals(0, response.getTotal().refresh.getTotal()); + + // Retry with point in time + PlainActionFuture openContextFuture = new PlainActionFuture<>(); + searchService.openReaderContext(shard.shardId(), TimeValue.timeValueSeconds(60), openContextFuture); + final ShardSearchContextId contextId = openContextFuture.actionGet(TimeValue.timeValueSeconds(60)); + assertNotNull(contextId.getSearcherId()); + sourceBuilder.query(QueryBuilders.rangeQuery("field").gt("2010-01-06T02:00").lt("2010-01-07T02:00")); + assertFalse(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, contextId, null)).canMatch()); + + assertTrue(searchService.freeReaderContext(contextId)); + sourceBuilder.query(QueryBuilders.rangeQuery("field").gt("2010-01-06T02:00").lt("2010-01-07T02:00")); + assertFalse(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, contextId, null)).canMatch()); + + expectThrows(SearchContextMissingException.class, () -> { + ShardSearchContextId withoutCommitId = new ShardSearchContextId(contextId.getSessionId(), contextId.getId(), null); + sourceBuilder.query(QueryBuilders.rangeQuery("field").gt("2010-01-06T02:00").lt("2010-01-07T02:00")); + assertFalse(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, withoutCommitId, null)).canMatch()); + }); } } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java index afabdc107c9ac..a750b668a004b 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java @@ -6,13 +6,23 @@ package org.elasticsearch.xpack.searchablesnapshots; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction; +import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest; +import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction; +import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest; import java.util.ArrayList; import java.util.Collections; @@ -21,6 +31,8 @@ import java.util.Set; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.equalTo; public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase { @@ -93,4 +105,81 @@ public void testSearcherId() throws Exception { } } } + + public void testRetryPointInTime() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + assertAcked( + client().admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)).build()) + .setMapping("{\"properties\":{\"created_date\":{\"type\": \"date\", \"format\": \"yyyy-MM-dd\"}}}") + ); + final List indexRequestBuilders = new ArrayList<>(); + final int docCount = between(0, 100); + for (int i = 0; i < docCount; i++) { + indexRequestBuilders.add(client().prepareIndex(indexName).setSource("created_date", "2011-02-02")); + } + indexRandom(true, false, indexRequestBuilders); + assertThat( + client().admin().indices().prepareForceMerge(indexName).setOnlyExpungeDeletes(true).setFlush(true).get().getFailedShards(), + equalTo(0) + ); + refresh(indexName); + forceMerge(); + + final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createRepository(repositoryName, "fs"); + + final SnapshotId snapshotOne = createSnapshot(repositoryName, "snapshot-1", List.of(indexName)).snapshotId(); + assertAcked(client().admin().indices().prepareDelete(indexName)); + + final int numberOfReplicas = between(0, 2); + final Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build(); + internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + 1); + + mountSnapshot(repositoryName, snapshotOne.getName(), indexName, indexName, indexSettings); + ensureGreen(indexName); + + final String pitId = client().execute( + OpenPointInTimeAction.INSTANCE, + new OpenPointInTimeRequest( + new String[] { indexName }, + IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED, + TimeValue.timeValueMinutes(2), + null, + null + ) + ).actionGet().getSearchContextId(); + try { + SearchResponse resp = client().prepareSearch() + .setIndices(indexName) + .setPreference(null) + .setPointInTime(new PointInTimeBuilder(pitId)) + .get(); + assertNoFailures(resp); + assertThat(resp.pointInTimeId(), equalTo(pitId)); + assertHitCount(resp, docCount); + + final Set allocatedNodes = internalCluster().nodesInclude(indexName); + for (String allocatedNode : allocatedNodes) { + internalCluster().restartNode(allocatedNode); + } + ensureGreen(indexName); + resp = client().prepareSearch() + .setIndices(indexName) + .setQuery(new RangeQueryBuilder("created_date").gte("2011-01-01").lte("2011-12-12")) + .setSearchType(SearchType.QUERY_THEN_FETCH) + .setPreference(null) + .setPreFilterShardSize(between(1, 10)) + .setAllowPartialSearchResults(true) + .setPointInTime(new PointInTimeBuilder(pitId)) + .get(); + assertNoFailures(resp); + assertThat(resp.pointInTimeId(), equalTo(pitId)); + assertHitCount(resp, docCount); + } finally { + client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet(); + } + } } From 9f175531cf092cabcf93737b8bd7fa7b77a9e3b6 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 10 Jan 2021 17:04:24 -0500 Subject: [PATCH 2/4] fix tests --- .../action/search/TransportSearchActionTests.java | 2 +- .../xpack/searchablesnapshots/RetrySearchIntegTests.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 7272c86ef862d..948519a857a1a 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -975,7 +975,7 @@ public void testLocalShardIteratorFromPointInTime() { clusterState, OriginalIndices.NONE, null, - new SearchContextId(contexts, Map.of()), + new SearchContextId(contexts, Collections.emptyMap()), keepAlive ); shardIterators.sort(Comparator.comparing(SearchShardIterator::shardId)); diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java index a750b668a004b..3fd1af3eec04c 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java @@ -113,12 +113,12 @@ public void testRetryPointInTime() throws Exception { .indices() .prepareCreate(indexName) .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)).build()) - .setMapping("{\"properties\":{\"created_date\":{\"type\": \"date\", \"format\": \"yyyy-MM-dd\"}}}") + .addMapping("_doc", "created_date", "type=date, format=yyyy-MM-dd") ); final List indexRequestBuilders = new ArrayList<>(); final int docCount = between(0, 100); for (int i = 0; i < docCount; i++) { - indexRequestBuilders.add(client().prepareIndex(indexName).setSource("created_date", "2011-02-02")); + indexRequestBuilders.add(client().prepareIndex(indexName, "_doc").setSource("created_date", "2011-02-02")); } indexRandom(true, false, indexRequestBuilders); assertThat( @@ -131,7 +131,7 @@ public void testRetryPointInTime() throws Exception { final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); createRepository(repositoryName, "fs"); - final SnapshotId snapshotOne = createSnapshot(repositoryName, "snapshot-1", List.of(indexName)).snapshotId(); + final SnapshotId snapshotOne = createSnapshot(repositoryName, "snapshot-1", Collections.singletonList(indexName)).snapshotId(); assertAcked(client().admin().indices().prepareDelete(indexName)); final int numberOfReplicas = between(0, 2); From b7501e8cdefd89838d319a1c21f937e3506adcad Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 10 Jan 2021 18:47:47 -0500 Subject: [PATCH 3/4] fix doc --- docs/reference/search/point-in-time.asciidoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/reference/search/point-in-time.asciidoc b/docs/reference/search/point-in-time.asciidoc index a79ca0f3ad4a7..2393e4510bfbd 100644 --- a/docs/reference/search/point-in-time.asciidoc +++ b/docs/reference/search/point-in-time.asciidoc @@ -35,7 +35,7 @@ POST /_search <1> } }, "pit": { - "id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <2> + "id": "48myAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwEAA2lkeAV1dWlkMQIGbm9kZV8xAAAAAAAAAAABAWEBAANpZHkFdXVpZDIqBm5vZGVfMgAAAAAAAAAADAFiAQACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <2> "keep_alive": "1m" <3> } } @@ -95,7 +95,7 @@ as soon as they are no longer used in search requests. --------------------------------------- DELETE /_pit { - "id" : "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWIBBXV1aWQyAAA=" + "id" : "48myAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwEAA2lkeAV1dWlkMQIGbm9kZV8xAAAAAAAAAAABAWEBAANpZHkFdXVpZDIqBm5vZGVfMgAAAAAAAAAADAFiAQACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==" } --------------------------------------- // TEST[catch:missing] From f52ee90d1f990ee44d574cdfe20cbee28a992bfb Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 10 Jan 2021 20:18:01 -0500 Subject: [PATCH 4/4] fix space --- .../xpack/searchablesnapshots/RetrySearchIntegTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java index 3fd1af3eec04c..0a3d2dbe6fa00 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java @@ -113,7 +113,7 @@ public void testRetryPointInTime() throws Exception { .indices() .prepareCreate(indexName) .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)).build()) - .addMapping("_doc", "created_date", "type=date, format=yyyy-MM-dd") + .addMapping("_doc", "created_date", "type=date,format=yyyy-MM-dd") ); final List indexRequestBuilders = new ArrayList<>(); final int docCount = between(0, 100);