From 23b3a40b128bc16c2a69a2829611f12accd149f9 Mon Sep 17 00:00:00 2001 From: javanna Date: Mon, 11 Jun 2018 17:46:42 +0200 Subject: [PATCH 1/2] CCS: don't proxy requests for already connected node Cross-cluster search selects a subset of nodes for each remote cluster and sends requests only to them, which will act as a proxy and properly redirect such requests to the target nodes that hold the relevant data. What happens today is that every time we send a request to a remote cluster, it will be sent to the next node in the proxy list (in round-robin fashion), regardless of whether the target node is already amongst the ones that we are connected to. In case for instance we need to send a shard search request to a data node that's also one of the selected proxy nodes, we may end up sending the request to it through one of the other proxy nodes. This commit optimizes this case to make sure that whenever we are already connected to a remote node, we will send a direct request rather than using the next proxy node. There is a side-effect to this, which is that round-robin will be a bit unbalanced as the data nodes that are also selected as proxies will receive more requests. --- .../action/search/TransportSearchAction.java | 7 +- .../transport/RemoteClusterConnection.java | 92 +++++++++---------- .../RemoteClusterConnectionTests.java | 72 ++++++++++++++- 3 files changed, 113 insertions(+), 58 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index ac9248ef98d41..59aef0e229b8d 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -27,7 +27,6 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -51,7 +50,6 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -364,8 +362,7 @@ private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupSh static GroupShardsIterator mergeShardsIterators(GroupShardsIterator localShardsIterator, OriginalIndices localIndices, List remoteShardIterators) { - List shards = new ArrayList<>(); - shards.addAll(remoteShardIterators); + List shards = new ArrayList<>(remoteShardIterators); for (ShardIterator shardIterator : localShardsIterator) { shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices)); } @@ -397,7 +394,7 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener, false, clusters); return new SearchPhase(action.getName()) { @Override - public void run() throws IOException { + public void run() { action.start(); } }; diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index f24a1a928d50f..cb66dcac39c79 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -20,14 +20,9 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.core.internal.io.IOUtils; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; @@ -44,6 +39,7 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; @@ -54,7 +50,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -65,7 +60,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; -import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -185,7 +179,7 @@ public void ensureConnected(ActionListener voidActionListener) { private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest, final ActionListener listener) { - final DiscoveryNode node = connectedNodes.get(); + final DiscoveryNode node = connectedNodes.getAny(); transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest, new TransportResponseHandler() { @@ -221,7 +215,7 @@ void collectNodes(ActionListener> listener) { request.clear(); request.nodes(true); request.local(true); // run this on the node that gets the request it's as good as any other - final DiscoveryNode node = connectedNodes.get(); + final DiscoveryNode node = connectedNodes.getAny(); transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, new TransportResponseHandler() { @Override @@ -259,40 +253,52 @@ public String executor() { } /** - * Returns a connection to the remote cluster. This connection might be a proxy connection that redirects internally to the - * given node. + * Returns a connection to the remote cluster, preferably a direct connection to the provided {@link DiscoveryNode}. + * If such node is not connected, the returned connection will be a proxy connection that redirects to it. */ Transport.Connection getConnection(DiscoveryNode remoteClusterNode) { - DiscoveryNode discoveryNode = connectedNodes.get(); + if (connectedNodes.contains(remoteClusterNode)) { + return transportService.getConnection(remoteClusterNode); + } + DiscoveryNode discoveryNode = connectedNodes.getAny(); Transport.Connection connection = transportService.getConnection(discoveryNode); - return new Transport.Connection() { - @Override - public DiscoveryNode getNode() { - return remoteClusterNode; - } + return new ProxyConnection(connection, remoteClusterNode); + } - @Override - public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + static class ProxyConnection implements Transport.Connection { + private final Transport.Connection proxyConnection; + private final DiscoveryNode targetNode; + + private ProxyConnection(Transport.Connection proxyConnection, DiscoveryNode targetNode) { + this.proxyConnection = proxyConnection; + this.targetNode = targetNode; + } + + @Override + public DiscoveryNode getNode() { + return targetNode; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { - connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action), - TransportActionProxy.wrapRequest(remoteClusterNode, request), options); - } + proxyConnection.sendRequest(requestId, TransportActionProxy.getProxyAction(action), + TransportActionProxy.wrapRequest(targetNode, request), options); + } - @Override - public void close() throws IOException { - assert false: "proxy connections must not be closed"; - } + @Override + public void close() { + assert false: "proxy connections must not be closed"; + } - @Override - public Version getVersion() { - return connection.getVersion(); - } - }; + @Override + public Version getVersion() { + return proxyConnection.getVersion(); + } } Transport.Connection getConnection() { - DiscoveryNode discoveryNode = connectedNodes.get(); - return transportService.getConnection(discoveryNode); + return transportService.getConnection(getAnyConnectedNode()); } @Override @@ -389,7 +395,7 @@ public void onFailure(Exception e) { } @Override - protected void doRun() throws Exception { + protected void doRun() { ActionListener listener = ActionListener.wrap((x) -> { synchronized (queue) { running.release(); @@ -594,8 +600,8 @@ boolean isNodeConnected(final DiscoveryNode node) { return connectedNodes.contains(node); } - DiscoveryNode getConnectedNode() { - return connectedNodes.get(); + DiscoveryNode getAnyConnectedNode() { + return connectedNodes.getAny(); } void addConnectedNode(DiscoveryNode node) { @@ -616,7 +622,7 @@ int getNumNodesConnected() { return connectedNodes.size(); } - private static class ConnectedNodes implements Supplier { + private static class ConnectedNodes { private final Set nodeSet = new HashSet<>(); private final String clusterAlias; @@ -627,8 +633,7 @@ private ConnectedNodes(String clusterAlias) { this.clusterAlias = clusterAlias; } - @Override - public synchronized DiscoveryNode get() { + public synchronized DiscoveryNode getAny() { ensureIteratorAvailable(); if (currentIterator.hasNext()) { return currentIterator.next(); @@ -661,15 +666,6 @@ synchronized boolean contains(DiscoveryNode node) { return nodeSet.contains(node); } - synchronized Optional getAny() { - ensureIteratorAvailable(); - if (currentIterator.hasNext()) { - return Optional.of(currentIterator.next()); - } else { - return Optional.empty(); - } - } - private synchronized void ensureIteratorAvailable() { if (currentIterator == null) { currentIterator = nodeSet.iterator(); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 0739ff5633bec..2ffa8d0461f7a 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -81,6 +81,7 @@ import static org.hamcrest.Matchers.iterableWithSize; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.startsWith; public class RemoteClusterConnectionTests extends ESTestCase { @@ -992,7 +993,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted barrier.await(); for (int j = 0; j < numGetCalls; j++) { try { - DiscoveryNode node = connection.getConnectedNode(); + DiscoveryNode node = connection.getAnyConnectedNode(); assertNotNull(node); } catch (IllegalStateException e) { if (e.getMessage().startsWith("No node available for cluster:") == false) { @@ -1053,10 +1054,10 @@ public void testClusterNameIsChecked() throws Exception { try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool, settings); MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT, threadPool, settings); - MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, - Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build()); - MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, - Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) { + MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, + Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build()); + MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes, + Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) { DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); knownNodes.add(seedTransport.getLocalDiscoNode()); @@ -1093,4 +1094,65 @@ public void testClusterNameIsChecked() throws Exception { } } } + + public void testGetConnection() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + assertThat(seedNode, notNullValue()); + knownNodes.add(seedNode); + + DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); + assertThat(discoverableNode, notNullValue()); + knownNodes.add(discoverableNode); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + Transport.Connection seedConnection = new Transport.Connection() { + @Override + public DiscoveryNode getNode() { + return seedNode; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws TransportException { + // no-op + } + + @Override + public void close() { + // no-op + } + }; + service.addDelegate(seedNode.getAddress(), new MockTransportService.DelegateTransport(service.getOriginalTransport()) { + @Override + public Connection getConnection(DiscoveryNode node) { + if (node == seedNode) { + return seedConnection; + } + return super.getConnection(node); + } + }); + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Collections.singletonList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + connection.addConnectedNode(seedNode); + for (int i = 0; i < 10; i++) { + //always a direct connection as the remote node is already connected + Transport.Connection remoteConnection = connection.getConnection(seedNode); + assertSame(seedConnection, remoteConnection); + } + for (int i = 0; i < 10; i++) { + //always a proxy connection as the target node is not connected + Transport.Connection remoteConnection = connection.getConnection(discoverableNode); + assertThat(remoteConnection, instanceOf(RemoteClusterConnection.ProxyConnection.class)); + assertThat(remoteConnection.getNode(), sameInstance(discoverableNode)); + } + } + } + } + } } From 8979a5b7df07ae01fe25a2d1758a7575cd7f3712 Mon Sep 17 00:00:00 2001 From: javanna Date: Wed, 13 Jun 2018 12:38:52 +0200 Subject: [PATCH 2/2] address review comments --- .../transport/RemoteClusterConnection.java | 6 +-- .../RemoteClusterConnectionTests.java | 39 ++++++++++++------- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index cb66dcac39c79..82b921bd233b0 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -257,7 +257,7 @@ public String executor() { * If such node is not connected, the returned connection will be a proxy connection that redirects to it. */ Transport.Connection getConnection(DiscoveryNode remoteClusterNode) { - if (connectedNodes.contains(remoteClusterNode)) { + if (transportService.nodeConnected(remoteClusterNode)) { return transportService.getConnection(remoteClusterNode); } DiscoveryNode discoveryNode = connectedNodes.getAny(); @@ -265,7 +265,7 @@ Transport.Connection getConnection(DiscoveryNode remoteClusterNode) { return new ProxyConnection(connection, remoteClusterNode); } - static class ProxyConnection implements Transport.Connection { + static final class ProxyConnection implements Transport.Connection { private final Transport.Connection proxyConnection; private final DiscoveryNode targetNode; @@ -622,7 +622,7 @@ int getNumNodesConnected() { return connectedNodes.size(); } - private static class ConnectedNodes { + private static final class ConnectedNodes { private final Set nodeSet = new HashSet<>(); private final String clusterAlias; diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 2ffa8d0461f7a..ac6f99351e46d 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -1100,19 +1100,19 @@ public void testGetConnection() throws Exception { try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - assertThat(seedNode, notNullValue()); - knownNodes.add(seedNode); + DiscoveryNode connectedNode = seedTransport.getLocalDiscoNode(); + assertThat(connectedNode, notNullValue()); + knownNodes.add(connectedNode); - DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); - assertThat(discoverableNode, notNullValue()); - knownNodes.add(discoverableNode); + DiscoveryNode disconnectedNode = discoverableTransport.getLocalDiscoNode(); + assertThat(disconnectedNode, notNullValue()); + knownNodes.add(disconnectedNode); try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { Transport.Connection seedConnection = new Transport.Connection() { @Override public DiscoveryNode getNode() { - return seedNode; + return connectedNode; } @Override @@ -1126,30 +1126,41 @@ public void close() { // no-op } }; - service.addDelegate(seedNode.getAddress(), new MockTransportService.DelegateTransport(service.getOriginalTransport()) { + service.addDelegate(connectedNode.getAddress(), new MockTransportService.DelegateTransport(service.getOriginalTransport()) { @Override public Connection getConnection(DiscoveryNode node) { - if (node == seedNode) { + if (node == connectedNode) { return seedConnection; } return super.getConnection(node); } + + @Override + public boolean nodeConnected(DiscoveryNode node) { + return node.equals(connectedNode); + } }); service.start(); service.acceptIncomingRequests(); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Collections.singletonList(seedNode), service, Integer.MAX_VALUE, n -> true)) { - connection.addConnectedNode(seedNode); + Collections.singletonList(connectedNode), service, Integer.MAX_VALUE, n -> true)) { + connection.addConnectedNode(connectedNode); for (int i = 0; i < 10; i++) { //always a direct connection as the remote node is already connected - Transport.Connection remoteConnection = connection.getConnection(seedNode); + Transport.Connection remoteConnection = connection.getConnection(connectedNode); assertSame(seedConnection, remoteConnection); } + for (int i = 0; i < 10; i++) { + //always a direct connection as the remote node is already connected + Transport.Connection remoteConnection = connection.getConnection(service.getLocalNode()); + assertThat(remoteConnection, not(instanceOf(RemoteClusterConnection.ProxyConnection.class))); + assertThat(remoteConnection.getNode(), equalTo(service.getLocalNode())); + } for (int i = 0; i < 10; i++) { //always a proxy connection as the target node is not connected - Transport.Connection remoteConnection = connection.getConnection(discoverableNode); + Transport.Connection remoteConnection = connection.getConnection(disconnectedNode); assertThat(remoteConnection, instanceOf(RemoteClusterConnection.ProxyConnection.class)); - assertThat(remoteConnection.getNode(), sameInstance(discoverableNode)); + assertThat(remoteConnection.getNode(), sameInstance(disconnectedNode)); } } }