diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index ef40f05a2f261..694c1e2bddf64 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -295,6 +295,7 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteConnectionStrategy.REMOTE_CONNECTION_MODE, SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, + SimpleConnectionStrategy.INCLUDE_SERVER_NAME, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java index bbfe56ffcda97..05329882b7428 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -1257,6 +1257,10 @@ public static Setting boolSetting(String key, Setting fallback return new Setting<>(key, fallbackSetting, b -> parseBoolean(b, key, isFiltered(properties)), properties); } + public static Setting boolSetting(String key, boolean defaultValue, Validator validator, Property... properties) { + return new Setting<>(key, Boolean.toString(defaultValue), b -> parseBoolean(b, key, isFiltered(properties)), validator, properties); + } + public static Setting boolSetting(String key, Function defaultValueFn, Property... properties) { return new Setting<>(key, defaultValueFn, b -> parseBoolean(b, key, isFiltered(properties)), properties); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index be1ca9a1a2c43..63a9b857f5f7d 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -111,7 +111,8 @@ public void listenForUpdates(ClusterSettings clusterSettings) { SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD, SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS, SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES, - SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS); + SimpleConnectionStrategy.REMOTE_SOCKET_CONNECTIONS, + SimpleConnectionStrategy.INCLUDE_SERVER_NAME); clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::validateAndUpdateRemoteCluster); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 3d994f35de224..addd68b1c535e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -170,7 +170,7 @@ private static Stream getClusterAlias(Settings settings, Setting.Aff return allConcreteSettings.map(affixSetting::getNamespace); } - static InetSocketAddress parseSeedAddress(String remoteHost) { + static InetSocketAddress parseConfiguredAddress(String remoteHost) { final Tuple hostPort = parseHostPort(remoteHost); final String host = hostPort.v1(); assert hostPort.v2() != null : remoteHost; diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index 0250ff73e4e8c..9393e4d0cafc0 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -40,6 +41,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -49,6 +51,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.elasticsearch.common.settings.Setting.boolSetting; import static org.elasticsearch.common.settings.Setting.intSetting; public class SimpleConnectionStrategy extends RemoteConnectionStrategy { @@ -76,6 +79,15 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { (ns, key) -> intSetting(key, 18, 1, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE), Setting.Property.Dynamic, Setting.Property.NodeScope)); + /** + * Whether to include the hostname as a server_name attribute + */ + public static final Setting.AffixSetting INCLUDE_SERVER_NAME = Setting.affixKeySetting( + "cluster.remote.", + "simple.include_server_name", + (ns, key) -> boolSetting(key, false, new StrategyValidator<>(ns, key, ConnectionStrategy.SIMPLE), + Setting.Property.Dynamic, Setting.Property.NodeScope)); + static final int CHANNELS_PER_CONNECTION = 1; private static final int MAX_CONNECT_ATTEMPTS_PER_RUN = 3; @@ -84,6 +96,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { private final int maxNumConnections; private final AtomicLong counter = new AtomicLong(0); private final List configuredAddresses; + private final boolean includeServerName; private final List> addresses; private final AtomicReference remoteClusterName = new AtomicReference<>(); private final ConnectionProfile profile; @@ -96,21 +109,31 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { transportService, connectionManager, REMOTE_SOCKET_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings), - REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings)); + REMOTE_CLUSTER_ADDRESSES.getConcreteSettingForNamespace(clusterAlias).get(settings), + INCLUDE_SERVER_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings)); } SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, int maxNumConnections, List configuredAddresses) { this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddresses, configuredAddresses.stream().map(address -> - (Supplier) () -> resolveAddress(address)).collect(Collectors.toList())); + (Supplier) () -> resolveAddress(address)).collect(Collectors.toList()), false); } SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, - int maxNumConnections, List configuredAddresses, List> addresses) { + int maxNumConnections, List configuredAddresses, boolean includeServerName) { + this(clusterAlias, transportService, connectionManager, maxNumConnections, configuredAddresses, + configuredAddresses.stream().map(address -> + (Supplier) () -> resolveAddress(address)).collect(Collectors.toList()), includeServerName); + } + + SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, + int maxNumConnections, List configuredAddresses, List> addresses, + boolean includeServerName) { super(clusterAlias, transportService, connectionManager); this.maxNumConnections = maxNumConnections; this.configuredAddresses = configuredAddresses; + this.includeServerName = includeServerName; assert addresses.isEmpty() == false : "Cannot use simple connection strategy with no configured addresses"; this.addresses = addresses; // TODO: Move into the ConnectionManager @@ -207,7 +230,14 @@ public void onFailure(Exception e) { for (int i = 0; i < remaining; ++i) { TransportAddress address = nextAddress(resolved); String id = clusterAlias + "#" + address; - DiscoveryNode node = new DiscoveryNode(id, address, Version.CURRENT.minimumCompatibilityVersion()); + Map attributes; + if (includeServerName) { + attributes = Collections.singletonMap("server_name", address.address().getHostString()); + } else { + attributes = Collections.emptyMap(); + } + DiscoveryNode node = new DiscoveryNode(id, address, attributes, DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT.minimumCompatibilityVersion()); connectionManager.connectToNode(node, profile, clusterNameValidator, new ActionListener<>() { @Override @@ -243,7 +273,7 @@ private TransportAddress nextAddress(List resolvedAddresses) { } private static TransportAddress resolveAddress(String address) { - return new TransportAddress(parseSeedAddress(address)); + return new TransportAddress(parseConfiguredAddress(address)); } private boolean addressesChanged(final List oldAddresses, final List newAddresses) { diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index ad1dc6696b57c..bb7d6202c59b7 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -425,11 +425,11 @@ public String toString() { private static DiscoveryNode resolveSeedNode(String clusterAlias, String address, String proxyAddress) { if (proxyAddress == null || proxyAddress.isEmpty()) { - TransportAddress transportAddress = new TransportAddress(parseSeedAddress(address)); + TransportAddress transportAddress = new TransportAddress(parseConfiguredAddress(address)); return new DiscoveryNode(clusterAlias + "#" + transportAddress.toString(), transportAddress, Version.CURRENT.minimumCompatibilityVersion()); } else { - TransportAddress transportAddress = new TransportAddress(parseSeedAddress(proxyAddress)); + TransportAddress transportAddress = new TransportAddress(parseConfiguredAddress(proxyAddress)); String hostName = address.substring(0, indexOfPortSeparator(address)); return new DiscoveryNode("", clusterAlias + "#" + address, UUIDs.randomBase64UUID(), hostName, address, transportAddress, Collections.singletonMap("server_name", hostName), DiscoveryNodeRole.BUILT_IN_ROLES, @@ -460,7 +460,7 @@ private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, Discovery return node; } else { // resolve proxy address lazy here - InetSocketAddress proxyInetAddress = parseSeedAddress(proxyAddress); + InetSocketAddress proxyInetAddress = parseConfiguredAddress(proxyAddress); return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node .getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion()); } diff --git a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java index 4144cc856bd3a..53fe4a04d379c 100644 --- a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.AbstractScopedSettings; @@ -35,6 +36,7 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -291,7 +293,7 @@ public void testSimpleStrategyWillResolveAddressesEachConnect() throws Exception int numOfConnections = randomIntBetween(4, 8); try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager, - numOfConnections, addresses(address), Collections.singletonList(addressSupplier))) { + numOfConnections, addresses(address), Collections.singletonList(addressSupplier), false)) { PlainActionFuture connectFuture = PlainActionFuture.newFuture(); strategy.connect(connectFuture); connectFuture.actionGet(); @@ -387,6 +389,39 @@ public void testModeSettingsCannotBeUsedWhenInDifferentMode() { } } + public void testServerNameAttributes() { + Settings bindSettings = Settings.builder().put(TransportSettings.BIND_HOST.getKey(), "localhost").build(); + try (MockTransportService transport1 = startTransport("node1", Version.CURRENT, bindSettings)) { + TransportAddress address1 = transport1.boundAddress().publishAddress(); + + try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { + localService.start(); + localService.acceptIncomingRequests(); + + ArrayList addresses = new ArrayList<>(); + addresses.add("localhost:" + address1.getPort()); + + ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + int numOfConnections = randomIntBetween(4, 8); + try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager, + numOfConnections, addresses, true)) { + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + connectFuture.actionGet(); + + assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertTrue(strategy.assertNoRunningConnections()); + + DiscoveryNode discoveryNode = connectionManager.getAllConnectedNodes().stream().findFirst().get(); + assertEquals("localhost", discoveryNode.getAttributes().get("server_name")); + } + } + } + } + private static List addresses(final TransportAddress... addresses) { return Arrays.stream(addresses).map(TransportAddress::toString).collect(Collectors.toList()); }