diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 52c3be8789416..ad3b2efd42f1c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -50,7 +50,6 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -351,8 +350,7 @@ private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupSh static GroupShardsIterator mergeShardsIterators(GroupShardsIterator localShardsIterator, OriginalIndices localIndices, List remoteShardIterators) { - List shards = new ArrayList<>(); - shards.addAll(remoteShardIterators); + List shards = new ArrayList<>(remoteShardIterators); for (ShardIterator shardIterator : localShardsIterator) { shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices)); } @@ -384,7 +382,7 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener, false, clusters); return new SearchPhase(action.getName()) { @Override - public void run() throws IOException { + public void run() { action.start(); } }; diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 59869d585a5c2..82b921bd233b0 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -20,7 +20,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.core.internal.io.IOUtils; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -40,6 +39,7 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; @@ -50,7 +50,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -61,7 +60,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; -import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -181,7 +179,7 @@ public void ensureConnected(ActionListener voidActionListener) { private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest, final ActionListener listener) { - final DiscoveryNode node = connectedNodes.get(); + final DiscoveryNode node = connectedNodes.getAny(); transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest, new TransportResponseHandler() { @@ -217,7 +215,7 @@ void collectNodes(ActionListener> listener) { request.clear(); request.nodes(true); request.local(true); // run this on the node that gets the request it's as good as any other - final DiscoveryNode node = connectedNodes.get(); + final DiscoveryNode node = connectedNodes.getAny(); transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override @@ -255,40 +253,52 @@ public String executor() { } /** - * Returns a connection to the remote cluster. This connection might be a proxy connection that redirects internally to the - * given node. + * Returns a connection to the remote cluster, preferably a direct connection to the provided {@link DiscoveryNode}. + * If such node is not connected, the returned connection will be a proxy connection that redirects to it. */ Transport.Connection getConnection(DiscoveryNode remoteClusterNode) { - DiscoveryNode discoveryNode = connectedNodes.get(); + if (transportService.nodeConnected(remoteClusterNode)) { + return transportService.getConnection(remoteClusterNode); + } + DiscoveryNode discoveryNode = connectedNodes.getAny(); Transport.Connection connection = transportService.getConnection(discoveryNode); - return new Transport.Connection() { - @Override - public DiscoveryNode getNode() { - return remoteClusterNode; - } + return new ProxyConnection(connection, remoteClusterNode); + } - @Override - public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + static final class ProxyConnection implements Transport.Connection { + private final Transport.Connection proxyConnection; + private final DiscoveryNode targetNode; + + private ProxyConnection(Transport.Connection proxyConnection, DiscoveryNode targetNode) { + this.proxyConnection = proxyConnection; + this.targetNode = targetNode; + } + + @Override + public DiscoveryNode getNode() { + return targetNode; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { - connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action), - TransportActionProxy.wrapRequest(remoteClusterNode, request), options); - } + proxyConnection.sendRequest(requestId, TransportActionProxy.getProxyAction(action), + TransportActionProxy.wrapRequest(targetNode, request), options); + } - @Override - public void close() throws IOException { - assert false: "proxy connections must not be closed"; - } + @Override + public void close() { + assert false: "proxy connections must not be closed"; + } - @Override - public Version getVersion() { - return connection.getVersion(); - } - }; + @Override + public Version getVersion() { + return proxyConnection.getVersion(); + } } Transport.Connection getConnection() { - DiscoveryNode discoveryNode = connectedNodes.get(); - return transportService.getConnection(discoveryNode); + return transportService.getConnection(getAnyConnectedNode()); } @Override @@ -385,7 +395,7 @@ public void onFailure(Exception e) { } @Override - protected void doRun() throws Exception { + protected void doRun() { ActionListener listener = ActionListener.wrap((x) -> { synchronized (queue) { running.release(); @@ -590,8 +600,8 @@ boolean isNodeConnected(final DiscoveryNode node) { return connectedNodes.contains(node); } - DiscoveryNode getConnectedNode() { - return connectedNodes.get(); + DiscoveryNode getAnyConnectedNode() { + return connectedNodes.getAny(); } void addConnectedNode(DiscoveryNode node) { @@ -612,7 +622,7 @@ int getNumNodesConnected() { return connectedNodes.size(); } - private static class ConnectedNodes implements Supplier { + private static final class ConnectedNodes { private final Set nodeSet = new HashSet<>(); private final String clusterAlias; @@ -623,8 +633,7 @@ private ConnectedNodes(String clusterAlias) { this.clusterAlias = clusterAlias; } - @Override - public synchronized DiscoveryNode get() { + public synchronized DiscoveryNode getAny() { ensureIteratorAvailable(); if (currentIterator.hasNext()) { return currentIterator.next(); @@ -657,15 +666,6 @@ synchronized boolean contains(DiscoveryNode node) { return nodeSet.contains(node); } - synchronized Optional getAny() { - ensureIteratorAvailable(); - if (currentIterator.hasNext()) { - return Optional.of(currentIterator.next()); - } else { - return Optional.empty(); - } - } - private synchronized void ensureIteratorAvailable() { if (currentIterator == null) { currentIterator = nodeSet.iterator(); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 0739ff5633bec..ac6f99351e46d 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -81,6 +81,7 @@ import static org.hamcrest.Matchers.iterableWithSize; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.startsWith; public class RemoteClusterConnectionTests extends ESTestCase { @@ -992,7 +993,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted barrier.await(); for (int j = 0; j < numGetCalls; j++) { try { - DiscoveryNode node = connection.getConnectedNode(); + DiscoveryNode node = connection.getAnyConnectedNode(); assertNotNull(node); } catch (IllegalStateException e) { if (e.getMessage().startsWith("No node available for cluster:") == false) { @@ -1053,10 +1054,10 @@ public void testClusterNameIsChecked() throws Exception { try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool, settings); MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT, threadPool, settings); - MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, - Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build()); - MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, - Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) { + MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, + Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build()); + MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, + Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) { DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); knownNodes.add(seedTransport.getLocalDiscoNode()); @@ -1093,4 +1094,76 @@ public void testClusterNameIsChecked() throws Exception { } } } + + public void testGetConnection() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + + DiscoveryNode connectedNode = seedTransport.getLocalDiscoNode(); + assertThat(connectedNode, notNullValue()); + knownNodes.add(connectedNode); + + DiscoveryNode disconnectedNode = discoverableTransport.getLocalDiscoNode(); + assertThat(disconnectedNode, notNullValue()); + knownNodes.add(disconnectedNode); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + Transport.Connection seedConnection = new Transport.Connection() { + @Override + public DiscoveryNode getNode() { + return connectedNode; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws TransportException { + // no-op + } + + @Override + public void close() { + // no-op + } + }; + service.addDelegate(connectedNode.getAddress(), new MockTransportService.DelegateTransport(service.getOriginalTransport()) { + @Override + public Connection getConnection(DiscoveryNode node) { + if (node == connectedNode) { + return seedConnection; + } + return super.getConnection(node); + } + + @Override + public boolean nodeConnected(DiscoveryNode node) { + return node.equals(connectedNode); + } + }); + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Collections.singletonList(connectedNode), service, Integer.MAX_VALUE, n -> true)) { + connection.addConnectedNode(connectedNode); + for (int i = 0; i < 10; i++) { + //always a direct connection as the remote node is already connected + Transport.Connection remoteConnection = connection.getConnection(connectedNode); + assertSame(seedConnection, remoteConnection); + } + for (int i = 0; i < 10; i++) { + //always a direct connection as the remote node is already connected + Transport.Connection remoteConnection = connection.getConnection(service.getLocalNode()); + assertThat(remoteConnection, not(instanceOf(RemoteClusterConnection.ProxyConnection.class))); + assertThat(remoteConnection.getNode(), equalTo(service.getLocalNode())); + } + for (int i = 0; i < 10; i++) { + //always a proxy connection as the target node is not connected + Transport.Connection remoteConnection = connection.getConnection(disconnectedNode); + assertThat(remoteConnection, instanceOf(RemoteClusterConnection.ProxyConnection.class)); + assertThat(remoteConnection.getNode(), sameInstance(disconnectedNode)); + } + } + } + } + } }