Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/reference/search/point-in-time-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ POST /_search <1>
}
},
"pit": {
"id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <2>
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <2>
"keep_alive": "1m" <3>
}
}
Expand Down Expand Up @@ -99,7 +99,7 @@ as soon as they are no longer used in search requests.
---------------------------------------
DELETE /_pit
{
"id" : "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWIBBXV1aWQyAAA="
"id" : "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
}
---------------------------------------
// TEST[catch:missing]
Expand Down
4 changes: 2 additions & 2 deletions docs/reference/search/point-in-time.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ POST /_search <1>
}
},
"pit": {
"id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <2>
"id": "48myAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwEAA2lkeAV1dWlkMQIGbm9kZV8xAAAAAAAAAAABAWEBAANpZHkFdXVpZDIqBm5vZGVfMgAAAAAAAAAADAFiAQACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <2>
"keep_alive": "1m" <3>
}
}
Expand Down Expand Up @@ -95,7 +95,7 @@ as soon as they are no longer used in search requests.
---------------------------------------
DELETE /_pit
{
"id" : "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWIBBXV1aWQyAAA="
"id" : "48myAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwEAA2lkeAV1dWlkMQIGbm9kZV8xAAAAAAAAAAABAWEBAANpZHkFdXVpZDIqBm5vZGVfMgAAAAAAAAAADAFiAQACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
}
---------------------------------------
// TEST[catch:missing]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ The API returns a PIT ID.
[source,console-result]
----
{
"id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
}
----
// TESTRESPONSE[s/"id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="/"id": $body.id/]
// TESTRESPONSE[s/"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="/"id": $body.id/]

