Merged
Changes from all commits
@@ -103,21 +103,23 @@ public void onFailure(Exception e) {
}

void cleanScrollIds(List<ScrollIdForNode> parsedScrollIds) {
for (ScrollIdForNode target : parsedScrollIds) {
final DiscoveryNode node = nodes.get(target.getNode());
if (node == null) {
onFreedContext(false);
} else {
try {
Transport.Connection connection = searchTransportService.getConnection(null, node);
searchTransportService.sendFreeContext(connection, target.getScrollId(),
ActionListener.wrap(freed -> onFreedContext(freed.isFreed()),
e -> onFailedFreedContext(e, node)));
} catch (Exception e) {
onFailedFreedContext(e, node);
SearchScrollAsyncAction.collectNodesAndRun(parsedScrollIds, nodes, searchTransportService, ActionListener.wrap(
lookup -> {
for (ScrollIdForNode target : parsedScrollIds) {
final DiscoveryNode node = lookup.apply(target.getClusterAlias(), target.getNode());
if (node == null) {
onFreedContext(false);
} else {
try {
Transport.Connection connection = searchTransportService.getConnection(target.getClusterAlias(), node);
searchTransportService.sendFreeContext(connection, target.getScrollId(),
ActionListener.wrap(freed -> onFreedContext(freed.isFreed()), e -> onFailedFreedContext(e, node)));
} catch (Exception e) {
onFailedFreedContext(e, node);
}
}
}
}
}
}, listener::onFailure));
}

private void onFreedContext(boolean freed) {
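
Note on the hunk above: cleanScrollIds no longer resolves targets straight from the local DiscoveryNodes; it asks SearchScrollAsyncAction.collectNodesAndRun for a (clusterAlias, nodeId) lookup and treats any target it cannot resolve as simply not freed. Below is a minimal, self-contained sketch of that per-target accounting. The ScrollTarget record, the string-valued lookup, and freeContexts are illustrative stand-ins for ScrollIdForNode, the DiscoveryNode lookup, and sendFreeContext, not the Elasticsearch API, and the sketch is synchronous where the real code uses listeners.

import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class ClearScrollSketch {

    /** Simplified stand-in for ScrollIdForNode: a null clusterAlias means the local cluster. */
    record ScrollTarget(String clusterAlias, String node, long scrollId) {}

    /**
     * Frees each scroll context through the resolved node. A target whose node cannot be
     * resolved is counted as "not freed" and skipped, mirroring onFreedContext(false) above.
     */
    static int freeContexts(List<ScrollTarget> targets, BiFunction<String, String, String> nodeLookup) {
        int freed = 0;
        for (ScrollTarget target : targets) {
            String node = nodeLookup.apply(target.clusterAlias(), target.node());
            if (node == null) {
                continue; // node left the cluster or the remote cluster is unknown
            }
            // stand-in for searchTransportService.sendFreeContext(connection, scrollId, listener)
            System.out.println("freeing context " + target.scrollId() + " on " + node);
            freed++;
        }
        return freed;
    }

    public static void main(String[] args) {
        Map<String, String> local = Map.of("node_a", "127.0.0.1:9300");
        BiFunction<String, String, String> lookup = (alias, node) -> alias == null ? local.get(node) : null;
        List<ScrollTarget> targets = List.of(
            new ScrollTarget(null, "node_a", 42L),
            new ScrollTarget("cluster_1", "node_r", 7L)); // unresolvable here, so not freed
        System.out.println("freed " + freeContexts(targets, lookup) + " of " + targets.size());
    }
}
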
ScrollIdForNode.java
@@ -19,20 +19,37 @@

package org.elasticsearch.action.search;

import org.elasticsearch.common.inject.internal.Nullable;

class ScrollIdForNode {
private final String node;
private final long scrollId;
private final String clusterAlias;

ScrollIdForNode(String node, long scrollId) {
ScrollIdForNode(@Nullable String clusterAlias, String node, long scrollId) {
this.node = node;
this.clusterAlias = clusterAlias;
this.scrollId = scrollId;
}

public String getNode() {
return node;
}

public String getClusterAlias() {
return clusterAlias;
}

public long getScrollId() {
return scrollId;
}

@Override
public String toString() {
return "ScrollIdForNode{" +
"node='" + node + '\'' +
", scrollId=" + scrollId +
", clusterAlias='" + clusterAlias + '\'' +
'}';
}
}
SearchScrollAsyncAction.java
@@ -32,11 +32,17 @@
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest;

@@ -67,13 +73,15 @@ abstract class SearchScrollAsyncAction<T extends SearchPhaseResult> implements R
protected final DiscoveryNodes nodes;
protected final SearchPhaseController searchPhaseController;
protected final SearchScrollRequest request;
protected final SearchTransportService searchTransportService;
private final long startTime;
private final List<ShardSearchFailure> shardFailures = new ArrayList<>();
private final AtomicInteger successfulOps;

protected SearchScrollAsyncAction(ParsedScrollId scrollId, Logger logger, DiscoveryNodes nodes,
ActionListener<SearchResponse> listener, SearchPhaseController searchPhaseController,
SearchScrollRequest request) {
SearchScrollRequest request,
SearchTransportService searchTransportService) {
this.startTime = System.currentTimeMillis();
this.scrollId = scrollId;
this.successfulOps = new AtomicInteger(scrollId.getContext().length);
@@ -82,6 +90,7 @@ protected SearchScrollAsyncAction(ParsedScrollId scrollId, Logger logger, Discov
this.nodes = nodes;
this.searchPhaseController = searchPhaseController;
this.request = request;
this.searchTransportService = searchTransportService;
}

/**
@@ -97,57 +106,104 @@ public final void run() {
final ScrollIdForNode[] context = scrollId.getContext();
if (context.length == 0) {
listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY));
return;
} else {
collectNodesAndRun(Arrays.asList(context), nodes, searchTransportService, ActionListener.wrap(lookup -> run(lookup, context),
listener::onFailure));
}
}

/**
* This method collects nodes from the remote clusters asynchronously if any of the scroll IDs references a remote cluster.
* Otherwise the action listener will be invoked immediately with a function based on the given discovery nodes.
*/
static void collectNodesAndRun(final Iterable<ScrollIdForNode> scrollIds, DiscoveryNodes nodes,
SearchTransportService searchTransportService,
ActionListener<BiFunction<String, String, DiscoveryNode>> listener) {
Set<String> clusters = new HashSet<>();
for (ScrollIdForNode target : scrollIds) {
if (target.getClusterAlias() != null) {
clusters.add(target.getClusterAlias());
}
}
if (clusters.isEmpty()) { // no remote clusters
listener.onResponse((cluster, node) -> nodes.get(node));
} else {
RemoteClusterService remoteClusterService = searchTransportService.getRemoteClusterService();
remoteClusterService.collectNodes(clusters, ActionListener.wrap(nodeFunction -> {
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = (clusterAlias, node) -> {
if (clusterAlias == null) {
return nodes.get(node);
} else {
return nodeFunction.apply(clusterAlias, node);
}
};
listener.onResponse(clusterNodeLookup);
}, listener::onFailure));
}
}

private void run(BiFunction<String, String, DiscoveryNode> clusterNodeLookup, final ScrollIdForNode[] context) {
final CountDown counter = new CountDown(scrollId.getContext().length);
for (int i = 0; i < context.length; i++) {
ScrollIdForNode target = context[i];
DiscoveryNode node = nodes.get(target.getNode());
final int shardIndex = i;
if (node != null) { // it might happen that a node is going down in-between scrolls...
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(target.getScrollId(), request);
// we can't create a SearchShardTarget here since we don't know the index and shard ID we are talking to
// we only know the node and the search context ID. Yet, the response will contain the SearchShardTarget
// from the target node instead...that's why we pass null here
SearchActionListener<T> searchActionListener = new SearchActionListener<T>(null, shardIndex) {

@Override
protected void setSearchShardTarget(T response) {
// don't do this - it's part of the response...
assert response.getSearchShardTarget() != null : "search shard target must not be null";
final Transport.Connection connection;
try {
DiscoveryNode node = clusterNodeLookup.apply(target.getClusterAlias(), target.getNode());
if (node == null) {
throw new IllegalStateException("node [" + target.getNode() + "] is not available");
}
connection = getConnection(target.getClusterAlias(), node);
} catch (Exception ex) {
onShardFailure("query", counter, target.getScrollId(),
ex, null, () -> SearchScrollAsyncAction.this.moveToNextPhase(clusterNodeLookup));
continue;
}
final InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(target.getScrollId(), request);
// we can't create a SearchShardTarget here since we don't know the index and shard ID we are talking to
// we only know the node and the search context ID. Yet, the response will contain the SearchShardTarget
// from the target node instead...that's why we pass null here
SearchActionListener<T> searchActionListener = new SearchActionListener<T>(null, shardIndex) {

@Override
protected void setSearchShardTarget(T response) {
// don't do this - it's part of the response...
assert response.getSearchShardTarget() != null : "search shard target must not be null";
if (target.getClusterAlias() != null) {
// re-create the search target and add the cluster alias if there is any,
// we need this down the road for subseq. phases
SearchShardTarget searchShardTarget = response.getSearchShardTarget();
response.setSearchShardTarget(new SearchShardTarget(searchShardTarget.getNodeId(), searchShardTarget.getShardId(),
target.getClusterAlias(), null));
}
}

@Override
protected void innerOnResponse(T result) {
assert shardIndex == result.getShardIndex() : "shard index mismatch: " + shardIndex + " but got: "
+ result.getShardIndex();
onFirstPhaseResult(shardIndex, result);
if (counter.countDown()) {
SearchPhase phase = moveToNextPhase();
try {
phase.run();
} catch (Exception e) {
// we need to fail the entire request here - the entire phase just blew up
// don't call onShardFailure or onFailure here since otherwise we'd countDown the counter
// again which would result in an exception
listener.onFailure(new SearchPhaseExecutionException(phase.getName(), "Phase failed", e,
ShardSearchFailure.EMPTY_ARRAY));
}
@Override
protected void innerOnResponse(T result) {
assert shardIndex == result.getShardIndex() : "shard index mismatch: " + shardIndex + " but got: "
+ result.getShardIndex();
onFirstPhaseResult(shardIndex, result);
if (counter.countDown()) {
SearchPhase phase = moveToNextPhase(clusterNodeLookup);
try {
phase.run();
} catch (Exception e) {
// we need to fail the entire request here - the entire phase just blew up
// don't call onShardFailure or onFailure here since otherwise we'd countDown the counter
// again which would result in an exception
listener.onFailure(new SearchPhaseExecutionException(phase.getName(), "Phase failed", e,
ShardSearchFailure.EMPTY_ARRAY));
}
}
}

@Override
public void onFailure(Exception t) {
onShardFailure("query", shardIndex, counter, target.getScrollId(), t, null,
SearchScrollAsyncAction.this::moveToNextPhase);
}
};
executeInitialPhase(node, internalRequest, searchActionListener);
} else { // the node is not available we treat this as a shard failure here
onShardFailure("query", shardIndex, counter, target.getScrollId(),
new IllegalStateException("node [" + target.getNode() + "] is not available"), null,
SearchScrollAsyncAction.this::moveToNextPhase);
}
@Override
public void onFailure(Exception t) {
onShardFailure("query", counter, target.getScrollId(), t, null,
() -> SearchScrollAsyncAction.this.moveToNextPhase(clusterNodeLookup));
}
};
executeInitialPhase(connection, internalRequest, searchActionListener);
}
}

@@ -164,10 +220,10 @@ private synchronized void addShardFailure(ShardSearchFailure failure) {
shardFailures.add(failure);
}

protected abstract void executeInitialPhase(DiscoveryNode node, InternalScrollSearchRequest internalRequest,
protected abstract void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest,
SearchActionListener<T> searchActionListener);

protected abstract SearchPhase moveToNextPhase();
protected abstract SearchPhase moveToNextPhase(BiFunction<String, String, DiscoveryNode> clusterNodeLookup);

protected abstract void onFirstPhaseResult(int shardId, T result);

@@ -199,9 +255,9 @@ protected final void sendResponse(SearchPhaseController.ReducedQueryPhase queryP
}
}

protected void onShardFailure(String phaseName, final int shardIndex, final CountDown counter, final long searchId, Exception failure,
@Nullable SearchShardTarget searchShardTarget,
Supplier<SearchPhase> nextPhaseSupplier) {
protected void onShardFailure(String phaseName, final CountDown counter, final long searchId, Exception failure,
@Nullable SearchShardTarget searchShardTarget,
Supplier<SearchPhase> nextPhaseSupplier) {
if (logger.isDebugEnabled()) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute {} phase", searchId, phaseName), failure);
}
@@ -223,4 +279,8 @@ protected void onShardFailure(String phaseName, final int shardIndex, final Coun
}
}
}

protected Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) {
return searchTransportService.getConnection(clusterAlias, node);
}
}
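
Note on the collectNodesAndRun helper above: it only performs a remote round trip when at least one scroll target carries a cluster alias; otherwise it answers immediately with a lookup backed by the local DiscoveryNodes, and in the remote case it composes the local and remote lookups into a single BiFunction. The compact standalone sketch below shows that control flow under simplified assumptions: CompletableFuture stands in for ActionListener, plain maps stand in for DiscoveryNodes and RemoteClusterService.collectNodes, and all names in the sketch are illustrative rather than the Elasticsearch API.

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

public class CollectNodesSketch {

    /** Simplified scroll target: a null clusterAlias means the local cluster. */
    record ScrollTarget(String clusterAlias, String node) {}

    /** Stand-in for the asynchronous RemoteClusterService.collectNodes round trip. */
    static CompletableFuture<BiFunction<String, String, String>> collectRemoteNodes(Set<String> clusters) {
        Map<String, Map<String, String>> remote = Map.of("cluster_1", Map.of("node_r", "10.0.0.5:9300"));
        return CompletableFuture.completedFuture(
            (alias, node) -> remote.getOrDefault(alias, Map.of()).get(node));
    }

    /**
     * Collects the distinct cluster aliases referenced by the scroll targets. With no remote
     * aliases the local table answers immediately; otherwise remote nodes are fetched first
     * and the two lookups are composed, as in collectNodesAndRun above.
     */
    static CompletableFuture<BiFunction<String, String, String>> collectNodes(List<ScrollTarget> targets,
                                                                              Map<String, String> localNodes) {
        Set<String> clusters = new HashSet<>();
        for (ScrollTarget target : targets) {
            if (target.clusterAlias() != null) {
                clusters.add(target.clusterAlias());
            }
        }
        if (clusters.isEmpty()) { // no remote clusters involved: no network round trip needed
            return CompletableFuture.completedFuture((alias, node) -> localNodes.get(node));
        }
        return collectRemoteNodes(clusters).thenApply(remoteLookup ->
            (alias, node) -> alias == null ? localNodes.get(node) : remoteLookup.apply(alias, node));
    }

    public static void main(String[] args) {
        Map<String, String> local = Map.of("node_a", "127.0.0.1:9300");
        List<ScrollTarget> targets = List.of(new ScrollTarget(null, "node_a"), new ScrollTarget("cluster_1", "node_r"));
        BiFunction<String, String, String> lookup = collectNodes(targets, local).join();
        System.out.println(lookup.apply(null, "node_a"));        // resolved from the local table
        System.out.println(lookup.apply("cluster_1", "node_r")); // resolved from the collected remote nodes
    }
}
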
SearchScrollQueryAndFetchAsyncAction.java
@@ -20,50 +20,38 @@
package org.elasticsearch.action.search;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.transport.Transport;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest;
import java.util.function.BiFunction;

final class SearchScrollQueryAndFetchAsyncAction extends SearchScrollAsyncAction<ScrollQueryFetchSearchResult> {

private final SearchTransportService searchTransportService;
private final SearchTask task;
private final AtomicArray<QueryFetchSearchResult> queryFetchResults;

SearchScrollQueryAndFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService,
SearchPhaseController searchPhaseController, SearchScrollRequest request, SearchTask task,
ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
super(scrollId, logger, clusterService.state().nodes(), listener, searchPhaseController, request);
super(scrollId, logger, clusterService.state().nodes(), listener, searchPhaseController, request, searchTransportService);
this.task = task;
this.searchTransportService = searchTransportService;
this.queryFetchResults = new AtomicArray<>(scrollId.getContext().length);
}

@Override
protected void executeInitialPhase(DiscoveryNode node, InternalScrollSearchRequest internalRequest,
protected void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest,
SearchActionListener<ScrollQueryFetchSearchResult> searchActionListener) {
searchTransportService.sendExecuteScrollFetch(node, internalRequest, task, searchActionListener);
searchTransportService.sendExecuteScrollFetch(connection, internalRequest, task, searchActionListener);
}

@Override
protected SearchPhase moveToNextPhase() {
protected SearchPhase moveToNextPhase(BiFunction<String, String, DiscoveryNode> clusterNodeLookup) {
return sendResponsePhase(searchPhaseController.reducedQueryPhase(queryFetchResults.asList(), true), queryFetchResults);
}

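Note on the hunk above: with the base class now resolving the node and connection, the concrete query-and-fetch action only receives a ready Transport.Connection in executeInitialPhase and the cluster-node lookup in moveToNextPhase. The sketch below mirrors that template-method split with simplified types (strings stand in for connections, and the failure and listener plumbing is reduced to comments); it illustrates the shape of the refactoring, not the actual Elasticsearch classes.

import java.util.function.BiFunction;

public class ScrollActionTemplateSketch {

    /** Base class resolves the target and hands a ready "connection" to the subclass. */
    abstract static class ScrollAction {
        final BiFunction<String, String, String> clusterNodeLookup;

        ScrollAction(BiFunction<String, String, String> clusterNodeLookup) {
            this.clusterNodeLookup = clusterNodeLookup;
        }

        final void run(String clusterAlias, String nodeId) {
            // stand-in for clusterNodeLookup.apply(...) followed by getConnection(alias, node)
            String connection = clusterNodeLookup.apply(clusterAlias, nodeId);
            if (connection == null) {
                // the real action records a shard failure here instead of throwing
                throw new IllegalStateException("node [" + nodeId + "] is not available");
            }
            executeInitialPhase(connection);
            // in the real action this is triggered from the response listener once all shards answered
            moveToNextPhase(clusterNodeLookup);
        }

        abstract void executeInitialPhase(String connection);

        abstract void moveToNextPhase(BiFunction<String, String, String> clusterNodeLookup);
    }

    /** Mirrors the shape of SearchScrollQueryAndFetchAsyncAction: one round trip, then reduce. */
    static class QueryAndFetchAction extends ScrollAction {
        QueryAndFetchAction(BiFunction<String, String, String> lookup) {
            super(lookup);
        }

        @Override
        void executeInitialPhase(String connection) {
            System.out.println("sendExecuteScrollFetch over " + connection);
        }

        @Override
        void moveToNextPhase(BiFunction<String, String, String> clusterNodeLookup) {
            System.out.println("reduce results and send the response");
        }
    }

    public static void main(String[] args) {
        BiFunction<String, String, String> lookup =
            (alias, node) -> alias == null ? "local:" + node : alias + ":" + node;
        new QueryAndFetchAction(lookup).run(null, "node_a");
        new QueryAndFetchAction(lookup).run("cluster_1", "node_r");
    }
}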