diff --git a/docs/reference/ccr/getting-started.asciidoc b/docs/reference/ccr/getting-started.asciidoc index 41d013f62f3a7..0793051646cfe 100644 --- a/docs/reference/ccr/getting-started.asciidoc +++ b/docs/reference/ccr/getting-started.asciidoc @@ -135,7 +135,8 @@ remote cluster. "num_nodes_connected" : 1, <2> "max_connections_per_cluster" : 3, "initial_connect_timeout" : "30s", - "skip_unavailable" : false + "skip_unavailable" : false, + "mode" : "sniff" } } -------------------------------------------------- @@ -146,7 +147,7 @@ remote cluster. alias `leader` <2> This shows the number of nodes in the remote cluster the local cluster is connected to. - + Alternatively, you can manage remote clusters on the *Management / Elasticsearch / Remote Clusters* page in {kib}: diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_connection_mode_configuration.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/15_connection_mode_configuration.yml similarity index 100% rename from qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_connection_mode_configuration.yml rename to qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/15_connection_mode_configuration.yml diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml index 59657e2012c8a..761526a7bea60 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml @@ -1,5 +1,5 @@ --- -"Fetch remote cluster info for existing cluster": +"Fetch remote cluster sniff info for existing cluster": - do: cluster.remote_info: {} @@ -7,6 +7,7 @@ - match: { my_remote_cluster.num_nodes_connected: 1} - match: { my_remote_cluster.max_connections_per_cluster: 1} - match: { my_remote_cluster.initial_connect_timeout: "30s" } + - match: { my_remote_cluster.mode: "sniff" } --- "Add transient remote cluster based on the preset cluster and check remote info": @@ -21,9 +22,13 @@ flat_settings: true body: transient: - cluster.remote.test_remote_cluster.seeds: $remote_ip + cluster.remote.test_remote_cluster.mode: "sniff" + cluster.remote.test_remote_cluster.sniff.node_connections: "2" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip - - match: {transient: {cluster.remote.test_remote_cluster.seeds: $remote_ip}} + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.node_connections: "2"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip} # we do another search here since this will enforce the connection to be established # otherwise the cluster might not have been connected yet. @@ -45,19 +50,49 @@ - match: { my_remote_cluster.seeds.0: $remote_ip } - match: { my_remote_cluster.num_nodes_connected: 1} - - match: { test_remote_cluster.num_nodes_connected: 1} + - gt: { test_remote_cluster.num_nodes_connected: 0} - match: { my_remote_cluster.max_connections_per_cluster: 1} - - match: { test_remote_cluster.max_connections_per_cluster: 1} + - match: { test_remote_cluster.max_connections_per_cluster: 2} - match: { my_remote_cluster.initial_connect_timeout: "30s" } - match: { test_remote_cluster.initial_connect_timeout: "30s" } + - match: { my_remote_cluster.mode: "sniff" } + - match: { test_remote_cluster.mode: "sniff" } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.seeds: null + cluster.remote.test_remote_cluster.sniff.node_connections: null + cluster.remote.test_remote_cluster.simple.socket_connections: "10" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "10"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + cluster.remote_info: {} + + - match: { test_remote_cluster.connected: true } + - match: { test_remote_cluster.addresses.0: $remote_ip } + - gt: { test_remote_cluster.num_sockets_connected: 0} + - match: { test_remote_cluster.max_socket_connections: 10} + - match: { test_remote_cluster.initial_connect_timeout: "30s" } + - match: { test_remote_cluster.mode: "simple" } + - do: cluster.put_settings: body: transient: - cluster.remote.test_remote_cluster.seeds: null + cluster.remote.test_remote_cluster.mode: null + cluster.remote.test_remote_cluster.simple.socket_connections: null + cluster.remote.test_remote_cluster.simple.addresses: null --- "skip_unavailable is returned as part of _remote/info response": diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 8b89e8f8f8905..ae697bba95b49 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -34,7 +34,6 @@ import java.io.Closeable; import java.io.IOException; -import java.util.Collections; import java.util.function.Function; /** @@ -206,24 +205,7 @@ boolean isNodeConnected(final DiscoveryNode node) { * Get the information about remote nodes to be rendered on {@code _remote/info} requests. */ public RemoteConnectionInfo getConnectionInfo() { - if (connectionStrategy instanceof SniffConnectionStrategy) { - SniffConnectionStrategy sniffStrategy = (SniffConnectionStrategy) this.connectionStrategy; - return new RemoteConnectionInfo( - clusterAlias, - sniffStrategy.getSeedNodes(), - sniffStrategy.getMaxConnections(), - getNumNodesConnected(), - initialConnectionTimeout, - skipUnavailable); - } else { - return new RemoteConnectionInfo( - clusterAlias, - Collections.emptyList(), - 0, - getNumNodesConnected(), - initialConnectionTimeout, - skipUnavailable); - } + return new RemoteConnectionInfo(clusterAlias, connectionStrategy.getModeInfo(), initialConnectionTimeout, skipUnavailable); } int getNumNodesConnected() { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java index 5bdc6f9874330..e721a0b617fd1 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -36,63 +37,67 @@ * {@code _remote/info} requests. */ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable { - final List seedNodes; - final int connectionsPerCluster; + + final ModeInfo modeInfo; final TimeValue initialConnectionTimeout; - final int numNodesConnected; final String clusterAlias; final boolean skipUnavailable; - RemoteConnectionInfo(String clusterAlias, List seedNodes, - int connectionsPerCluster, int numNodesConnected, - TimeValue initialConnectionTimeout, boolean skipUnavailable) { + RemoteConnectionInfo(String clusterAlias, ModeInfo modeInfo, TimeValue initialConnectionTimeout, boolean skipUnavailable) { this.clusterAlias = clusterAlias; - this.seedNodes = seedNodes; - this.connectionsPerCluster = connectionsPerCluster; - this.numNodesConnected = numNodesConnected; + this.modeInfo = modeInfo; this.initialConnectionTimeout = initialConnectionTimeout; this.skipUnavailable = skipUnavailable; } public RemoteConnectionInfo(StreamInput input) throws IOException { - seedNodes = Arrays.asList(input.readStringArray()); - connectionsPerCluster = input.readVInt(); - initialConnectionTimeout = input.readTimeValue(); - numNodesConnected = input.readVInt(); - clusterAlias = input.readString(); - skipUnavailable = input.readBoolean(); - } - - public List getSeedNodes() { - return seedNodes; - } - - public int getConnectionsPerCluster() { - return connectionsPerCluster; - } - - public TimeValue getInitialConnectionTimeout() { - return initialConnectionTimeout; + // TODO: Change to 7.6 after backport + if (input.getVersion().onOrAfter(Version.V_8_0_0)) { + RemoteConnectionStrategy.ConnectionStrategy mode = input.readEnum(RemoteConnectionStrategy.ConnectionStrategy.class); + modeInfo = mode.getReader().read(input); + initialConnectionTimeout = input.readTimeValue(); + clusterAlias = input.readString(); + skipUnavailable = input.readBoolean(); + } else { + List seedNodes = Arrays.asList(input.readStringArray()); + int connectionsPerCluster = input.readVInt(); + initialConnectionTimeout = input.readTimeValue(); + int numNodesConnected = input.readVInt(); + clusterAlias = input.readString(); + skipUnavailable = input.readBoolean(); + modeInfo = new SniffConnectionStrategy.SniffModeInfo(seedNodes, connectionsPerCluster, numNodesConnected); + } } - public int getNumNodesConnected() { - return numNodesConnected; + public boolean isConnected() { + return modeInfo.isConnected(); } public String getClusterAlias() { return clusterAlias; } - public boolean isSkipUnavailable() { - return skipUnavailable; - } - @Override public void writeTo(StreamOutput out) throws IOException { - out.writeStringArray(seedNodes.toArray(new String[0])); - out.writeVInt(connectionsPerCluster); - out.writeTimeValue(initialConnectionTimeout); - out.writeVInt(numNodesConnected); + // TODO: Change to 7.6 after backport + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeEnum(modeInfo.modeType()); + modeInfo.writeTo(out); + out.writeTimeValue(initialConnectionTimeout); + } else { + if (modeInfo.modeType() == RemoteConnectionStrategy.ConnectionStrategy.SNIFF) { + SniffConnectionStrategy.SniffModeInfo sniffInfo = (SniffConnectionStrategy.SniffModeInfo) this.modeInfo; + out.writeStringArray(sniffInfo.seedNodes.toArray(new String[0])); + out.writeVInt(sniffInfo.maxConnectionsPerCluster); + out.writeTimeValue(initialConnectionTimeout); + out.writeVInt(sniffInfo.numNodesConnected); + } else { + out.writeStringArray(new String[0]); + out.writeVInt(0); + out.writeTimeValue(initialConnectionTimeout); + out.writeVInt(0); + } + } out.writeString(clusterAlias); out.writeBoolean(skipUnavailable); } @@ -101,14 +106,9 @@ public void writeTo(StreamOutput out) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(clusterAlias); { - builder.startArray("seeds"); - for (String addr : seedNodes) { - builder.value(addr); - } - builder.endArray(); - builder.field("connected", numNodesConnected > 0); - builder.field("num_nodes_connected", numNodesConnected); - builder.field("max_connections_per_cluster", connectionsPerCluster); + builder.field("connected", modeInfo.isConnected()); + builder.field("mode", modeInfo.modeName()); + modeInfo.toXContent(builder, params); builder.field("initial_connect_timeout", initialConnectionTimeout); builder.field("skip_unavailable", skipUnavailable); } @@ -121,18 +121,23 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; RemoteConnectionInfo that = (RemoteConnectionInfo) o; - return connectionsPerCluster == that.connectionsPerCluster && - numNodesConnected == that.numNodesConnected && - Objects.equals(seedNodes, that.seedNodes) && + return skipUnavailable == that.skipUnavailable && + Objects.equals(modeInfo, that.modeInfo) && Objects.equals(initialConnectionTimeout, that.initialConnectionTimeout) && - Objects.equals(clusterAlias, that.clusterAlias) && - skipUnavailable == that.skipUnavailable; + Objects.equals(clusterAlias, that.clusterAlias); } @Override public int hashCode() { - return Objects.hash(seedNodes, connectionsPerCluster, initialConnectionTimeout, - numNodesConnected, clusterAlias, skipUnavailable); + return Objects.hash(modeInfo, initialConnectionTimeout, clusterAlias, skipUnavailable); } + public interface ModeInfo extends ToXContentFragment, Writeable { + + boolean isConnected(); + + String modeName(); + + RemoteConnectionStrategy.ConnectionStrategy modeType(); + } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index d8a459a79a56e..3d994f35de224 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -57,20 +58,39 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionListener, Closeable { enum ConnectionStrategy { - SNIFF(SniffConnectionStrategy.CHANNELS_PER_CONNECTION, SniffConnectionStrategy::enablementSettings), - SIMPLE(SimpleConnectionStrategy.CHANNELS_PER_CONNECTION, SimpleConnectionStrategy::enablementSettings); + SNIFF(SniffConnectionStrategy.CHANNELS_PER_CONNECTION, SniffConnectionStrategy::enablementSettings, + SniffConnectionStrategy::infoReader) { + @Override + public String toString() { + return "sniff"; + } + }, + SIMPLE(SimpleConnectionStrategy.CHANNELS_PER_CONNECTION, SimpleConnectionStrategy::enablementSettings, + SimpleConnectionStrategy::infoReader) { + @Override + public String toString() { + return "simple"; + } + }; private final int numberOfChannels; private final Supplier>> enablementSettings; + private final Supplier> reader; - ConnectionStrategy(int numberOfChannels, Supplier>> enablementSettings) { + ConnectionStrategy(int numberOfChannels, Supplier>> enablementSettings, + Supplier> reader) { this.numberOfChannels = numberOfChannels; this.enablementSettings = enablementSettings; + this.reader = reader; } public int getNumberOfChannels() { return numberOfChannels; } + + public Writeable.Reader getReader() { + return reader.get(); + } } public static final Setting.AffixSetting REMOTE_CONNECTION_MODE = Setting.affixKeySetting( @@ -310,6 +330,8 @@ boolean assertNoRunningConnections() { protected abstract void connectImpl(ActionListener listener); + protected abstract RemoteConnectionInfo.ModeInfo getModeInfo(); + private List> getAndClearListeners() { final List> result; synchronized (mutex) { diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index 890cdaf25387b..0250ff73e4e8c 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -26,14 +26,21 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.common.xcontent.XContentBuilder; +import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -131,6 +138,10 @@ static Stream> enablementSettings() { return Stream.of(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES); } + static Writeable.Reader infoReader() { + return SimpleModeInfo::new; + } + @Override protected boolean shouldOpenMoreConnections() { return connectionManager.size() < maxNumConnections; @@ -153,6 +164,11 @@ protected void connectImpl(ActionListener listener) { performSimpleConnectionProcess(listener); } + @Override + public RemoteConnectionInfo.ModeInfo getModeInfo() { + return new SimpleModeInfo(configuredAddresses, maxNumConnections, connectionManager.size()); + } + private void performSimpleConnectionProcess(ActionListener listener) { openConnections(listener, 1); } @@ -238,4 +254,72 @@ private boolean addressesChanged(final List oldAddresses, final List newSeeds = new HashSet<>(newAddresses); return oldSeeds.equals(newSeeds) == false; } + + static class SimpleModeInfo implements RemoteConnectionInfo.ModeInfo { + + private final List addresses; + private final int maxSocketConnections; + private final int numSocketsConnected; + + SimpleModeInfo(List addresses, int maxSocketConnections, int numSocketsConnected) { + this.addresses = addresses; + this.maxSocketConnections = maxSocketConnections; + this.numSocketsConnected = numSocketsConnected; + } + + private SimpleModeInfo(StreamInput input) throws IOException { + addresses = Arrays.asList(input.readStringArray()); + maxSocketConnections = input.readVInt(); + numSocketsConnected = input.readVInt(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray("addresses"); + for (String address : addresses) { + builder.value(address); + } + builder.endArray(); + builder.field("num_sockets_connected", numSocketsConnected); + builder.field("max_socket_connections", maxSocketConnections); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeStringArray(addresses.toArray(new String[0])); + out.writeVInt(maxSocketConnections); + out.writeVInt(numSocketsConnected); + } + + @Override + public boolean isConnected() { + return numSocketsConnected > 0; + } + + @Override + public String modeName() { + return "simple"; + } + + @Override + public RemoteConnectionStrategy.ConnectionStrategy modeType() { + return RemoteConnectionStrategy.ConnectionStrategy.SIMPLE; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SimpleModeInfo simple = (SimpleModeInfo) o; + return maxSocketConnections == simple.maxSocketConnections && + numSocketsConnected == simple.numSocketsConnected && + Objects.equals(addresses, simple.addresses); + } + + @Override + public int hashCode() { + return Objects.hash(addresses, maxSocketConnections, numSocketsConnected); + } + } } diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index cfed1d01c47e3..ad1dc6696b57c 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -36,15 +36,19 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -193,6 +197,10 @@ static Stream> enablementSettings() { return Stream.of(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD); } + static Writeable.Reader infoReader() { + return SniffModeInfo::new; + } + @Override protected boolean shouldOpenMoreConnections() { return connectionManager.size() < maxNumRemoteConnections; @@ -217,6 +225,11 @@ protected void connectImpl(ActionListener listener) { collectRemoteNodes(seedNodes.iterator(), listener); } + @Override + protected RemoteConnectionInfo.ModeInfo getModeInfo() { + return new SniffModeInfo(configuredSeedNodes, maxNumRemoteConnections, connectionManager.size()); + } + private void collectRemoteNodes(Iterator> seedNodes, ActionListener listener) { if (Thread.currentThread().isInterrupted()) { listener.onFailure(new InterruptedException("remote connect thread got interrupted")); @@ -469,4 +482,72 @@ private boolean proxyChanged(String oldProxy, String newProxy) { return Objects.equals(oldProxy, newProxy) == false; } + + static class SniffModeInfo implements RemoteConnectionInfo.ModeInfo { + + final List seedNodes; + final int maxConnectionsPerCluster; + final int numNodesConnected; + + SniffModeInfo(List seedNodes, int maxConnectionsPerCluster, int numNodesConnected) { + this.seedNodes = seedNodes; + this.maxConnectionsPerCluster = maxConnectionsPerCluster; + this.numNodesConnected = numNodesConnected; + } + + private SniffModeInfo(StreamInput input) throws IOException { + seedNodes = Arrays.asList(input.readStringArray()); + maxConnectionsPerCluster = input.readVInt(); + numNodesConnected = input.readVInt(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray("seeds"); + for (String address : seedNodes) { + builder.value(address); + } + builder.endArray(); + builder.field("num_nodes_connected", numNodesConnected); + builder.field("max_connections_per_cluster", maxConnectionsPerCluster); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeStringArray(seedNodes.toArray(new String[0])); + out.writeVInt(maxConnectionsPerCluster); + out.writeVInt(numNodesConnected); + } + + @Override + public boolean isConnected() { + return numNodesConnected > 0; + } + + @Override + public String modeName() { + return "sniff"; + } + + @Override + public RemoteConnectionStrategy.ConnectionStrategy modeType() { + return RemoteConnectionStrategy.ConnectionStrategy.SNIFF; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SniffModeInfo sniff = (SniffModeInfo) o; + return maxConnectionsPerCluster == sniff.maxConnectionsPerCluster && + numNodesConnected == sniff.numNodesConnected && + Objects.equals(seedNodes, sniff.seedNodes); + } + + @Override + public int hashCode() { + return Objects.hash(seedNodes, maxConnectionsPerCluster, numNodesConnected); + } + } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index d74a8daa98d61..934cf81a503ea 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -324,7 +324,8 @@ public void testGetConnectionInfo() throws Exception { List seedNodes = addresses(node3, node1, node2); Collections.shuffle(seedNodes, random()); - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, + threadPool, null)) { service.start(); service.acceptIncomingRequests(); int maxNumConnections = randomIntBetween(1, 5); @@ -335,9 +336,10 @@ public void testGetConnectionInfo() throws Exception { // test no nodes connected RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); assertNotNull(remoteConnectionInfo); - assertEquals(0, remoteConnectionInfo.numNodesConnected); - assertEquals(3, remoteConnectionInfo.seedNodes.size()); - assertEquals(maxNumConnections, remoteConnectionInfo.connectionsPerCluster); + SniffConnectionStrategy.SniffModeInfo sniffInfo = (SniffConnectionStrategy.SniffModeInfo) remoteConnectionInfo.modeInfo; + assertEquals(0, sniffInfo.numNodesConnected); + assertEquals(3, sniffInfo.seedNodes.size()); + assertEquals(maxNumConnections, sniffInfo.maxConnectionsPerCluster); assertEquals(clusterAlias, remoteConnectionInfo.clusterAlias); } } @@ -345,32 +347,37 @@ public void testGetConnectionInfo() throws Exception { } public void testRemoteConnectionInfo() throws IOException { + List remoteAddresses = Collections.singletonList("seed:1"); + + RemoteConnectionInfo.ModeInfo modeInfo1; + RemoteConnectionInfo.ModeInfo modeInfo2; + + if (randomBoolean()) { + modeInfo1 = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 4, 4); + modeInfo2 = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 4, 3); + } else { + modeInfo1 = new SimpleConnectionStrategy.SimpleModeInfo(remoteAddresses, 18, 18); + modeInfo2 = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 18, 17); + } + RemoteConnectionInfo stats = - new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), false); + new RemoteConnectionInfo("test_cluster", modeInfo1, TimeValue.timeValueMinutes(30), false); assertSerialization(stats); RemoteConnectionInfo stats1 = - new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 4, TimeValue.timeValueMinutes(30), true); - assertSerialization(stats1); - assertNotEquals(stats, stats1); - - stats1 = new RemoteConnectionInfo("test_cluster_1", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), false); - assertSerialization(stats1); - assertNotEquals(stats, stats1); - - stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:15"), 4, 3, TimeValue.timeValueMinutes(30), false); + new RemoteConnectionInfo("test_cluster", modeInfo1, TimeValue.timeValueMinutes(30), true); assertSerialization(stats1); assertNotEquals(stats, stats1); - stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), true); + stats1 = new RemoteConnectionInfo("test_cluster_1", modeInfo1, TimeValue.timeValueMinutes(30), false); assertSerialization(stats1); assertNotEquals(stats, stats1); - stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(325), true); + stats1 = new RemoteConnectionInfo("test_cluster", modeInfo1, TimeValue.timeValueMinutes(325), false); assertSerialization(stats1); assertNotEquals(stats, stats1); - stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 5, 3, TimeValue.timeValueMinutes(30), false); + stats1 = new RemoteConnectionInfo("test_cluster", modeInfo2, TimeValue.timeValueMinutes(30), false); assertSerialization(stats1); assertNotEquals(stats, stats1); } @@ -389,27 +396,33 @@ private static RemoteConnectionInfo assertSerialization(RemoteConnectionInfo inf } public void testRenderConnectionInfoXContent() throws IOException { - RemoteConnectionInfo stats = - new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), true); + List remoteAddresses = Arrays.asList("seed:1", "seed:2"); + + RemoteConnectionInfo.ModeInfo modeInfo; + + boolean sniff = randomBoolean(); + if (sniff) { + modeInfo = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 3, 2); + } else { + modeInfo = new SimpleConnectionStrategy.SimpleModeInfo(remoteAddresses, 18, 16); + } + + RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster", modeInfo, TimeValue.timeValueMinutes(30), true); stats = assertSerialization(stats); XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); stats.toXContent(builder, null); builder.endObject(); - assertEquals("{\"test_cluster\":{\"seeds\":[\"seed:1\"],\"connected\":true," + - "\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"," + - "\"skip_unavailable\":true}}", Strings.toString(builder)); - stats = new RemoteConnectionInfo( - "some_other_cluster", Arrays.asList("seed:1", "seed:2"), 2, 0, TimeValue.timeValueSeconds(30), false); - stats = assertSerialization(stats); - builder = XContentFactory.jsonBuilder(); - builder.startObject(); - stats.toXContent(builder, null); - builder.endObject(); - assertEquals("{\"some_other_cluster\":{\"seeds\":[\"seed:1\",\"seed:2\"]," - + "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"," + - "\"skip_unavailable\":false}}", Strings.toString(builder)); + if (sniff) { + assertEquals("{\"test_cluster\":{\"connected\":true,\"mode\":\"sniff\",\"seeds\":[\"seed:1\",\"seed:2\"]," + + "\"num_nodes_connected\":2,\"max_connections_per_cluster\":3,\"initial_connect_timeout\":\"30m\"," + + "\"skip_unavailable\":true}}", Strings.toString(builder)); + } else { + assertEquals("{\"test_cluster\":{\"connected\":true,\"mode\":\"simple\",\"addresses\":[\"seed:1\",\"seed:2\"]," + + "\"num_sockets_connected\":16,\"max_socket_connections\":18,\"initial_connect_timeout\":\"30m\"," + + "\"skip_unavailable\":true}}", Strings.toString(builder)); + } } public void testCollectNodes() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 5ea54c7356b94..2c6fc691ec5bd 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -111,5 +111,10 @@ protected boolean shouldOpenMoreConnections() { protected void connectImpl(ActionListener listener) { } + + @Override + protected RemoteConnectionInfo.ModeInfo getModeInfo() { + return null; + } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index 3a07ef9aa8852..cbfd078452368 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -69,7 +69,7 @@ public void setupLocalRemote() throws Exception { List infos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos(); assertThat(infos.size(), equalTo(1)); - assertThat(infos.get(0).getNumNodesConnected(), equalTo(1)); + assertTrue(infos.get(0).isConnected()); } @Before diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java index ed3d1abb35770..88370aed0db71 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java @@ -23,7 +23,6 @@ import static java.util.Collections.singletonMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class RestartIndexFollowingIT extends CcrIntegTestCase { @@ -96,7 +95,7 @@ private void setupRemoteCluster() throws Exception { List infos = followerClient().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos(); assertThat(infos.size(), equalTo(1)); - assertThat(infos.get(0).getNumNodesConnected(), greaterThanOrEqualTo(1)); + assertTrue(infos.get(0).isConnected()); } private void cleanRemoteCluster() throws Exception {