To get the first page of results, submit a search request with a `sort`
argument. If using a PIT, specify the PIT ID in the `pit.id` parameter and omit
Expand All @@ -86,7 +86,7 @@ GET /_search
}
},
"pit": {
"id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
},
"sort": [ <2>
Expand All @@ -106,7 +106,7 @@ a PIT, the response's `pit_id` parameter contains an updated PIT ID.
[source,console-result]
----
{
"pit_id" : "46ToAwEPbXktaW5kZXgtMDAwMDAxFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAFldicVdzOFFtVHZTZDFoWWowTGkwS0EAAAAAAAAAAAQURzZzcUszUUJ5U1NMX3Jyak5ET0wBFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAAA==", <1>
"pit_id" : "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"took" : 17,
"timed_out" : false,
"_shards" : ...,
Expand Down Expand Up @@ -150,7 +150,7 @@ GET /_search
}
},
"pit": {
"id": "46ToAwEPbXktaW5kZXgtMDAwMDAxFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAFldicVdzOFFtVHZTZDFoWWowTGkwS0EAAAAAAAAAAAQURzZzcUszUUJ5U1NMX3Jyak5ET0wBFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAAA==", <1>
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
},
"sort": [
Expand Down Expand Up @@ -178,7 +178,7 @@ When you're finished, you should delete your PIT.
----
DELETE /_pit
{
"id" : "46ToAwEPbXktaW5kZXgtMDAwMDAxFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAFldicVdzOFFtVHZTZDFoWWowTGkwS0EAAAAAAAAAAAQURzZzcUszUUJ5U1NMX3Jyak5ET0wBFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAAA=="
"id" : "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
}
----
// TEST[catch:missing]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.transport.Transport;
Expand Down Expand Up @@ -478,7 +481,7 @@ public final void onShardFailure(final int shardIndex, SearchShardTarget shardTa
} else {
// the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state
if (TransportActions.isReadOverrideException(e)) {
if (TransportActions.isReadOverrideException(e) && (e instanceof SearchContextMissingException == false)) {
shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
}
}
Expand Down Expand Up @@ -567,6 +570,16 @@ public final SearchRequest getRequest() {
return request;
}

@Override
public boolean isPartOfPointInTime(ShardSearchContextId contextId) {
    /*
     * Returns true if the given shard-level search context id belongs to the point in time
     * attached to this search request. Contexts that are part of a PIT must not be released
     * by the search phases; callers use this check before freeing a context.
     */
    final PointInTimeBuilder pointInTimeBuilder = request.pointInTimeBuilder();
    if (pointInTimeBuilder == null) {
        // No PIT on this request, so no context can belong to one.
        return false;
    }
    // Reuse the local variable instead of re-reading request.pointInTimeBuilder();
    // decode the PIT id and test membership of the shard context id.
    return pointInTimeBuilder.getSearchContextId(searchTransportService.getNamedWriteableRegistry()).contains(contextId);
}

protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, ShardSearchFailure[] failures,
String scrollId, String searchContextId) {
int numSuccess = successfulOps.get();
Expand Down Expand Up @@ -598,7 +611,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
searchContextId = SearchContextId.encode(queryResults.asList(), aliasFilter, minNodeVersion);
} else {
if (request.source() != null && request.source().pointInTimeBuilder() != null) {
searchContextId = request.source().pointInTimeBuilder().getId();
searchContextId = request.source().pointInTimeBuilder().getEncodedId();
} else {
searchContextId = null;
}
Expand All @@ -619,21 +632,19 @@ public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause)
* @param exception the exception explaining or causing the phase failure
*/
private void raisePhaseFailure(SearchPhaseExecutionException exception) {
// we don't release persistent readers (point in time).
if (request.pointInTimeBuilder() == null) {
results.getSuccessfulResults().forEach((entry) -> {
if (entry.getContextId() != null) {
try {
SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices());
} catch (Exception inner) {
inner.addSuppressed(exception);
logger.trace("failed to release context", inner);
}
results.getSuccessfulResults().forEach((entry) -> {
// Do not release search contexts that are part of the point in time
if (entry.getContextId() != null && isPartOfPointInTime(entry.getContextId()) == false) {
try {
SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices());
} catch (Exception inner) {
inner.addSuppressed(exception);
logger.trace("failed to release context", inner);
}
});
}
}
});
Releasables.close(releasables);
listener.onFailure(exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void onFailure(Exception exception) {
progressListener.notifyQueryFailure(shardIndex, searchShardTarget, exception);
counter.onFailure(shardIndex, searchShardTarget, exception);
} finally {
if (context.getRequest().pointInTimeBuilder() == null) {
if (context.isPartOfPointInTime(querySearchRequest.contextId()) == false) {
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) {
// or using a PIT and if it has at least one hit that didn't make it to the global topDocs
if (queryResult.hasSearchContext()
&& context.getRequest().scroll() == null
&& context.getRequest().pointInTimeBuilder() == null) {
&& (context.isPartOfPointInTime(queryResult.getContextId()) == false)) {
try {
SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.transport.RemoteClusterAware;

import java.io.IOException;
Expand All @@ -43,14 +44,17 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class SearchContextId {
public final class SearchContextId {
private final Map<ShardId, SearchContextIdForNode> shards;
private final Map<String, AliasFilter> aliasFilter;
private transient Set<ShardSearchContextId> contextIds;

private SearchContextId(Map<ShardId, SearchContextIdForNode> shards, Map<String, AliasFilter> aliasFilter) {
SearchContextId(Map<ShardId, SearchContextIdForNode> shards, Map<String, AliasFilter> aliasFilter) {
    // Package-private: instances are produced by decode/encode helpers of this class.
    // NOTE(review): the maps are stored without defensive copies — callers are expected
    // to hand over ownership; confirm no caller mutates them afterwards.
    this.shards = shards;
    this.aliasFilter = aliasFilter;
    // Pre-compute the flat set of per-shard context ids so contains() is a cheap set lookup.
    this.contextIds = shards.values().stream().map(SearchContextIdForNode::getSearchContextId).collect(Collectors.toSet());
}

public Map<ShardId, SearchContextIdForNode> shards() {
Expand All @@ -61,6 +65,10 @@ public Map<String, AliasFilter> aliasFilter() {
return aliasFilter;
}

/**
 * Tells whether the given shard search context id is one of the contexts
 * referenced by this point-in-time id.
 */
public boolean contains(ShardSearchContextId contextId) {
    // Delegates to the set pre-computed in the constructor.
    return this.contextIds.contains(contextId);
}

public static String encode(List<SearchPhaseResult> searchPhaseResults, Map<String, AliasFilter> aliasFilter, Version version) {
final Map<ShardId, SearchContextIdForNode> shards = new HashMap<>();
for (SearchPhaseResult searchPhaseResult : searchPhaseResults) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ interface SearchPhaseContext extends Executor {
*/
SearchRequest getRequest();

/**
 * Checks whether the given context id is part of the point in time of this search (if one exists).
 * Search contexts that belong to a point in time must not be released during or after the search,
 * as the PIT owns their lifecycle.
 */
boolean isPartOfPointInTime(ShardSearchContextId contextId);

/**
* Builds and sends the final search response back to the user.
*
Expand Down Expand Up @@ -108,6 +114,7 @@ interface SearchPhaseContext extends Executor {
default void sendReleaseSearchContext(ShardSearchContextId contextId,
Transport.Connection connection,
OriginalIndices originalIndices) {
assert isPartOfPointInTime(contextId) == false : "Must not release point in time context [" + contextId + "]";
if (connection != null) {
getSearchTransport().sendFreeContext(connection, contextId, originalIndices);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -438,4 +439,8 @@ public void cancelSearchTask(SearchTask task, String reason) {
// force the origin to execute the cancellation as a system user
new OriginSettingClient(client, GetTaskAction.TASKS_ORIGIN).admin().cluster().cancelTasks(req, ActionListener.wrap(() -> {}));
}

public NamedWriteableRegistry getNamedWriteableRegistry() {
    // Exposes the client's registry so callers can decode point-in-time ids
    // (e.g. PointInTimeBuilder#getSearchContextId requires it).
    return client.getNamedWriteableRegistry();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -284,7 +285,7 @@ private void executeRequest(Task task, SearchRequest searchRequest,
final SearchContextId searchContext;
final Map<String, OriginalIndices> remoteClusterIndices;
if (searchRequest.pointInTimeBuilder() != null) {
searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId());
searchContext = searchRequest.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);
remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
} else {
searchContext = null;
Expand Down Expand Up @@ -581,7 +582,15 @@ static List<SearchShardIterator> getRemoteShardsIteratorFromPointInTime(Map<Stri
final String clusterAlias = entry.getKey();
final SearchContextIdForNode perNode = searchContextId.shards().get(shardId);
assert clusterAlias.equals(perNode.getClusterAlias()) : clusterAlias + " != " + perNode.getClusterAlias();
final List<String> targetNodes = Collections.singletonList(perNode.getNode());
final List<String> targetNodes = new ArrayList<>(group.getShards().length);
targetNodes.add(perNode.getNode());
if (perNode.getSearchContextId().getSearcherId() != null) {
for (ShardRouting shard : group.getShards()) {
if (shard.currentNodeId().equals(perNode.getNode()) == false) {
targetNodes.add(shard.currentNodeId());
}
}
}
SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, shardId, targetNodes,
remoteClusterIndices.get(clusterAlias), perNode.getSearchContextId(), searchContextKeepAlive);
remoteShardIterators.add(shardIterator);
Expand Down Expand Up @@ -915,8 +924,16 @@ static List<SearchShardIterator> getLocalLocalShardsIteratorFromPointInTime(Clus
final SearchContextIdForNode perNode = entry.getValue();
if (Strings.isEmpty(perNode.getClusterAlias())) {
final ShardId shardId = entry.getKey();
OperationRouting.getShards(clusterState, shardId);
final List<String> targetNodes = Collections.singletonList(perNode.getNode());
final ShardIterator shards = OperationRouting.getShards(clusterState, shardId);
final List<String> targetNodes = new ArrayList<>(shards.size());
targetNodes.add(perNode.getNode());
if (perNode.getSearchContextId().getSearcherId() != null) {
for (ShardRouting shard : shards) {
if (shard.currentNodeId().equals(perNode.getNode()) == false) {
targetNodes.add(shard.currentNodeId());
}
}
}
iterators.add(new SearchShardIterator(localClusterAlias, shardId, targetNodes, originalIndices,
perNode.getSearchContextId(), keepAlive));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ static void preparePointInTime(SearchRequest request, RestRequest restRequest, N
indicesOptions.ignoreUnavailable(), indicesOptions.allowNoIndices(), false, false, false,
true, true, indicesOptions.ignoreThrottled());
request.indicesOptions(stricterIndicesOptions);
final SearchContextId searchContextId = SearchContextId.decode(namedWriteableRegistry, request.pointInTimeBuilder().getId());
final SearchContextId searchContextId = request.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);
request.indices(searchContextId.getActualIndices());
}

Expand Down
Loading