diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/15_connection_mode_configuration.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/15_connection_mode_configuration.yml index 5606a08cd261e..05185cb3e3328 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/15_connection_mode_configuration.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/15_connection_mode_configuration.yml @@ -14,7 +14,7 @@ transient: cluster.remote.test_remote_cluster.mode: "proxy" cluster.remote.test_remote_cluster.node_connections: "5" - cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip + cluster.remote.test_remote_cluster.proxy_address: $remote_ip - match: { status: 400 } - match: { error.root_cause.0.type: "illegal_argument_exception" } @@ -29,7 +29,7 @@ transient: cluster.remote.test_remote_cluster.mode: "proxy" cluster.remote.test_remote_cluster.seeds: $remote_ip - cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip + cluster.remote.test_remote_cluster.proxy_address: $remote_ip - match: { status: 400 } - match: { error.root_cause.0.type: "illegal_argument_exception" } @@ -64,12 +64,12 @@ flat_settings: true body: transient: - cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip + cluster.remote.test_remote_cluster.proxy_address: $remote_ip cluster.remote.test_remote_cluster.seeds: $remote_ip - match: { status: 400 } - match: { error.root_cause.0.type: "illegal_argument_exception" } - - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.proxy_addresses\" cannot be + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.proxy_address\" cannot be used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=PROXY, configured=SNIFF]" } --- @@ -87,11 +87,11 @@ transient: cluster.remote.test_remote_cluster.mode: "proxy" cluster.remote.test_remote_cluster.proxy_socket_connections: "3" - cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip + cluster.remote.test_remote_cluster.proxy_address: $remote_ip - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"} - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_socket_connections: "3"} - - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_addresses: $remote_ip} + - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip} - do: search: @@ -179,7 +179,7 @@ body: transient: cluster.remote.test_remote_cluster.mode: "proxy" - cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip + cluster.remote.test_remote_cluster.proxy_address: $remote_ip - match: { status: 400 } - match: { error.root_cause.0.type: "illegal_argument_exception" } @@ -193,10 +193,10 @@ transient: cluster.remote.test_remote_cluster.mode: "proxy" cluster.remote.test_remote_cluster.seeds: null - cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip + cluster.remote.test_remote_cluster.proxy_address: $remote_ip - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"} - - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_addresses: $remote_ip} + - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip} - do: search: diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml index 10378aaeda125..0e5236f9b1171 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml @@ -70,17 +70,17 @@ cluster.remote.test_remote_cluster.seeds: null cluster.remote.test_remote_cluster.node_connections: null cluster.remote.test_remote_cluster.proxy_socket_connections: "10" - cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip + cluster.remote.test_remote_cluster.proxy_address: $remote_ip - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"} - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_socket_connections: "10"} - - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_addresses: $remote_ip} + - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip} - do: cluster.remote_info: {} - match: { test_remote_cluster.connected: true } - - match: { test_remote_cluster.addresses.0: $remote_ip } + - match: { test_remote_cluster.address: $remote_ip } - gt: { test_remote_cluster.num_sockets_connected: 0} - match: { test_remote_cluster.max_socket_connections: 10} - match: { test_remote_cluster.initial_connect_timeout: "30s" } @@ -92,7 +92,7 @@ transient: cluster.remote.test_remote_cluster.mode: null cluster.remote.test_remote_cluster.proxy_socket_connections: null - cluster.remote.test_remote_cluster.proxy_addresses: null + cluster.remote.test_remote_cluster.proxy_address: null --- "skip_unavailable is returned as part of _remote/info response": diff --git a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java index dabdd771afaed..78c9f5f28154d 100644 --- a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -37,18 +38,14 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import java.util.stream.Collectors; import java.util.stream.Stream; import static org.elasticsearch.common.settings.Setting.boolSetting; @@ -57,18 +54,16 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy { /** - * A list of addresses for remote cluster connections. The connections will be opened to the configured addresses in a round-robin - * fashion. + * The remote address for the proxy. The connections will be opened to the configured address. */ - public static final Setting.AffixSetting> REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting( + public static final Setting.AffixSetting REMOTE_CLUSTER_ADDRESSES = Setting.affixKeySetting( "cluster.remote.", - "proxy_addresses", - (ns, key) -> Setting.listSetting(key, Collections.emptyList(), s -> { - // validate address - parsePort(s); - return s; - }, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY), - Setting.Property.Dynamic, Setting.Property.NodeScope)); + "proxy_address", + (ns, key) -> Setting.simpleString(key, new StrategyValidator<>(ns, key, ConnectionStrategy.PROXY, s -> { + if (Strings.hasLength(s)) { + parsePort(s); + } + }), Setting.Property.Dynamic, Setting.Property.NodeScope)); /** * The maximum number of socket connections that will be established to a remote cluster. The default is 18. @@ -95,9 +90,9 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy { private final int maxNumConnections; private final AtomicLong counter = new AtomicLong(0); - private final List configuredAddresses; + private final String configuredAddress; private final boolean includeServerName; - private final List> addresses; + private final Supplier address; private final AtomicReference remoteClusterName = new AtomicReference<>(); private final ConnectionProfile profile; private final ConnectionManager.ConnectionValidator clusterNameValidator; @@ -114,28 +109,26 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy { } ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, - int maxNumConnections, List configuredAddresses) { - this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddresses, - configuredAddresses.stream().map(address -> - (Supplier) () -> resolveAddress(address)).collect(Collectors.toList()), false); + int maxNumConnections, String configuredAddress) { + this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress, + () -> resolveAddress(configuredAddress), false); } ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, - int maxNumConnections, List configuredAddresses, boolean includeServerName) { - this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddresses, - configuredAddresses.stream().map(address -> - (Supplier) () -> resolveAddress(address)).collect(Collectors.toList()), includeServerName); + int maxNumConnections, String configuredAddress, boolean includeServerName) { + this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddress, + () -> resolveAddress(configuredAddress), includeServerName); } ProxyConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, - int maxNumConnections, List configuredAddresses, List> addresses, + int maxNumConnections, String configuredAddress, Supplier address, boolean includeServerName) { super(clusterAlias, transportService, connectionManager); this.maxNumConnections = maxNumConnections; - this.configuredAddresses = configuredAddresses; + this.configuredAddress = configuredAddress; this.includeServerName = includeServerName; - assert addresses.isEmpty() == false : "Cannot use proxy connection strategy with no configured addresses"; - this.addresses = addresses; + assert Strings.isEmpty(configuredAddress) == false : "Cannot use proxy connection strategy with no configured addresses"; + this.address = address; // TODO: Move into the ConnectionManager this.profile = new ConnectionProfile.Builder() .addConnections(1, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) @@ -172,9 +165,9 @@ protected boolean shouldOpenMoreConnections() { @Override protected boolean strategyMustBeRebuilt(Settings newSettings) { - List addresses = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings); + String address = REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(newSettings); int numOfSockets = REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings); - return numOfSockets != maxNumConnections || addressesChanged(configuredAddresses, addresses); + return numOfSockets != maxNumConnections || configuredAddress.equals(address) == false; } @Override @@ -189,7 +182,7 @@ protected void connectImpl(ActionListener listener) { @Override public RemoteConnectionInfo.ModeInfo getModeInfo() { - return new ProxyModeInfo(configuredAddresses, maxNumConnections, connectionManager.size()); + return new ProxyModeInfo(configuredAddress, maxNumConnections, connectionManager.size()); } private void performProxyConnectionProcess(ActionListener listener) { @@ -198,7 +191,7 @@ private void performProxyConnectionProcess(ActionListener listener) { private void openConnections(ActionListener finished, int attemptNumber) { if (attemptNumber <= MAX_CONNECT_ATTEMPTS_PER_RUN) { - List resolved = addresses.stream().map(Supplier::get).collect(Collectors.toList()); + TransportAddress resolved = address.get(); int remaining = maxNumConnections - connectionManager.size(); ActionListener compositeListener = new ActionListener<>() { @@ -228,15 +221,14 @@ public void onFailure(Exception e) { for (int i = 0; i < remaining; ++i) { - TransportAddress address = nextAddress(resolved); - String id = clusterAlias + "#" + address; + String id = clusterAlias + "#" + resolved; Map attributes; if (includeServerName) { - attributes = Collections.singletonMap("server_name", address.address().getHostString()); + attributes = Collections.singletonMap("server_name", resolved.address().getHostString()); } else { attributes = Collections.emptyMap(); } - DiscoveryNode node = new DiscoveryNode(id, address, attributes, DiscoveryNodeRole.BUILT_IN_ROLES, + DiscoveryNode node = new DiscoveryNode(id, resolved, attributes, DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT.minimumCompatibilityVersion()); connectionManager.connectToNode(node, profile, clusterNameValidator, new ActionListener<>() { @@ -248,7 +240,7 @@ public void onResponse(Void v) { @Override public void onFailure(Exception e) { logger.debug(new ParameterizedMessage("failed to open remote connection [remote cluster: {}, address: {}]", - clusterAlias, address), e); + clusterAlias, resolved), e); compositeListener.onFailure(e); } }); @@ -276,40 +268,27 @@ private static TransportAddress resolveAddress(String address) { return new TransportAddress(parseConfiguredAddress(address)); } - private boolean addressesChanged(final List oldAddresses, final List newAddresses) { - if (oldAddresses.size() != newAddresses.size()) { - return true; - } - Set oldSeeds = new HashSet<>(oldAddresses); - Set newSeeds = new HashSet<>(newAddresses); - return oldSeeds.equals(newSeeds) == false; - } - static class ProxyModeInfo implements RemoteConnectionInfo.ModeInfo { - private final List addresses; + private final String address; private final int maxSocketConnections; private final int numSocketsConnected; - ProxyModeInfo(List addresses, int maxSocketConnections, int numSocketsConnected) { - this.addresses = addresses; + ProxyModeInfo(String address, int maxSocketConnections, int numSocketsConnected) { + this.address = address; this.maxSocketConnections = maxSocketConnections; this.numSocketsConnected = numSocketsConnected; } private ProxyModeInfo(StreamInput input) throws IOException { - addresses = Arrays.asList(input.readStringArray()); + address = input.readString(); maxSocketConnections = input.readVInt(); numSocketsConnected = input.readVInt(); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startArray("addresses"); - for (String address : addresses) { - builder.value(address); - } - builder.endArray(); + builder.field("address", address); builder.field("num_sockets_connected", numSocketsConnected); builder.field("max_socket_connections", maxSocketConnections); return builder; @@ -317,7 +296,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public void writeTo(StreamOutput out) throws IOException { - out.writeStringArray(addresses.toArray(new String[0])); + out.writeString(address); out.writeVInt(maxSocketConnections); out.writeVInt(numSocketsConnected); } @@ -344,12 +323,12 @@ public boolean equals(Object o) { ProxyModeInfo otherProxy = (ProxyModeInfo) o; return maxSocketConnections == otherProxy.maxSocketConnections && numSocketsConnected == otherProxy.numSocketsConnected && - Objects.equals(addresses, otherProxy.addresses); + Objects.equals(address, otherProxy.address); } @Override public int hashCode() { - return Objects.hash(addresses, maxSocketConnections, numSocketsConnected); + return Objects.hash(address, maxSocketConnections, numSocketsConnected); } } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 562a6bfbe464f..1ee8fba2bd7eb 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; @@ -159,9 +160,8 @@ public static boolean isConnectionEnabled(String clusterAlias, Settings settings List seeds = SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings); return seeds.isEmpty() == false; } else { - List addresses = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias) - .get(settings); - return addresses.isEmpty() == false; + String address = ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings); + return Strings.isEmpty(address) == false; } } diff --git a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java index 5644ff895725d..1106067055353 100644 --- a/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ProxyConnectionStrategyTests.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; @@ -32,19 +31,16 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.test.transport.StubbableTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -86,11 +82,9 @@ public MockTransportService startTransport(final String id, final Version versio } } - public void testProxyStrategyWillOpenExpectedNumberOfConnectionsToAddresses() { - try (MockTransportService transport1 = startTransport("node1", Version.CURRENT); - MockTransportService transport2 = startTransport("node2", Version.CURRENT)) { + public void testProxyStrategyWillOpenExpectedNumberOfConnectionsToAddress() { + try (MockTransportService transport1 = startTransport("node1", Version.CURRENT)) { TransportAddress address1 = transport1.boundAddress().publishAddress(); - TransportAddress address2 = transport2.boundAddress().publishAddress(); try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { localService.start(); @@ -100,16 +94,14 @@ public void testProxyStrategyWillOpenExpectedNumberOfConnectionsToAddresses() { int numOfConnections = randomIntBetween(4, 8); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, addresses(address1, address2))) { + numOfConnections, address1.toString())) { assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); - assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); PlainActionFuture connectFuture = PlainActionFuture.newFuture(); strategy.connect(connectFuture); connectFuture.actionGet(); assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); - assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); assertEquals(numOfConnections, connectionManager.size()); assertTrue(strategy.assertNoRunningConnections()); } @@ -129,9 +121,10 @@ public void testProxyStrategyWillOpenNewConnectionsOnDisconnect() throws Excepti ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); int numOfConnections = randomIntBetween(4, 8); + try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, addresses(address1, address2))) { + numOfConnections, address1.toString(), alternatingResolver(address1, address2), false)) { assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); @@ -143,7 +136,7 @@ numOfConnections, addresses(address1, address2))) { long initialConnectionsToTransport2 = connectionManager.getAllConnectedNodes().stream() .filter(n -> n.getAddress().equals(address2)) .count(); - assertNotEquals(0, initialConnectionsToTransport2); + assertEquals(0, initialConnectionsToTransport2); assertEquals(numOfConnections, connectionManager.size()); assertTrue(strategy.assertNoRunningConnections()); @@ -151,11 +144,12 @@ numOfConnections, addresses(address1, address2))) { assertBusy(() -> { assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); - // More connections now pointing to transport2 + // Connections now pointing to transport2 long finalConnectionsToTransport2 = connectionManager.getAllConnectedNodes().stream() .filter(n -> n.getAddress().equals(address2)) .count(); - assertTrue(finalConnectionsToTransport2 > initialConnectionsToTransport2); + assertNotEquals(0, finalConnectionsToTransport2); + assertEquals(numOfConnections, connectionManager.size()); assertTrue(strategy.assertNoRunningConnections()); }); } @@ -163,56 +157,6 @@ numOfConnections, addresses(address1, address2))) { } } - public void testConnectWithSingleIncompatibleNode() { - Version incompatibleVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion(); - try (MockTransportService transport1 = startTransport("compatible-node", Version.CURRENT); - MockTransportService transport2 = startTransport("incompatible-node", incompatibleVersion)) { - TransportAddress address1 = transport1.boundAddress().publishAddress(); - TransportAddress address2 = transport2.boundAddress().publishAddress(); - - try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { - localService.start(); - localService.acceptIncomingRequests(); - - StubbableTransport stubbableTransport = new StubbableTransport(localService.transport); - ConnectionManager connectionManager = new ConnectionManager(profile, stubbableTransport); - AtomicInteger address1Attempts = new AtomicInteger(0); - AtomicInteger address2Attempts = new AtomicInteger(0); - stubbableTransport.setDefaultConnectBehavior((transport, discoveryNode, profile, listener) -> { - if (discoveryNode.getAddress().equals(address1)) { - address1Attempts.incrementAndGet(); - transport.openConnection(discoveryNode, profile, listener); - } else if (discoveryNode.getAddress().equals(address2)) { - address2Attempts.incrementAndGet(); - transport.openConnection(discoveryNode, profile, listener); - } else { - throw new AssertionError("Unexpected address"); - } - }); - int numOfConnections = 5; - try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); - ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, addresses(address1, address2))) { - assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); - assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); - - PlainActionFuture connectFuture = PlainActionFuture.newFuture(); - strategy.connect(connectFuture); - connectFuture.actionGet(); - - assertEquals(4 ,connectionManager.size()); - assertEquals(4 ,connectionManager.getAllConnectedNodes().stream().map(n -> n.getAddress().equals(address1)).count()); - // Three attempts on first round, one attempts on second round, zero attempts on third round - assertEquals(4, address1Attempts.get()); - // Two attempts on first round, one attempt on second round, one attempt on third round - assertEquals(4, address2Attempts.get()); - assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); - assertTrue(strategy.assertNoRunningConnections()); - } - } - } - } - public void testConnectFailsWithIncompatibleNodes() { Version incompatibleVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion(); try (MockTransportService transport1 = startTransport("incompatible-node", incompatibleVersion)) { @@ -226,7 +170,7 @@ public void testConnectFailsWithIncompatibleNodes() { int numOfConnections = randomIntBetween(4, 8); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, addresses(address1))) { + numOfConnections, address1.toString())) { PlainActionFuture connectFuture = PlainActionFuture.newFuture(); strategy.connect(connectFuture); @@ -254,9 +198,11 @@ public void testClusterNameValidationPreventConnectingToDifferentClusters() thro ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); int numOfConnections = randomIntBetween(4, 8); + + Supplier resolver = alternatingResolver(address1, address2); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, addresses(address1, address2))) { + numOfConnections, address1.toString(), resolver, false)) { assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); @@ -264,12 +210,23 @@ numOfConnections, addresses(address1, address2))) { strategy.connect(connectFuture); connectFuture.actionGet(); - if (connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))) { - assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); - } else { - assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); - } - assertTrue(strategy.assertNoRunningConnections()); + assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + + transport1.close(); + + assertBusy(() -> { + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertTrue(strategy.assertNoRunningConnections()); + + long finalConnectionsToTransport2 = connectionManager.getAllConnectedNodes().stream() + .filter(n -> n.getAddress().equals(address2)) + .count(); + + // Connections not pointing to transport2 because the cluster name is different + assertEquals(0, finalConnectionsToTransport2); + assertEquals(0, connectionManager.size()); + }); } } } @@ -293,7 +250,7 @@ public void testProxyStrategyWillResolveAddressesEachConnect() throws Exception int numOfConnections = randomIntBetween(4, 8); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, addresses(address), Collections.singletonList(addressSupplier), false)) { + numOfConnections, address.toString(), addressSupplier, false)) { PlainActionFuture connectFuture = PlainActionFuture.newFuture(); strategy.connect(connectFuture); connectFuture.actionGet(); @@ -307,10 +264,8 @@ numOfConnections, addresses(address), Collections.singletonList(addressSupplier } public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesChange() { - try (MockTransportService transport1 = startTransport("node1", Version.CURRENT); - MockTransportService transport2 = startTransport("node2", Version.CURRENT)) { - TransportAddress address1 = transport1.boundAddress().publishAddress(); - TransportAddress address2 = transport2.boundAddress().publishAddress(); + try (MockTransportService remoteTransport = startTransport("node1", Version.CURRENT)) { + TransportAddress remoteAddress = remoteTransport.boundAddress().publishAddress(); try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { localService.start(); @@ -320,13 +275,12 @@ public void testProxyStrategyWillNeedToBeRebuiltIfNumOfSocketsOrAddressesChange( int numOfConnections = randomIntBetween(4, 8); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, addresses(address1, address2))) { + numOfConnections, remoteAddress.toString())) { PlainActionFuture connectFuture = PlainActionFuture.newFuture(); strategy.connect(connectFuture); connectFuture.actionGet(); - assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); - assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(remoteAddress))); assertEquals(numOfConnections, connectionManager.size()); assertTrue(strategy.assertNoRunningConnections()); @@ -339,18 +293,18 @@ numOfConnections, addresses(address1, address2))) { Settings noChange = Settings.builder() .put(modeSetting.getKey(), "proxy") - .put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1, address2).toArray())) + .put(addressesSetting.getKey(), remoteAddress.toString()) .put(socketConnections.getKey(), numOfConnections) .build(); assertFalse(strategy.shouldRebuildConnection(noChange)); Settings addressesChanged = Settings.builder() .put(modeSetting.getKey(), "proxy") - .put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1).toArray())) + .put(addressesSetting.getKey(), remoteAddress.toString()) .build(); assertTrue(strategy.shouldRebuildConnection(addressesChanged)); Settings socketsChanged = Settings.builder() .put(modeSetting.getKey(), "proxy") - .put(addressesSetting.getKey(), Strings.arrayToCommaDelimitedString(addresses(address1, address2).toArray())) + .put(addressesSetting.getKey(), remoteAddress.toString()) .put(socketConnections.getKey(), numOfConnections + 1) .build(); assertTrue(strategy.shouldRebuildConnection(socketsChanged)); @@ -398,14 +352,13 @@ public void testServerNameAttributes() { localService.start(); localService.acceptIncomingRequests(); - ArrayList addresses = new ArrayList<>(); - addresses.add("localhost:" + address1.getPort()); + String serverName = "localhost:" + address1.getPort(); ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); int numOfConnections = randomIntBetween(4, 8); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); ProxyConnectionStrategy strategy = new ProxyConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, addresses, true)) { + numOfConnections, serverName, true)) { assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); PlainActionFuture connectFuture = PlainActionFuture.newFuture(); @@ -422,7 +375,18 @@ public void testServerNameAttributes() { } } - private static List addresses(final TransportAddress... addresses) { - return Arrays.stream(addresses).map(TransportAddress::toString).collect(Collectors.toList()); + private Supplier alternatingResolver(TransportAddress address1, TransportAddress address2) { + // On the first connection round, the connections will be routed to transport1. On the second + //connection round, the connections will be routed to transport2 + AtomicBoolean transportSwitch = new AtomicBoolean(true); + return () -> { + if (transportSwitch.get()) { + transportSwitch.set(false); + return address1; + } else { + transportSwitch.set(true); + return address2; + } + }; } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 030174dde5f90..195e979c56dc5 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -356,8 +356,8 @@ public void testRemoteConnectionInfo() throws IOException { modeInfo1 = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 4, 4); modeInfo2 = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 4, 3); } else { - modeInfo1 = new ProxyConnectionStrategy.ProxyModeInfo(remoteAddresses, 18, 18); - modeInfo2 = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 18, 17); + modeInfo1 = new ProxyConnectionStrategy.ProxyModeInfo(remoteAddresses.get(0), 18, 18); + modeInfo2 = new ProxyConnectionStrategy.ProxyModeInfo(remoteAddresses.get(0), 18, 17); } RemoteConnectionInfo stats = @@ -404,7 +404,7 @@ public void testRenderConnectionInfoXContent() throws IOException { if (sniff) { modeInfo = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 3, 2); } else { - modeInfo = new ProxyConnectionStrategy.ProxyModeInfo(remoteAddresses, 18, 16); + modeInfo = new ProxyConnectionStrategy.ProxyModeInfo(remoteAddresses.get(0), 18, 16); } RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster", modeInfo, TimeValue.timeValueMinutes(30), true); @@ -419,7 +419,7 @@ public void testRenderConnectionInfoXContent() throws IOException { "\"num_nodes_connected\":2,\"max_connections_per_cluster\":3,\"initial_connect_timeout\":\"30m\"," + "\"skip_unavailable\":true}}", Strings.toString(builder)); } else { - assertEquals("{\"test_cluster\":{\"connected\":true,\"mode\":\"proxy\",\"addresses\":[\"seed:1\",\"seed:2\"]," + + assertEquals("{\"test_cluster\":{\"connected\":true,\"mode\":\"proxy\",\"address\":\"seed:1\"," + "\"num_sockets_connected\":16,\"max_socket_connections\":18,\"initial_connect_timeout\":\"30m\"," + "\"skip_unavailable\":true}}", Strings.toString(builder)); } @@ -620,7 +620,7 @@ private Settings buildRandomSettings(String clusterAlias, List addresses private static Settings buildProxySettings(String clusterAlias, List addresses) { Settings.Builder builder = Settings.builder(); builder.put(ProxyConnectionStrategy.REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).getKey(), - Strings.collectionToCommaDelimitedString(addresses)); + addresses.get(0)); builder.put(RemoteConnectionStrategy.REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(clusterAlias).getKey(), "proxy"); return builder.build(); } diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml index 5606a08cd261e..05185cb3e3328 100644 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/70_connection_mode_configuration.yml @@ -14,7 +14,7 @@ transient: cluster.remote.test_remote_cluster.mode: "proxy" cluster.remote.test_remote_cluster.node_connections: "5" - cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip + cluster.remote.test_remote_cluster.proxy_address: $remote_ip - match: { status: 400 } - match: { error.root_cause.0.type: "illegal_argument_exception" } @@ -29,7 +29,7 @@ transient: cluster.remote.test_remote_cluster.mode: "proxy" cluster.remote.test_remote_cluster.seeds: $remote_ip - cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip + cluster.remote.test_remote_cluster.proxy_address: $remote_ip - match: { status: 400 } - match: { error.root_cause.0.type: "illegal_argument_exception" } @@ -64,12 +64,12 @@ flat_settings: true body: transient: - cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip + cluster.remote.test_remote_cluster.proxy_address: $remote_ip cluster.remote.test_remote_cluster.seeds: $remote_ip - match: { status: 400 } - match: { error.root_cause.0.type: "illegal_argument_exception" } - - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.proxy_addresses\" cannot be + - match: { error.root_cause.0.reason: "Setting \"cluster.remote.test_remote_cluster.proxy_address\" cannot be used with the configured \"cluster.remote.test_remote_cluster.mode\" [required=PROXY, configured=SNIFF]" } --- @@ -87,11 +87,11 @@ transient: cluster.remote.test_remote_cluster.mode: "proxy" cluster.remote.test_remote_cluster.proxy_socket_connections: "3" - cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip + cluster.remote.test_remote_cluster.proxy_address: $remote_ip - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"} - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_socket_connections: "3"} - - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_addresses: $remote_ip} + - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip} - do: search: @@ -179,7 +179,7 @@ body: transient: cluster.remote.test_remote_cluster.mode: "proxy" - cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip + cluster.remote.test_remote_cluster.proxy_address: $remote_ip - match: { status: 400 } - match: { error.root_cause.0.type: "illegal_argument_exception" } @@ -193,10 +193,10 @@ transient: cluster.remote.test_remote_cluster.mode: "proxy" cluster.remote.test_remote_cluster.seeds: null - cluster.remote.test_remote_cluster.proxy_addresses: $remote_ip + cluster.remote.test_remote_cluster.proxy_address: $remote_ip - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "proxy"} - - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_addresses: $remote_ip} + - match: {transient.cluster\.remote\.test_remote_cluster\.proxy_address: $remote_ip} - do: search: