From 2d8335313ed1250c7651e0f4c70763051c5be201 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 6 Jun 2017 18:24:03 +0200 Subject: [PATCH 01/11] Add Cross Cluster Search support for scroll searches To complete the cross cluster search capabilities for all search types and function this change adds cross cluster search support for scroll searches. --- .../action/search/ClearScrollController.java | 140 ++++++++++ .../action/search/ScrollIdForNode.java | 19 +- .../search/SearchScrollAsyncAction.java | 154 +++++++---- .../SearchScrollQueryAndFetchAsyncAction.java | 24 +- ...SearchScrollQueryThenFetchAsyncAction.java | 25 +- .../action/search/SearchTransportService.java | 18 +- .../search/TransportClearScrollAction.java | 111 +------- .../action/search/TransportSearchHelper.java | 20 +- .../action/support/GroupedActionListener.java | 8 +- .../transport/RemoteClusterConnection.java | 50 ++++ .../transport/RemoteClusterService.java | 51 +++- .../search/ClearScrollControllerTests.java | 253 ++++++++++++++++++ .../action/search/SearchAsyncActionTests.java | 2 +- .../search/SearchScrollAsyncActionTests.java | 136 ++++++---- .../search/TransportSearchHelperTests.java | 64 +++++ .../RemoteClusterConnectionTests.java | 45 +++- .../transport/RemoteClusterServiceTests.java | 109 ++++++++ .../test/multi_cluster/40_scroll.yml | 40 +++ 18 files changed, 1002 insertions(+), 267 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java create mode 100644 core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java create mode 100644 core/src/test/java/org/elasticsearch/action/search/TransportSearchHelperTests.java create mode 100644 qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml diff --git a/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java b/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java new file mode 100644 index 0000000000000..45a879d5c3a13 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java @@ -0,0 +1,140 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.search; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportResponse; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId; + +final class ClearScrollController implements Runnable { + private final DiscoveryNodes nodes; + private final SearchTransportService searchTransportService; + private final CountDown expectedOps; + private final ActionListener listener; + private final AtomicBoolean hasFailed = new AtomicBoolean(false); + private final AtomicInteger freedSearchContexts = new AtomicInteger(0); + private final Logger logger; + private final Runnable runner; + + public ClearScrollController(ClearScrollRequest request, final ActionListener listener, + DiscoveryNodes nodes, Logger logger, SearchTransportService searchTransportService) { + this.nodes = nodes; + this.logger = logger; + this.searchTransportService = searchTransportService; + this.listener = listener; + List scrollIds = request.getScrollIds(); + final int expectedOps; + if (scrollIds.size() == 1 && "_all".equals(scrollIds.get(0))) { + expectedOps = nodes.getSize(); + runner = this::cleanAllScrolls; + } else { + List parsedScrollIds = new ArrayList<>(); + for (String parsedScrollId : request.getScrollIds()) { + ScrollIdForNode[] context = parseScrollId(parsedScrollId).getContext(); + for (ScrollIdForNode id : context) { + parsedScrollIds.add(id); + } + } + if (parsedScrollIds.isEmpty()) { + expectedOps = 0; + runner = () -> listener.onResponse(new ClearScrollResponse(true, 0)); + } else { + expectedOps = parsedScrollIds.size(); + runner = () -> cleanScrollIds(parsedScrollIds); + } + } + this.expectedOps = new CountDown(expectedOps); + + } + + @Override + public void run() { + runner.run(); + } + + void cleanAllScrolls() { + for (final DiscoveryNode node : nodes) { + searchTransportService.sendClearAllScrollContexts(node, new ActionListener() { + @Override + public void onResponse(TransportResponse response) { + onFreedContext(true); + } + + @Override + public void onFailure(Exception e) { + onFailedFreedContext(e, node); + } + }); + } + } + + void cleanScrollIds(List parsedScrollIds) { + SearchScrollAsyncAction.collectNodesAndRun(parsedScrollIds, nodes, searchTransportService, ActionListener.wrap( + lookup -> { + for (ScrollIdForNode target : parsedScrollIds) { + final DiscoveryNode node = lookup.apply(target.getClusterAlias(), target.getNode()); + if (node == null) { + onFreedContext(false); + } else { + try { + Transport.Connection connection = searchTransportService.getConnection(target.getClusterAlias(), + node); + searchTransportService.sendFreeContext(connection, target.getScrollId(), + ActionListener.wrap(freed -> onFreedContext(freed.isFreed()), + e -> onFailedFreedContext(e, node))); + } catch (Exception e) { + onFailedFreedContext(e, node); + } + } + } + }, listener::onFailure)); + } + + private void onFreedContext(boolean freed) { + if (freed) { + freedSearchContexts.incrementAndGet(); + } + if (expectedOps.countDown()) { + boolean succeeded = hasFailed.get() == false; + listener.onResponse(new ClearScrollResponse(succeeded, freedSearchContexts.get())); + } + } + + private void onFailedFreedContext(Throwable e, DiscoveryNode node) { + logger.warn((Supplier) () -> new ParameterizedMessage("Clear SC failed on node[{}]", node), e); + if (expectedOps.countDown()) { + listener.onResponse(new ClearScrollResponse(false, freedSearchContexts.get())); + } else { + hasFailed.set(true); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java b/core/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java index 76d4ac1141388..59e1a3310672b 100644 --- a/core/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java +++ b/core/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java @@ -19,12 +19,16 @@ package org.elasticsearch.action.search; +import org.elasticsearch.common.inject.internal.Nullable; + class ScrollIdForNode { private final String node; private final long scrollId; + private final String clusterAlias; - ScrollIdForNode(String node, long scrollId) { + ScrollIdForNode(@Nullable String clusterAlias, String node, long scrollId) { this.node = node; + this.clusterAlias = clusterAlias; this.scrollId = scrollId; } @@ -32,7 +36,20 @@ public String getNode() { return node; } + public String getClusterAlias() { + return clusterAlias; + } + public long getScrollId() { return scrollId; } + + @Override + public String toString() { + return "ScrollIdForNode{" + + "node='" + node + '\'' + + ", scrollId=" + scrollId + + ", clusterAlias='" + clusterAlias + '\'' + + '}'; + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java index 5be511f558568..aa757a039b899 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java @@ -32,11 +32,17 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.Transport; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest; @@ -67,13 +73,15 @@ abstract class SearchScrollAsyncAction implements R protected final DiscoveryNodes nodes; protected final SearchPhaseController searchPhaseController; protected final SearchScrollRequest request; + protected final SearchTransportService searchTransportService; private final long startTime; private final List shardFailures = new ArrayList<>(); private final AtomicInteger successfulOps; protected SearchScrollAsyncAction(ParsedScrollId scrollId, Logger logger, DiscoveryNodes nodes, ActionListener listener, SearchPhaseController searchPhaseController, - SearchScrollRequest request) { + SearchScrollRequest request, + SearchTransportService searchTransportService) { this.startTime = System.currentTimeMillis(); this.scrollId = scrollId; this.successfulOps = new AtomicInteger(scrollId.getContext().length); @@ -82,6 +90,7 @@ protected SearchScrollAsyncAction(ParsedScrollId scrollId, Logger logger, Discov this.nodes = nodes; this.searchPhaseController = searchPhaseController; this.request = request; + this.searchTransportService = searchTransportService; } /** @@ -97,57 +106,104 @@ public final void run() { final ScrollIdForNode[] context = scrollId.getContext(); if (context.length == 0) { listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY)); - return; + } else { + collectNodesAndRun(Arrays.asList(context), nodes, searchTransportService, ActionListener.wrap(lookup -> run(lookup, context), + listener::onFailure)); } + } + + /** + * This method collects nodes from the remote clusters asynchronously if any of the scroll IDs references a remote cluster. + * Otherwise the action listener will be invoked immediately with a function based on the given discovery nodes. + */ + static void collectNodesAndRun(final Iterable scrollIds, DiscoveryNodes nodes, + SearchTransportService searchTransportService, + ActionListener> listener) { + Set clusters = new HashSet<>(); + for (ScrollIdForNode target : scrollIds) { + if (target.getClusterAlias() != null) { + clusters.add(target.getClusterAlias()); + } + } + if (clusters.isEmpty()) { // no remote clusters + listener.onResponse((cluster, node) -> nodes.get(node)); + } else { + RemoteClusterService remoteClusterService = searchTransportService.getRemoteClusterService(); + remoteClusterService.collectNodes(clusters, ActionListener.wrap(nodeFunction -> { + final BiFunction clusterNodeLookup = (clusterAlias, node) -> { + if (clusterAlias == null) { + return nodes.get(node); + } else { + return nodeFunction.apply(clusterAlias, node); + } + }; + listener.onResponse(clusterNodeLookup); + }, listener::onFailure)); + } + } + + private void run(BiFunction clusterNodeLookup, final ScrollIdForNode[] context) { final CountDown counter = new CountDown(scrollId.getContext().length); for (int i = 0; i < context.length; i++) { ScrollIdForNode target = context[i]; - DiscoveryNode node = nodes.get(target.getNode()); final int shardIndex = i; - if (node != null) { // it might happen that a node is going down in-between scrolls... - InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(target.getScrollId(), request); - // we can't create a SearchShardTarget here since we don't know the index and shard ID we are talking to - // we only know the node and the search context ID. Yet, the response will contain the SearchShardTarget - // from the target node instead...that's why we pass null here - SearchActionListener searchActionListener = new SearchActionListener(null, shardIndex) { - - @Override - protected void setSearchShardTarget(T response) { - // don't do this - it's part of the response... - assert response.getSearchShardTarget() != null : "search shard target must not be null"; + final Transport.Connection connection; + try { + DiscoveryNode node = clusterNodeLookup.apply(target.getClusterAlias(), target.getNode()); + if (node == null) { + throw new IllegalStateException("node [" + target.getNode() + "] is not available"); + } + connection = getConnection(target.getClusterAlias(), node); + } catch (Exception ex) { + onShardFailure("query", counter, target.getScrollId(), + ex, null, () -> SearchScrollAsyncAction.this.moveToNextPhase(clusterNodeLookup)); + continue; + } + final InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(target.getScrollId(), request); + // we can't create a SearchShardTarget here since we don't know the index and shard ID we are talking to + // we only know the node and the search context ID. Yet, the response will contain the SearchShardTarget + // from the target node instead...that's why we pass null here + SearchActionListener searchActionListener = new SearchActionListener(null, shardIndex) { + + @Override + protected void setSearchShardTarget(T response) { + // don't do this - it's part of the response... + assert response.getSearchShardTarget() != null : "search shard target must not be null"; + if (target.getClusterAlias() != null) { + // re-create the search target and add the cluster alias if there is any, + // we need this down the road for subseq. phases + SearchShardTarget searchShardTarget = response.getSearchShardTarget(); + response.setSearchShardTarget(new SearchShardTarget(searchShardTarget.getNodeId(), searchShardTarget.getShardId(), + target.getClusterAlias(), null)); } + } - @Override - protected void innerOnResponse(T result) { - assert shardIndex == result.getShardIndex() : "shard index mismatch: " + shardIndex + " but got: " - + result.getShardIndex(); - onFirstPhaseResult(shardIndex, result); - if (counter.countDown()) { - SearchPhase phase = moveToNextPhase(); - try { - phase.run(); - } catch (Exception e) { - // we need to fail the entire request here - the entire phase just blew up - // don't call onShardFailure or onFailure here since otherwise we'd countDown the counter - // again which would result in an exception - listener.onFailure(new SearchPhaseExecutionException(phase.getName(), "Phase failed", e, - ShardSearchFailure.EMPTY_ARRAY)); - } + @Override + protected void innerOnResponse(T result) { + assert shardIndex == result.getShardIndex() : "shard index mismatch: " + shardIndex + " but got: " + + result.getShardIndex(); + onFirstPhaseResult(shardIndex, result); + if (counter.countDown()) { + SearchPhase phase = moveToNextPhase(clusterNodeLookup); + try { + phase.run(); + } catch (Exception e) { + // we need to fail the entire request here - the entire phase just blew up + // don't call onShardFailure or onFailure here since otherwise we'd countDown the counter + // again which would result in an exception + listener.onFailure(new SearchPhaseExecutionException(phase.getName(), "Phase failed", e, + ShardSearchFailure.EMPTY_ARRAY)); } } + } - @Override - public void onFailure(Exception t) { - onShardFailure("query", shardIndex, counter, target.getScrollId(), t, null, - SearchScrollAsyncAction.this::moveToNextPhase); - } - }; - executeInitialPhase(node, internalRequest, searchActionListener); - } else { // the node is not available we treat this as a shard failure here - onShardFailure("query", shardIndex, counter, target.getScrollId(), - new IllegalStateException("node [" + target.getNode() + "] is not available"), null, - SearchScrollAsyncAction.this::moveToNextPhase); - } + @Override + public void onFailure(Exception t) { + onShardFailure("query", counter, target.getScrollId(), t, null, + () -> SearchScrollAsyncAction.this.moveToNextPhase(clusterNodeLookup)); + } + }; + executeInitialPhase(connection, internalRequest, searchActionListener); } } @@ -164,10 +220,10 @@ private synchronized void addShardFailure(ShardSearchFailure failure) { shardFailures.add(failure); } - protected abstract void executeInitialPhase(DiscoveryNode node, InternalScrollSearchRequest internalRequest, + protected abstract void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest, SearchActionListener searchActionListener); - protected abstract SearchPhase moveToNextPhase(); + protected abstract SearchPhase moveToNextPhase(BiFunction clusterNodeLookup); protected abstract void onFirstPhaseResult(int shardId, T result); @@ -199,9 +255,9 @@ protected final void sendResponse(SearchPhaseController.ReducedQueryPhase queryP } } - protected void onShardFailure(String phaseName, final int shardIndex, final CountDown counter, final long searchId, Exception failure, - @Nullable SearchShardTarget searchShardTarget, - Supplier nextPhaseSupplier) { + protected void onShardFailure(String phaseName, final CountDown counter, final long searchId, Exception failure, + @Nullable SearchShardTarget searchShardTarget, + Supplier nextPhaseSupplier) { if (logger.isDebugEnabled()) { logger.debug((Supplier) () -> new ParameterizedMessage("[{}] Failed to execute {} phase", searchId, phaseName), failure); } @@ -223,4 +279,8 @@ protected void onShardFailure(String phaseName, final int shardIndex, final Coun } } } + + protected Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return searchTransportService.getConnection(clusterAlias, node); + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java index 9270dfdd82a4b..7f36d71ae256b 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java @@ -20,50 +20,38 @@ package org.elasticsearch.action.search; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; -import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.common.util.concurrent.CountDown; -import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult; import org.elasticsearch.search.internal.InternalScrollSearchRequest; -import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.search.query.ScrollQuerySearchResult; +import org.elasticsearch.transport.Transport; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest; +import java.util.function.BiFunction; final class SearchScrollQueryAndFetchAsyncAction extends SearchScrollAsyncAction { - private final SearchTransportService searchTransportService; private final SearchTask task; private final AtomicArray queryFetchResults; SearchScrollQueryAndFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService, SearchPhaseController searchPhaseController, SearchScrollRequest request, SearchTask task, ParsedScrollId scrollId, ActionListener listener) { - super(scrollId, logger, clusterService.state().nodes(), listener, searchPhaseController, request); + super(scrollId, logger, clusterService.state().nodes(), listener, searchPhaseController, request, searchTransportService); this.task = task; - this.searchTransportService = searchTransportService; this.queryFetchResults = new AtomicArray<>(scrollId.getContext().length); } @Override - protected void executeInitialPhase(DiscoveryNode node, InternalScrollSearchRequest internalRequest, + protected void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest, SearchActionListener searchActionListener) { - searchTransportService.sendExecuteScrollFetch(node, internalRequest, task, searchActionListener); + searchTransportService.sendExecuteScrollFetch(connection, internalRequest, task, searchActionListener); } @Override - protected SearchPhase moveToNextPhase() { + protected SearchPhase moveToNextPhase(BiFunction clusterNodeLookup) { return sendResponsePhase(searchPhaseController.reducedQueryPhase(queryFetchResults.asList(), true), queryFetchResults); } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java index 963838b7a0acd..a964d1904edbb 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java @@ -27,28 +27,28 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchRequest; import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.ScrollQuerySearchResult; +import org.elasticsearch.transport.Transport; import java.io.IOException; - -import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest; +import java.util.function.BiFunction; final class SearchScrollQueryThenFetchAsyncAction extends SearchScrollAsyncAction { private final SearchTask task; - private final SearchTransportService searchTransportService; private final AtomicArray fetchResults; private final AtomicArray queryResults; SearchScrollQueryThenFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService, SearchPhaseController searchPhaseController, SearchScrollRequest request, SearchTask task, ParsedScrollId scrollId, ActionListener listener) { - super(scrollId, logger, clusterService.state().nodes(), listener, searchPhaseController, request); - this.searchTransportService = searchTransportService; + super(scrollId, logger, clusterService.state().nodes(), listener, searchPhaseController, request, + searchTransportService); this.task = task; this.fetchResults = new AtomicArray<>(scrollId.getContext().length); this.queryResults = new AtomicArray<>(scrollId.getContext().length); @@ -59,13 +59,13 @@ protected void onFirstPhaseResult(int shardId, ScrollQuerySearchResult result) { } @Override - protected void executeInitialPhase(DiscoveryNode node, InternalScrollSearchRequest internalRequest, + protected void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest, SearchActionListener searchActionListener) { - searchTransportService.sendExecuteScrollQuery(node, internalRequest, task, searchActionListener); + searchTransportService.sendExecuteScrollQuery(connection, internalRequest, task, searchActionListener); } @Override - protected SearchPhase moveToNextPhase() { + protected SearchPhase moveToNextPhase(BiFunction clusterNodeLookup) { return new SearchPhase("fetch") { @Override public void run() throws IOException { @@ -89,8 +89,11 @@ public void run() throws IOException { ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[index]; ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.getRequestId(), docIds, lastEmittedDoc); - DiscoveryNode node = nodes.get(querySearchResult.getSearchShardTarget().getNodeId()); - searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, task, + SearchShardTarget searchShardTarget = querySearchResult.getSearchShardTarget(); + DiscoveryNode node = clusterNodeLookup.apply(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); + assert node != null : "target node is null in secondary phase"; + Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), node); + searchTransportService.sendExecuteFetchScroll(connection, shardFetchRequest, task, new SearchActionListener(querySearchResult.getSearchShardTarget(), index) { @Override protected void innerOnResponse(FetchSearchResult response) { @@ -102,7 +105,7 @@ protected void innerOnResponse(FetchSearchResult response) { @Override public void onFailure(Exception t) { - onShardFailure(getName(), querySearchResult.getShardIndex(), counter, querySearchResult.getRequestId(), + onShardFailure(getName(), counter, querySearchResult.getRequestId(), t, querySearchResult.getSearchShardTarget(), () -> sendResponsePhase(reducedQueryPhase, fetchResults)); } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 9dd2125d5e2fe..a55ea392fd30a 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -98,9 +98,9 @@ public void onFailure(Exception e) { }, SearchFreeContextResponse::new)); } - public void sendFreeContext(DiscoveryNode node, long contextId, final ActionListener listener) { - transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId), - new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new)); + public void sendFreeContext(Transport.Connection connection, long contextId, final ActionListener listener) { + transportService.sendRequest(connection, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId), + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new)); } public void sendClearAllScrollContexts(DiscoveryNode node, final ActionListener listener) { @@ -145,15 +145,15 @@ public void sendExecuteQuery(Transport.Connection connection, final QuerySearchR new ActionListenerResponseHandler<>(listener, QuerySearchResult::new)); } - public void sendExecuteScrollQuery(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task, + public void sendExecuteScrollQuery(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task, final SearchActionListener listener) { - transportService.sendChildRequest(transportService.getConnection(node), QUERY_SCROLL_ACTION_NAME, request, task, + transportService.sendChildRequest(connection, QUERY_SCROLL_ACTION_NAME, request, task, new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new)); } - public void sendExecuteScrollFetch(DiscoveryNode node, final InternalScrollSearchRequest request, SearchTask task, + public void sendExecuteScrollFetch(Transport.Connection connection, final InternalScrollSearchRequest request, SearchTask task, final SearchActionListener listener) { - transportService.sendChildRequest(transportService.getConnection(node), QUERY_FETCH_SCROLL_ACTION_NAME, request, task, + transportService.sendChildRequest(connection, QUERY_FETCH_SCROLL_ACTION_NAME, request, task, new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new)); } @@ -162,9 +162,9 @@ public void sendExecuteFetch(Transport.Connection connection, final ShardFetchSe sendExecuteFetch(connection, FETCH_ID_ACTION_NAME, request, task, listener); } - public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, SearchTask task, + public void sendExecuteFetchScroll(Transport.Connection connection, final ShardFetchRequest request, SearchTask task, final SearchActionListener listener) { - sendExecuteFetch(transportService.getConnection(node), FETCH_ID_SCROLL_ACTION_NAME, request, task, listener); + sendExecuteFetch(connection, FETCH_ID_SCROLL_ACTION_NAME, request, task, listener); } private void sendExecuteFetch(Transport.Connection connection, String action, final ShardFetchRequest request, SearchTask task, diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java index 716077c915d6b..d9afbdacafe3c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java @@ -19,30 +19,16 @@ package org.elasticsearch.action.search; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId; - public class TransportClearScrollAction extends HandledTransportAction { private final ClusterService clusterService; @@ -53,105 +39,16 @@ public TransportClearScrollAction(Settings settings, TransportService transportS ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, SearchTransportService searchTransportService) { - super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, ClearScrollRequest::new); + super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, + ClearScrollRequest::new); this.clusterService = clusterService; this.searchTransportService = searchTransportService; } @Override protected void doExecute(ClearScrollRequest request, final ActionListener listener) { - new Async(request, listener, clusterService.state()).run(); - } - - private class Async { - final DiscoveryNodes nodes; - final CountDown expectedOps; - final List contexts = new ArrayList<>(); - final ActionListener listener; - final AtomicReference expHolder; - final AtomicInteger numberOfFreedSearchContexts = new AtomicInteger(0); - - private Async(ClearScrollRequest request, ActionListener listener, ClusterState clusterState) { - int expectedOps = 0; - this.nodes = clusterState.nodes(); - if (request.getScrollIds().size() == 1 && "_all".equals(request.getScrollIds().get(0))) { - expectedOps = nodes.getSize(); - } else { - for (String parsedScrollId : request.getScrollIds()) { - ScrollIdForNode[] context = parseScrollId(parsedScrollId).getContext(); - expectedOps += context.length; - this.contexts.add(context); - } - } - this.listener = listener; - this.expHolder = new AtomicReference<>(); - this.expectedOps = new CountDown(expectedOps); - } - - public void run() { - if (expectedOps.isCountedDown()) { - listener.onResponse(new ClearScrollResponse(true, 0)); - return; - } - - if (contexts.isEmpty()) { - for (final DiscoveryNode node : nodes) { - searchTransportService.sendClearAllScrollContexts(node, new ActionListener() { - @Override - public void onResponse(TransportResponse response) { - onFreedContext(true); - } - - @Override - public void onFailure(Exception e) { - onFailedFreedContext(e, node); - } - }); - } - } else { - for (ScrollIdForNode[] context : contexts) { - for (ScrollIdForNode target : context) { - final DiscoveryNode node = nodes.get(target.getNode()); - if (node == null) { - onFreedContext(false); - continue; - } - - searchTransportService.sendFreeContext(node, target.getScrollId(), new ActionListener() { - @Override - public void onResponse(SearchTransportService.SearchFreeContextResponse freed) { - onFreedContext(freed.isFreed()); - } - - @Override - public void onFailure(Exception e) { - onFailedFreedContext(e, node); - } - }); - } - } - } - } - - void onFreedContext(boolean freed) { - if (freed) { - numberOfFreedSearchContexts.incrementAndGet(); - } - if (expectedOps.countDown()) { - boolean succeeded = expHolder.get() == null; - listener.onResponse(new ClearScrollResponse(succeeded, numberOfFreedSearchContexts.get())); - } - } - - void onFailedFreedContext(Throwable e, DiscoveryNode node) { - logger.warn((Supplier) () -> new ParameterizedMessage("Clear SC failed on node[{}]", node), e); - if (expectedOps.countDown()) { - listener.onResponse(new ClearScrollResponse(false, numberOfFreedSearchContexts.get())); - } else { - expHolder.set(e); - } - } - + Runnable runnable = new ClearScrollController(request, listener, clusterService.state().nodes(), logger, searchTransportService); + runnable.run(); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java index e494bb6768d65..7a0bb63478c76 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java @@ -23,7 +23,9 @@ import org.apache.lucene.store.RAMOutputStream; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.InternalScrollSearchRequest; +import org.elasticsearch.transport.RemoteClusterAware; import java.io.IOException; import java.util.Base64; @@ -40,7 +42,13 @@ static String buildScrollId(AtomicArray searchPhase out.writeVInt(searchPhaseResults.asList().size()); for (SearchPhaseResult searchPhaseResult : searchPhaseResults.asList()) { out.writeLong(searchPhaseResult.getRequestId()); - out.writeString(searchPhaseResult.getSearchShardTarget().getNodeId()); + SearchShardTarget searchShardTarget = searchPhaseResult.getSearchShardTarget(); + if (searchShardTarget.getClusterAlias() != null) { + out.writeString(RemoteClusterAware.buildRemoteIndexName(searchShardTarget.getClusterAlias(), + searchShardTarget.getNodeId())); + } else { + out.writeString(searchShardTarget.getNodeId()); + } } byte[] bytes = new byte[(int) out.getFilePointer()]; out.writeTo(bytes, 0); @@ -57,7 +65,15 @@ static ParsedScrollId parseScrollId(String scrollId) { for (int i = 0; i < context.length; ++i) { long id = in.readLong(); String target = in.readString(); - context[i] = new ScrollIdForNode(target, id); + String clusterAlias; + final int index = target.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR); + if (index == -1) { + clusterAlias = null; + } else { + clusterAlias = target.substring(0, index); + target = target.substring(index+1); + } + context[i] = new ScrollIdForNode(clusterAlias, target, id); } if (in.getPosition() != bytes.length) { throw new IllegalArgumentException("Not all bytes were read"); diff --git a/core/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java b/core/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java index 85b418e046cc0..ed9b7c8d15d60 100644 --- a/core/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java +++ b/core/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java @@ -37,7 +37,7 @@ public final class GroupedActionListener implements ActionListener { private final CountDown countDown; private final AtomicInteger pos = new AtomicInteger(); - private final AtomicArray roles; + private final AtomicArray results; private final ActionListener> delegate; private final Collection defaults; private final AtomicReference failure = new AtomicReference<>(); @@ -49,7 +49,7 @@ public final class GroupedActionListener implements ActionListener { */ public GroupedActionListener(ActionListener> delegate, int groupSize, Collection defaults) { - roles = new AtomicArray<>(groupSize); + results = new AtomicArray<>(groupSize); countDown = new CountDown(groupSize); this.delegate = delegate; this.defaults = defaults; @@ -57,12 +57,12 @@ public GroupedActionListener(ActionListener> delegate, int groupSi @Override public void onResponse(T element) { - roles.set(pos.incrementAndGet() - 1, element); + results.setOnce(pos.incrementAndGet() - 1, element); if (countDown.countDown()) { if (failure.get() != null) { delegate.onFailure(failure.get()); } else { - List collect = this.roles.asList(); + List collect = this.results.asList(); collect.addAll(defaults); delegate.onResponse(Collections.unmodifiableList(collect)); } diff --git a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 2b16c26931b86..b73ad436d2a3a 100644 --- a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -61,8 +61,10 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the @@ -206,6 +208,54 @@ public String executor() { }); } + /** + * Collects all nodes on the connected cluster and returns / passes a nodeID to {@link DiscoveryNode} lookup function + * that returns null if the node ID is not found. + */ + void collectNodes(ActionListener> listener) { + Runnable runnable = () -> { + final ClusterStateRequest request = new ClusterStateRequest(); + request.clear(); + request.nodes(true); + request.local(true); // run this on the node that get's the request it's as good as any other + final DiscoveryNode node = nodeSupplier.get(); + transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, + new TransportResponseHandler() { + @Override + public ClusterStateResponse newInstance() { + return new ClusterStateResponse(); + } + + @Override + public void handleResponse(ClusterStateResponse response) { + DiscoveryNodes nodes = response.getState().nodes(); + listener.onResponse(nodes::get); + } + + @Override + public void handleException(TransportException exp) { + listener.onFailure(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + }; + if (connectedNodes.isEmpty()) { + // just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener + // this will cause some back pressure on the search end and eventually will cause rejections but that's fine + // we can't proceed with a search on a cluster level. + // in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the caller + // end since they provide the listener. + ensureConnected(ActionListener.wrap((x) -> runnable.run(), listener::onFailure)); + } else { + runnable.run(); + } + + } + /** * Returns a connection to the remote cluster. This connection might be a proxy connection that redirects internally to the * given node. diff --git a/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 621713c8ab11e..b0c7060f3729a 100644 --- a/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -18,17 +18,12 @@ */ package org.elasticsearch.transport; -import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchShardIterator; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; @@ -40,15 +35,10 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.CountDown; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.search.internal.AliasFilter; import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -59,6 +49,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -346,4 +337,44 @@ public void getRemoteConnectionInfos(ActionListener clusters, ActionListener> listener) { + Map remoteClusters = this.remoteClusters; + for (String cluster : clusters) { + if (remoteClusters.containsKey(cluster) == false) { + listener.onFailure(new IllegalArgumentException("no such remote cluster: [" + cluster + "]")); + return; + } + } + + final Map> clusterMap = new HashMap<>(); + CountDown countDown = new CountDown(clusters.size()); + Function nullFunction = s -> null; + for (final String cluster : clusters) { + RemoteClusterConnection connection = remoteClusters.get(cluster); + connection.collectNodes(new ActionListener>() { + @Override + public void onResponse(Function nodeLookup) { + synchronized (clusterMap) { + clusterMap.put(cluster, nodeLookup); + } + if (countDown.countDown()) { + listener.onResponse((clusterAlias, nodeId) + -> clusterMap.getOrDefault(clusterAlias, nullFunction).apply(nodeId)); + } + } + + @Override + public void onFailure(Exception e) { + if (countDown.fastForward()) { + listener.onFailure(e); + } + } + }); + } + } } diff --git a/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java new file mode 100644 index 0000000000000..911e5b4d3d0e0 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java @@ -0,0 +1,253 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.search; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.NodeNotConnectedException; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +public class ClearScrollControllerTests extends ESTestCase { + + + public void testClearAll() throws IOException, InterruptedException { + DiscoveryNode node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); + CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClearScrollResponse clearScrollResponse) { + try { + assertEquals(3, clearScrollResponse.getNumFreed()); + assertTrue(clearScrollResponse.isSucceeded()); + } finally { + latch.countDown(); + } + + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + } + }; + List nodesInvoked = new CopyOnWriteArrayList<>(); + SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) { + @Override + public void sendClearAllScrollContexts(DiscoveryNode node, ActionListener listener) { + nodesInvoked.add(node); + Thread t = new Thread(() -> listener.onResponse(TransportResponse.Empty.INSTANCE)); // response is unused + t.start(); + } + }; + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.scrollIds(Arrays.asList("_all")); + ClearScrollController controller = new ClearScrollController(clearScrollRequest, listener, + nodes, logger, searchTransportService); + controller.run(); + latch.await(); + assertEquals(3, nodesInvoked.size()); + Collections.sort(nodesInvoked, Comparator.comparing(DiscoveryNode::getId)); + assertEquals(nodesInvoked, Arrays.asList(node1, node2, node3)); + } + + + public void testClearScrollIds() throws IOException, InterruptedException { + DiscoveryNode node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); + AtomicArray array = new AtomicArray<>(3); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 = new SearchAsyncActionTests.TestSearchPhaseResult(1, node1); + testSearchPhaseResult1.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), null, null)); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 = new SearchAsyncActionTests.TestSearchPhaseResult(12, node2); + testSearchPhaseResult2.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), null, null)); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 = new SearchAsyncActionTests.TestSearchPhaseResult(42, node3); + testSearchPhaseResult3.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null)); + array.setOnce(0, testSearchPhaseResult1); + array.setOnce(1, testSearchPhaseResult2); + array.setOnce(2, testSearchPhaseResult3); + AtomicInteger numFreed = new AtomicInteger(0); + String scrollId = TransportSearchHelper.buildScrollId(array); + DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); + CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClearScrollResponse clearScrollResponse) { + try { + assertEquals(numFreed.get(), clearScrollResponse.getNumFreed()); + assertTrue(clearScrollResponse.isSucceeded()); + } finally { + latch.countDown(); + } + + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + } + }; + List nodesInvoked = new CopyOnWriteArrayList<>(); + SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) { + + @Override + public void sendFreeContext(Transport.Connection connection, long contextId, + ActionListener listener) { + nodesInvoked.add(connection.getNode()); + boolean freed = randomBoolean(); + if (freed) { + numFreed.incrementAndGet(); + } + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(freed))); + t.start(); + } + + @Override + Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.scrollIds(Arrays.asList(scrollId)); + ClearScrollController controller = new ClearScrollController(clearScrollRequest, listener, + nodes, logger, searchTransportService); + controller.run(); + latch.await(); + assertEquals(3, nodesInvoked.size()); + Collections.sort(nodesInvoked, Comparator.comparing(DiscoveryNode::getId)); + assertEquals(nodesInvoked, Arrays.asList(node1, node2, node3)); + } + + public void testClearScrollIdsWithFailure() throws IOException, InterruptedException { + DiscoveryNode node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); + AtomicArray array = new AtomicArray<>(3); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 = new SearchAsyncActionTests.TestSearchPhaseResult(1, node1); + testSearchPhaseResult1.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), null, null)); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 = new SearchAsyncActionTests.TestSearchPhaseResult(12, node2); + testSearchPhaseResult2.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), null, null)); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 = new SearchAsyncActionTests.TestSearchPhaseResult(42, node3); + testSearchPhaseResult3.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null)); + array.setOnce(0, testSearchPhaseResult1); + array.setOnce(1, testSearchPhaseResult2); + array.setOnce(2, testSearchPhaseResult3); + AtomicInteger numFreed = new AtomicInteger(0); + AtomicInteger numFailures = new AtomicInteger(0); + AtomicInteger numConnectionFailures = new AtomicInteger(0); + String scrollId = TransportSearchHelper.buildScrollId(array); + DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); + CountDownLatch latch = new CountDownLatch(1); + + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClearScrollResponse clearScrollResponse) { + try { + assertEquals(numFreed.get(), clearScrollResponse.getNumFreed()); + if (numFailures.get() > 0) { + assertFalse(clearScrollResponse.isSucceeded()); + } else { + assertTrue(clearScrollResponse.isSucceeded()); + } + + } finally { + latch.countDown(); + } + + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + } + }; + List nodesInvoked = new CopyOnWriteArrayList<>(); + SearchTransportService searchTransportService = new SearchTransportService(Settings.EMPTY, null) { + + @Override + public void sendFreeContext(Transport.Connection connection, long contextId, + ActionListener listener) { + nodesInvoked.add(connection.getNode()); + boolean freed = randomBoolean(); + boolean fail = randomBoolean(); + Thread t = new Thread(() -> { + if (fail) { + numFailures.incrementAndGet(); + listener.onFailure(new IllegalArgumentException("boom")); + } else { + if (freed) { + numFreed.incrementAndGet(); + } + listener.onResponse(new SearchFreeContextResponse(freed)); + } + }); + t.start(); + } + + @Override + Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + if (randomBoolean()) { + numFailures.incrementAndGet(); + numConnectionFailures.incrementAndGet(); + throw new NodeNotConnectedException(node, "boom"); + } + return new SearchAsyncActionTests.MockConnection(node); + } + }; + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.scrollIds(Arrays.asList(scrollId)); + ClearScrollController controller = new ClearScrollController(clearScrollRequest, listener, + nodes, logger, searchTransportService); + controller.run(); + latch.await(); + assertEquals(3 - numConnectionFailures.get(), nodesInvoked.size()); + } +} diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 39890038f2a2a..878cb7e61266b 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -214,7 +214,7 @@ public void writeTo(StreamOutput out) throws IOException { } } - public final class MockConnection implements Transport.Connection { + public static final class MockConnection implements Transport.Connection { private final DiscoveryNode node; diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchScrollAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchScrollAsyncActionTests.java index 7aa16f473ed6a..e38b04cf4cd12 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchScrollAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchScrollAsyncActionTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.Transport; import java.io.IOException; import java.util.Arrays; @@ -36,17 +37,18 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; public class SearchScrollAsyncActionTests extends ESTestCase { public void testSendRequestsToNodes() throws InterruptedException { ParsedScrollId scrollId = getParsedScrollId( - new ScrollIdForNode("node1", 1), - new ScrollIdForNode("node2", 2), - new ScrollIdForNode("node3", 17), - new ScrollIdForNode("node1", 0), - new ScrollIdForNode("node3", 0)); + new ScrollIdForNode(null, "node1", 1), + new ScrollIdForNode(null, "node2", 2), + new ScrollIdForNode(null, "node3", 17), + new ScrollIdForNode(null, "node1", 0), + new ScrollIdForNode(null, "node3", 0)); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() .add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT)) .add(new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT)) @@ -58,22 +60,29 @@ public void testSendRequestsToNodes() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); AtomicInteger movedCounter = new AtomicInteger(0); SearchScrollAsyncAction action = - new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, null, null, request) + new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, null, null, request + , null) { @Override - protected void executeInitialPhase(DiscoveryNode node, InternalScrollSearchRequest internalRequest, + protected void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest, SearchActionListener searchActionListener) { new Thread(() -> { SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult = - new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), node); - testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(node.getId(), new Index("test", "_na_"), 1)); + new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), connection.getNode()); + testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(connection.getNode().getId(), + new Index("test", "_na_"), 1)); searchActionListener.onResponse(testSearchPhaseResult); }).start(); } @Override - protected SearchPhase moveToNextPhase() { + protected Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + + @Override + protected SearchPhase moveToNextPhase(BiFunction clusterNodeLookup) { assertEquals(1, movedCounter.incrementAndGet()); return new SearchPhase("test") { @Override @@ -104,11 +113,11 @@ protected void onFirstPhaseResult(int shardId, SearchAsyncActionTests.TestSearch public void testFailNextPhase() throws InterruptedException { ParsedScrollId scrollId = getParsedScrollId( - new ScrollIdForNode("node1", 1), - new ScrollIdForNode("node2", 2), - new ScrollIdForNode("node3", 17), - new ScrollIdForNode("node1", 0), - new ScrollIdForNode("node3", 0)); + new ScrollIdForNode(null, "node1", 1), + new ScrollIdForNode(null, "node2", 2), + new ScrollIdForNode(null, "node3", 17), + new ScrollIdForNode(null, "node1", 0), + new ScrollIdForNode(null, "node3", 0)); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() .add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT)) .add(new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT)) @@ -143,22 +152,27 @@ public void onFailure(Exception e) { } }; SearchScrollAsyncAction action = - new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, listener, null, - request) { + new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, listener, null, request, null) { @Override - protected void executeInitialPhase(DiscoveryNode node, InternalScrollSearchRequest internalRequest, + protected void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest, SearchActionListener searchActionListener) { new Thread(() -> { SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult = - new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), node); - testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(node.getId(), new Index("test", "_na_"), 1)); + new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), connection.getNode()); + testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(connection.getNode().getId(), + new Index("test", "_na_"), 1)); searchActionListener.onResponse(testSearchPhaseResult); }).start(); } @Override - protected SearchPhase moveToNextPhase() { + protected Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + + @Override + protected SearchPhase moveToNextPhase(BiFunction clusterNodeLookup) { assertEquals(1, movedCounter.incrementAndGet()); return new SearchPhase("TEST_PHASE") { @Override @@ -188,11 +202,11 @@ protected void onFirstPhaseResult(int shardId, SearchAsyncActionTests.TestSearch public void testNodeNotAvailable() throws InterruptedException { ParsedScrollId scrollId = getParsedScrollId( - new ScrollIdForNode("node1", 1), - new ScrollIdForNode("node2", 2), - new ScrollIdForNode("node3", 17), - new ScrollIdForNode("node1", 0), - new ScrollIdForNode("node3", 0)); + new ScrollIdForNode(null, "node1", 1), + new ScrollIdForNode(null, "node2", 2), + new ScrollIdForNode(null, "node3", 17), + new ScrollIdForNode(null, "node1", 0), + new ScrollIdForNode(null, "node3", 0)); // node2 is not available DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() .add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT)) @@ -204,23 +218,33 @@ public void testNodeNotAvailable() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); AtomicInteger movedCounter = new AtomicInteger(0); SearchScrollAsyncAction action = - new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, null, null, request) + new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, null, null, request, null) { @Override - protected void executeInitialPhase(DiscoveryNode node, InternalScrollSearchRequest internalRequest, + protected void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest, SearchActionListener searchActionListener) { - assertNotEquals("node2 is not available", "node2", node.getId()); + try { + assertNotEquals("node2 is not available", "node2", connection.getNode().getId()); + } catch (NullPointerException e) { + logger.warn(e); + } new Thread(() -> { SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult = - new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), node); - testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(node.getId(), new Index("test", "_na_"), 1)); + new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), connection.getNode()); + testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(connection.getNode().getId(), + new Index("test", "_na_"), 1)); searchActionListener.onResponse(testSearchPhaseResult); }).start(); } @Override - protected SearchPhase moveToNextPhase() { + protected Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + + @Override + protected SearchPhase moveToNextPhase(BiFunction clusterNodeLookup) { assertEquals(1, movedCounter.incrementAndGet()); return new SearchPhase("test") { @Override @@ -256,11 +280,11 @@ protected void onFirstPhaseResult(int shardId, SearchAsyncActionTests.TestSearch public void testShardFailures() throws InterruptedException { ParsedScrollId scrollId = getParsedScrollId( - new ScrollIdForNode("node1", 1), - new ScrollIdForNode("node2", 2), - new ScrollIdForNode("node3", 17), - new ScrollIdForNode("node1", 0), - new ScrollIdForNode("node3", 0)); + new ScrollIdForNode(null, "node1", 1), + new ScrollIdForNode(null, "node2", 2), + new ScrollIdForNode(null, "node3", 17), + new ScrollIdForNode(null, "node1", 0), + new ScrollIdForNode(null, "node3", 0)); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() .add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT)) .add(new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT)) @@ -272,10 +296,10 @@ public void testShardFailures() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); AtomicInteger movedCounter = new AtomicInteger(0); SearchScrollAsyncAction action = - new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, null, null, request) + new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, null, null, request, null) { @Override - protected void executeInitialPhase(DiscoveryNode node, InternalScrollSearchRequest internalRequest, + protected void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest, SearchActionListener searchActionListener) { new Thread(() -> { @@ -283,15 +307,21 @@ protected void executeInitialPhase(DiscoveryNode node, InternalScrollSearchReque searchActionListener.onFailure(new IllegalArgumentException("BOOM on shard")); } else { SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult = - new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), node); - testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(node.getId(), new Index("test", "_na_"), 1)); + new SearchAsyncActionTests.TestSearchPhaseResult(internalRequest.id(), connection.getNode()); + testSearchPhaseResult.setSearchShardTarget(new SearchShardTarget(connection.getNode().getId(), + new Index("test", "_na_"), 1)); searchActionListener.onResponse(testSearchPhaseResult); } }).start(); } @Override - protected SearchPhase moveToNextPhase() { + protected Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + + @Override + protected SearchPhase moveToNextPhase(BiFunction clusterNodeLookup) { assertEquals(1, movedCounter.incrementAndGet()); return new SearchPhase("test") { @Override @@ -327,11 +357,11 @@ protected void onFirstPhaseResult(int shardId, SearchAsyncActionTests.TestSearch public void testAllShardsFailed() throws InterruptedException { ParsedScrollId scrollId = getParsedScrollId( - new ScrollIdForNode("node1", 1), - new ScrollIdForNode("node2", 2), - new ScrollIdForNode("node3", 17), - new ScrollIdForNode("node1", 0), - new ScrollIdForNode("node3", 0)); + new ScrollIdForNode(null, "node1", 1), + new ScrollIdForNode(null, "node2", 2), + new ScrollIdForNode(null, "node3", 17), + new ScrollIdForNode(null, "node1", 0), + new ScrollIdForNode(null, "node3", 0)); DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() .add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT)) .add(new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT)) @@ -365,17 +395,21 @@ public void onFailure(Exception e) { } }; SearchScrollAsyncAction action = - new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, listener, null, - request) { + new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, listener, null, request, null) { @Override - protected void executeInitialPhase(DiscoveryNode node, InternalScrollSearchRequest internalRequest, + protected void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest, SearchActionListener searchActionListener) { new Thread(() -> searchActionListener.onFailure(new IllegalArgumentException("BOOM on shard"))).start(); } @Override - protected SearchPhase moveToNextPhase() { + protected Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + + @Override + protected SearchPhase moveToNextPhase(BiFunction clusterNodeLookup) { fail("don't move all shards failed"); return null; } diff --git a/core/src/test/java/org/elasticsearch/action/search/TransportSearchHelperTests.java b/core/src/test/java/org/elasticsearch/action/search/TransportSearchHelperTests.java new file mode 100644 index 0000000000000..49d7450096bb5 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/search/TransportSearchHelperTests.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.search; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchPhaseResult; +import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +public class TransportSearchHelperTests extends ESTestCase { + + public void testParseScrollId() throws IOException { + AtomicArray array = new AtomicArray<>(3); + DiscoveryNode node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 = new SearchAsyncActionTests.TestSearchPhaseResult(1, node1); + testSearchPhaseResult1.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), "cluster_x", null)); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 = new SearchAsyncActionTests.TestSearchPhaseResult(12, node2); + testSearchPhaseResult2.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), "cluster_y", null)); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 = new SearchAsyncActionTests.TestSearchPhaseResult(42, node3); + testSearchPhaseResult3.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null)); + array.setOnce(0, testSearchPhaseResult1); + array.setOnce(1, testSearchPhaseResult2); + array.setOnce(2, testSearchPhaseResult3); + + + String scrollId = TransportSearchHelper.buildScrollId(array); + ParsedScrollId parseScrollId = TransportSearchHelper.parseScrollId(scrollId); + assertEquals(3, parseScrollId.getContext().length); + assertEquals("node_1", parseScrollId.getContext()[0].getNode()); + assertEquals("cluster_x", parseScrollId.getContext()[0].getClusterAlias()); + assertEquals(1, parseScrollId.getContext()[0].getScrollId()); + + assertEquals("node_2", parseScrollId.getContext()[1].getNode()); + assertEquals("cluster_y", parseScrollId.getContext()[1].getClusterAlias()); + assertEquals(12, parseScrollId.getContext()[1].getScrollId()); + + assertEquals("node_3", parseScrollId.getContext()[2].getNode()); + assertNull(parseScrollId.getContext()[2].getClusterAlias()); + assertEquals(42, parseScrollId.getContext()[2].getScrollId()); + } +} diff --git a/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 3c1181b68258d..44a134857f93f 100644 --- a/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -55,11 +55,6 @@ import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.RemoteClusterConnection; -import org.elasticsearch.transport.RemoteConnectionInfo; -import org.elasticsearch.transport.RemoteTransportException; -import org.elasticsearch.transport.TransportConnectionListener; -import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.net.InetAddress; @@ -78,6 +73,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -357,7 +353,6 @@ public void run() { public void testFetchShards() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); @@ -785,4 +780,42 @@ public void onFailure(Exception e) { } } } + + public void testCollectNodes() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + if (randomBoolean()) { + updateSeedNodes(connection, Arrays.asList(seedNode)); + } + CountDownLatch responseLatch = new CountDownLatch(1); + AtomicReference> reference = new AtomicReference<>(); + AtomicReference failReference = new AtomicReference<>(); + ActionListener> shardsListener = ActionListener.wrap( + x -> { + reference.set(x); + responseLatch.countDown(); + }, + x -> { + failReference.set(x); + responseLatch.countDown(); + }); + connection.collectNodes(shardsListener); + responseLatch.await(); + assertNull(failReference.get()); + assertNotNull(reference.get()); + Function function = reference.get(); + assertEquals(seedNode, function.apply(seedNode.getId())); + assertNull(function.apply(seedNode.getId() + "foo")); + assertTrue(connection.assertNoRunningConnections()); + } + } + } + } } diff --git a/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 32a672e1bbc9a..8c74b914e5782 100644 --- a/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -34,11 +34,14 @@ import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; public class RemoteClusterServiceTests extends ESTestCase { @@ -265,4 +268,110 @@ private ActionListener connectionListener(final CountDownLatch latch) { return ActionListener.wrap(x -> latch.countDown(), x -> fail()); } + + public void testCollectNodes() throws InterruptedException, IOException { + final Settings settings = Settings.EMPTY; + final List knownNodes_c1 = new CopyOnWriteArrayList<>(); + final List knownNodes_c2 = new CopyOnWriteArrayList<>(); + + try (MockTransportService c1N1 = + startTransport("cluster_1_node_1", knownNodes_c1, Version.CURRENT); + MockTransportService c1N2 = + startTransport("cluster_1_node_2", knownNodes_c1, Version.CURRENT); + MockTransportService c2N1 = + startTransport("cluster_2_node_1", knownNodes_c2, Version.CURRENT); + MockTransportService c2N2 = + startTransport("cluster_2_node_2", knownNodes_c2, Version.CURRENT)) { + final DiscoveryNode c1N1Node = c1N1.getLocalDiscoNode(); + final DiscoveryNode c1N2Node = c1N2.getLocalDiscoNode(); + final DiscoveryNode c2N1Node = c2N1.getLocalDiscoNode(); + final DiscoveryNode c2N2Node = c2N2.getLocalDiscoNode(); + knownNodes_c1.add(c1N1Node); + knownNodes_c1.add(c1N2Node); + knownNodes_c2.add(c2N1Node); + knownNodes_c2.add(c2N2Node); + Collections.shuffle(knownNodes_c1, random()); + Collections.shuffle(knownNodes_c2, random()); + + try (MockTransportService transportService = MockTransportService.createNewService( + settings, + Version.CURRENT, + threadPool, + null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + final Settings.Builder builder = Settings.builder(); + builder.putArray( + "search.remote.cluster_1.seeds", c1N1Node.getAddress().toString()); + builder.putArray( + "search.remote.cluster_2.seeds", c2N1Node.getAddress().toString()); + try (RemoteClusterService service = + new RemoteClusterService(settings, transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertFalse(service.isCrossClusterSearchEnabled()); + + final InetSocketAddress c1N1Address = c1N1Node.getAddress().address(); + final InetSocketAddress c1N2Address = c1N2Node.getAddress().address(); + final InetSocketAddress c2N1Address = c2N1Node.getAddress().address(); + final InetSocketAddress c2N2Address = c2N2Node.getAddress().address(); + + final CountDownLatch firstLatch = new CountDownLatch(1); + service.updateRemoteCluster( + "cluster_1", + Arrays.asList(c1N1Address, c1N2Address), + connectionListener(firstLatch)); + firstLatch.await(); + + final CountDownLatch secondLatch = new CountDownLatch(1); + service.updateRemoteCluster( + "cluster_2", + Arrays.asList(c2N1Address, c2N2Address), + connectionListener(secondLatch)); + secondLatch.await(); + CountDownLatch latch = new CountDownLatch(1); + service.collectNodes(new HashSet<>(Arrays.asList("cluster_1", "cluster_2")), + new ActionListener>() { + @Override + public void onResponse(BiFunction func) { + try { + assertEquals(c1N1Node, func.apply("cluster_1", c1N1Node.getId())); + assertEquals(c1N2Node, func.apply("cluster_1", c1N2Node.getId())); + assertEquals(c2N1Node, func.apply("cluster_2", c2N1Node.getId())); + assertEquals(c2N2Node, func.apply("cluster_2", c2N2Node.getId())); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + } + }); + + AtomicReference ex = new AtomicReference<>(); + service.collectNodes(new HashSet<>(Arrays.asList("cluster_1", "cluster_2", "no such cluster")), + new ActionListener>() { + @Override + public void onResponse(BiFunction stringStringDiscoveryNodeBiFunction) { + fail("should not be called"); + } + + @Override + public void onFailure(Exception e) { + ex.set(e); + } + }); + assertNotNull(ex.get()); + assertTrue(ex.get() instanceof IllegalArgumentException); + assertEquals("no such remote cluster: [no such cluster]", ex.get().getMessage()); + } + } + } + } } diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml new file mode 100644 index 0000000000000..aac5c438c323c --- /dev/null +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml @@ -0,0 +1,40 @@ +--- +"Scroll on the mixed cluster": + + - do: + search: + index: my_remote_cluster:test_index + size: 4 + scroll: 1m + sort: filter_field + body: + query: + match_all: {} + + - set: {_scroll_id: scroll_id} + - match: {hits.total: 6 } + - length: {hits.hits: 4 } + - match: {hits.hits.0._source.filter_field: 0 } + - match: {hits.hits.1._source.filter_field: 0 } + - match: {hits.hits.2._source.filter_field: 0 } + - match: {hits.hits.3._source.filter_field: 0 } + + - do: + scroll: + body: { "scroll_id": "$scroll_id", "scroll": "1m"} + + - match: {hits.total: 6 } + - length: {hits.hits: 2 } + - match: {hits.hits.0._source.filter_field: 1 } + - match: {hits.hits.1._source.filter_field: 1 } + - do: + scroll: + scroll_id: $scroll_id + scroll: 1m + + - match: {hits.total: 6 } + - length: {hits.hits: 0 } + + - do: + clear_scroll: + scroll_id: $scroll_id From 20daeb32809805da853bf6ff604950c8c259c76a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 7 Jun 2017 10:08:24 +0200 Subject: [PATCH 02/11] foo --- .../org/elasticsearch/action/search/ClearScrollController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java b/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java index 45a879d5c3a13..c39b528f4ba16 100644 --- a/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java +++ b/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java @@ -45,7 +45,7 @@ final class ClearScrollController implements Runnable { private final Logger logger; private final Runnable runner; - public ClearScrollController(ClearScrollRequest request, final ActionListener listener, + public ClearScrollController(ClearScrollRequest request, ActionListener listener, DiscoveryNodes nodes, Logger logger, SearchTransportService searchTransportService) { this.nodes = nodes; this.logger = logger; From 28b7f0cecbc80a4d7c1c09009df37f4fb5e46f35 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 7 Jun 2017 10:26:47 +0200 Subject: [PATCH 03/11] fix modifyer --- .../elasticsearch/action/search/ClearScrollController.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java b/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java index c39b528f4ba16..d8525246206a5 100644 --- a/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java +++ b/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java @@ -45,8 +45,8 @@ final class ClearScrollController implements Runnable { private final Logger logger; private final Runnable runner; - public ClearScrollController(ClearScrollRequest request, ActionListener listener, - DiscoveryNodes nodes, Logger logger, SearchTransportService searchTransportService) { + ClearScrollController(ClearScrollRequest request, ActionListener listener, DiscoveryNodes nodes, Logger logger, + SearchTransportService searchTransportService) { this.nodes = nodes; this.logger = logger; this.searchTransportService = searchTransportService; From 0d6982e3425f3595e8c609a74339adf45921fb01 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 7 Jun 2017 11:03:30 +0200 Subject: [PATCH 04/11] fix line len --- .../action/search/SearchScrollAsyncActionTests.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchScrollAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchScrollAsyncActionTests.java index e38b04cf4cd12..9986b84cdc344 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchScrollAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchScrollAsyncActionTests.java @@ -152,7 +152,8 @@ public void onFailure(Exception e) { } }; SearchScrollAsyncAction action = - new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, listener, null, request, null) { + new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, listener, null, + request, null) { @Override protected void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest, SearchActionListener searchActionListener) @@ -218,7 +219,8 @@ public void testNodeNotAvailable() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); AtomicInteger movedCounter = new AtomicInteger(0); SearchScrollAsyncAction action = - new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, null, null, request, null) + new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, null, null, + request, null) { @Override protected void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest, @@ -296,7 +298,8 @@ public void testShardFailures() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); AtomicInteger movedCounter = new AtomicInteger(0); SearchScrollAsyncAction action = - new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, null, null, request, null) + new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, null, null, + request, null) { @Override protected void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest, @@ -395,7 +398,8 @@ public void onFailure(Exception e) { } }; SearchScrollAsyncAction action = - new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, listener, null, request, null) { + new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, listener, null, + request, null) { @Override protected void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest, SearchActionListener searchActionListener) From 5e169a6f7cdbb644fdc3b5e4dabed8ecb4ab626c Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 7 Jun 2017 14:15:20 +0200 Subject: [PATCH 05/11] pass dummy listener to tests --- .../search/SearchScrollAsyncActionTests.java | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchScrollAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchScrollAsyncActionTests.java index 9986b84cdc344..038bb6ca8f659 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchScrollAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchScrollAsyncActionTests.java @@ -60,8 +60,8 @@ public void testSendRequestsToNodes() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); AtomicInteger movedCounter = new AtomicInteger(0); SearchScrollAsyncAction action = - new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, null, null, request - , null) + new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, dummyListener(), + null, request, null) { @Override protected void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest, @@ -219,8 +219,8 @@ public void testNodeNotAvailable() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); AtomicInteger movedCounter = new AtomicInteger(0); SearchScrollAsyncAction action = - new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, null, null, - request, null) + new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, dummyListener() + , null, request, null) { @Override protected void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest, @@ -298,8 +298,8 @@ public void testShardFailures() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); AtomicInteger movedCounter = new AtomicInteger(0); SearchScrollAsyncAction action = - new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, null, null, - request, null) + new SearchScrollAsyncAction(scrollId, logger, discoveryNodes, dummyListener(), + null, request, null) { @Override protected void executeInitialPhase(Transport.Connection connection, InternalScrollSearchRequest internalRequest, @@ -442,4 +442,18 @@ private static ParsedScrollId getParsedScrollId(ScrollIdForNode... idsForNodes) Collections.shuffle(scrollIdForNodes, random()); return new ParsedScrollId("", "test", scrollIdForNodes.toArray(new ScrollIdForNode[0])); } + + private ActionListener dummyListener() { + return new ActionListener() { + @Override + public void onResponse(SearchResponse response) { + fail("dummy"); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }; + } } From ae95dc15fb2d1733ee013f4954c437d9594219f2 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 7 Jun 2017 14:40:34 +0200 Subject: [PATCH 06/11] wait for listener to be called --- .../transport/RemoteClusterServiceTests.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 8c74b914e5782..868ba093196b4 100644 --- a/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -353,20 +353,31 @@ public void onFailure(Exception e) { } } }); + latch.await(); + CountDownLatch failLatch = new CountDownLatch(1); AtomicReference ex = new AtomicReference<>(); service.collectNodes(new HashSet<>(Arrays.asList("cluster_1", "cluster_2", "no such cluster")), new ActionListener>() { @Override public void onResponse(BiFunction stringStringDiscoveryNodeBiFunction) { - fail("should not be called"); + try { + fail("should not be called"); + } finally { + failLatch.countDown(); + } } @Override public void onFailure(Exception e) { - ex.set(e); + try { + ex.set(e); + } finally { + failLatch.countDown(); + } } }); + failLatch.await(); assertNotNull(ex.get()); assertTrue(ex.get() instanceof IllegalArgumentException); assertEquals("no such remote cluster: [no such cluster]", ex.get().getMessage()); From 1e5e08b2194b7497c6262d967692510517f81cd0 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 7 Jun 2017 17:23:16 +0200 Subject: [PATCH 07/11] add extra exception handling --- .../transport/RemoteClusterConnection.java | 76 ++++++++++--------- 1 file changed, 40 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index b73ad436d2a3a..1e766cb2c2b38 100644 --- a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -213,45 +213,49 @@ public String executor() { * that returns null if the node ID is not found. */ void collectNodes(ActionListener> listener) { - Runnable runnable = () -> { - final ClusterStateRequest request = new ClusterStateRequest(); - request.clear(); - request.nodes(true); - request.local(true); // run this on the node that get's the request it's as good as any other - final DiscoveryNode node = nodeSupplier.get(); - transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, - new TransportResponseHandler() { - @Override - public ClusterStateResponse newInstance() { - return new ClusterStateResponse(); - } + try { + Runnable runnable = () -> { + final ClusterStateRequest request = new ClusterStateRequest(); + request.clear(); + request.nodes(true); + request.local(true); // run this on the node that get's the request it's as good as any other + final DiscoveryNode node = nodeSupplier.get(); + transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, + new TransportResponseHandler() { + @Override + public ClusterStateResponse newInstance() { + return new ClusterStateResponse(); + } - @Override - public void handleResponse(ClusterStateResponse response) { - DiscoveryNodes nodes = response.getState().nodes(); - listener.onResponse(nodes::get); - } + @Override + public void handleResponse(ClusterStateResponse response) { + DiscoveryNodes nodes = response.getState().nodes(); + listener.onResponse(nodes::get); + } - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } + @Override + public void handleException(TransportException exp) { + listener.onFailure(exp); + } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - }; - if (connectedNodes.isEmpty()) { - // just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener - // this will cause some back pressure on the search end and eventually will cause rejections but that's fine - // we can't proceed with a search on a cluster level. - // in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the caller - // end since they provide the listener. - ensureConnected(ActionListener.wrap((x) -> runnable.run(), listener::onFailure)); - } else { - runnable.run(); + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + }; + if (connectedNodes.isEmpty()) { + // just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener + // this will cause some back pressure on the search end and eventually will cause rejections but that's fine + // we can't proceed with a search on a cluster level. + // in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the caller + // end since they provide the listener. + ensureConnected(ActionListener.wrap((x) -> runnable.run(), listener::onFailure)); + } else { + runnable.run(); + } + } catch (Exception ex) { + listener.onFailure(ex); } } From 7769d3315c5e9ae5d2b47cc56ab0534e7bc578bd Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 7 Jun 2017 18:49:58 +0200 Subject: [PATCH 08/11] add basic proxy actions --- .../transport/TransportActionProxy.java | 14 ++++++++++++++ .../transport/TransportActionProxyTests.java | 12 ++++++++++++ 2 files changed, 26 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index 502ab51e0fa1b..5259fca507e49 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -158,4 +158,18 @@ public static TransportRequest unwrapRequest(TransportRequest request) { } return request; } + + /** + * Returns true iff the given action is a proxy action + */ + public static boolean isProxyAction(String action) { + return action.startsWith(PROXY_ACTION_PREFIX); + } + + /** + * Returns true iff the given request is a proxy request + */ + public static boolean isProxyRequest(TransportRequest request) { + return request instanceof ProxyRequest; + } } diff --git a/core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java index bb1c70da34417..e73ad8e439cb8 100644 --- a/core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -258,4 +258,16 @@ public void testUnwrap() { assertTrue(transportRequest instanceof TransportActionProxy.ProxyRequest); assertSame(TransportService.HandshakeRequest.INSTANCE, TransportActionProxy.unwrapRequest(transportRequest)); } + + public void testIsProxyAction() { + String action = "foo/bar"; + String proxyAction = TransportActionProxy.getProxyAction(action); + assertTrue(TransportActionProxy.isProxyAction(proxyAction)); + assertFalse(TransportActionProxy.isProxyAction(action)); + } + + public void testIsProxyRequest() { + assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>(() -> null))); + assertFalse(TransportActionProxy.isProxyRequest(TransportRequest.Empty.INSTANCE)); + } } From 3dff739732d029f00818c87d58b8aa3d197678a4 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 8 Jun 2017 07:57:02 +0200 Subject: [PATCH 09/11] remove extra catch block since it's not needed --- .../transport/RemoteClusterConnection.java | 77 +++++++++---------- 1 file changed, 36 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 1e766cb2c2b38..ff4f2dc586c36 100644 --- a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -213,51 +213,46 @@ public String executor() { * that returns null if the node ID is not found. */ void collectNodes(ActionListener> listener) { - try { - Runnable runnable = () -> { - final ClusterStateRequest request = new ClusterStateRequest(); - request.clear(); - request.nodes(true); - request.local(true); // run this on the node that get's the request it's as good as any other - final DiscoveryNode node = nodeSupplier.get(); - transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, - new TransportResponseHandler() { - @Override - public ClusterStateResponse newInstance() { - return new ClusterStateResponse(); - } + Runnable runnable = () -> { + final ClusterStateRequest request = new ClusterStateRequest(); + request.clear(); + request.nodes(true); + request.local(true); // run this on the node that get's the request it's as good as any other + final DiscoveryNode node = nodeSupplier.get(); + transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, + new TransportResponseHandler() { + @Override + public ClusterStateResponse newInstance() { + return new ClusterStateResponse(); + } - @Override - public void handleResponse(ClusterStateResponse response) { - DiscoveryNodes nodes = response.getState().nodes(); - listener.onResponse(nodes::get); - } + @Override + public void handleResponse(ClusterStateResponse response) { + DiscoveryNodes nodes = response.getState().nodes(); + listener.onResponse(nodes::get); + } - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } + @Override + public void handleException(TransportException exp) { + listener.onFailure(exp); + } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }); - }; - if (connectedNodes.isEmpty()) { - // just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener - // this will cause some back pressure on the search end and eventually will cause rejections but that's fine - // we can't proceed with a search on a cluster level. - // in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the caller - // end since they provide the listener. - ensureConnected(ActionListener.wrap((x) -> runnable.run(), listener::onFailure)); - } else { - runnable.run(); - } - } catch (Exception ex) { - listener.onFailure(ex); + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }); + }; + if (connectedNodes.isEmpty()) { + // just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener + // this will cause some back pressure on the search end and eventually will cause rejections but that's fine + // we can't proceed with a search on a cluster level. + // in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the + // caller end since they provide the listener. + ensureConnected(ActionListener.wrap((x) -> runnable.run(), listener::onFailure)); + } else { + runnable.run(); } - } /** From a4399e37922d9a6d91e102f8de6ab61349263cc9 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 13 Jun 2017 14:19:28 +0200 Subject: [PATCH 10/11] fix typo --- .../org/elasticsearch/transport/RemoteClusterConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index ff4f2dc586c36..59da9bee7efe2 100644 --- a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -217,7 +217,7 @@ void collectNodes(ActionListener> listener) { final ClusterStateRequest request = new ClusterStateRequest(); request.clear(); request.nodes(true); - request.local(true); // run this on the node that get's the request it's as good as any other + request.local(true); // run this on the node that gets the request it's as good as any other final DiscoveryNode node = nodeSupplier.get(); transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, new TransportResponseHandler() { From 9248816a7b97ac2418a56975569000e34ace9f28 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 13 Jun 2017 15:24:18 +0200 Subject: [PATCH 11/11] apply feedback --- .../elasticsearch/action/search/ClearScrollController.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java b/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java index 7a4dbf23bb0ea..ac708d9b6b0c7 100644 --- a/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java +++ b/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java @@ -111,11 +111,9 @@ void cleanScrollIds(List parsedScrollIds) { onFreedContext(false); } else { try { - Transport.Connection connection = searchTransportService.getConnection(target.getClusterAlias(), - node); + Transport.Connection connection = searchTransportService.getConnection(target.getClusterAlias(), node); searchTransportService.sendFreeContext(connection, target.getScrollId(), - ActionListener.wrap(freed -> onFreedContext(freed.isFreed()), - e -> onFailedFreedContext(e, node))); + ActionListener.wrap(freed -> onFreedContext(freed.isFreed()), e -> onFailedFreedContext(e, node))); } catch (Exception e) { onFailedFreedContext(e, node); }