From a2e2be300e3b1862872c5a5759f6f8e7357b27a2 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 5 Dec 2019 21:32:59 -0700 Subject: [PATCH 1/4] WIP --- .../transport/RemoteClusterConnection.java | 9 +- .../transport/RemoteClusterService.java | 2 - .../transport/RemoteConnectionInfo.java | 108 ++++++----- .../transport/RemoteConnectionStrategy.java | 28 ++- .../transport/SimpleConnectionStrategy.java | 84 ++++++++ .../transport/SniffConnectionStrategy.java | 81 ++++++++ .../RemoteClusterConnectionTests.java | 180 +++++++++--------- .../RemoteConnectionStrategyTests.java | 5 + .../xpack/CcrSingleNodeTestCase.java | 2 +- .../xpack/ccr/RestartIndexFollowingIT.java | 3 +- 10 files changed, 344 insertions(+), 158 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 8b89e8f8f8905..4535aa8481f09 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -34,7 +34,6 @@ import java.io.Closeable; import java.io.IOException; -import java.util.Collections; import java.util.function.Function; /** @@ -210,17 +209,13 @@ public RemoteConnectionInfo getConnectionInfo() { SniffConnectionStrategy sniffStrategy = (SniffConnectionStrategy) this.connectionStrategy; return new RemoteConnectionInfo( clusterAlias, - sniffStrategy.getSeedNodes(), - sniffStrategy.getMaxConnections(), - getNumNodesConnected(), + null, initialConnectionTimeout, skipUnavailable); } else { return new RemoteConnectionInfo( clusterAlias, - Collections.emptyList(), - 0, - getNumNodesConnected(), + null, initialConnectionTimeout, skipUnavailable); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 2bfe3980ed8d3..f398100049a8f 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -62,8 +62,6 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl private static final Logger logger = LogManager.getLogger(RemoteClusterService.class); - private static final ActionListener noopListener = ActionListener.wrap((x) -> {}, (x) -> {}); - /** * The initial connect timeout for remote cluster connections */ diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java index 5bdc6f9874330..9ae214395d9ed 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -36,63 +37,66 @@ * {@code _remote/info} requests. */ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable { - final List seedNodes; - final int connectionsPerCluster; + + final ModeInfo modeInfo; final TimeValue initialConnectionTimeout; - final int numNodesConnected; final String clusterAlias; final boolean skipUnavailable; - RemoteConnectionInfo(String clusterAlias, List seedNodes, - int connectionsPerCluster, int numNodesConnected, - TimeValue initialConnectionTimeout, boolean skipUnavailable) { + RemoteConnectionInfo(String clusterAlias, ModeInfo modeInfo, TimeValue initialConnectionTimeout, boolean skipUnavailable) { this.clusterAlias = clusterAlias; - this.seedNodes = seedNodes; - this.connectionsPerCluster = connectionsPerCluster; - this.numNodesConnected = numNodesConnected; + this.modeInfo = modeInfo; this.initialConnectionTimeout = initialConnectionTimeout; this.skipUnavailable = skipUnavailable; } public RemoteConnectionInfo(StreamInput input) throws IOException { - seedNodes = Arrays.asList(input.readStringArray()); - connectionsPerCluster = input.readVInt(); - initialConnectionTimeout = input.readTimeValue(); - numNodesConnected = input.readVInt(); - clusterAlias = input.readString(); - skipUnavailable = input.readBoolean(); - } - - public List getSeedNodes() { - return seedNodes; - } - - public int getConnectionsPerCluster() { - return connectionsPerCluster; - } - - public TimeValue getInitialConnectionTimeout() { - return initialConnectionTimeout; + // TODO: Change to 7.6 after backport + if (input.getVersion().onOrAfter(Version.V_8_0_0)) { + RemoteConnectionStrategy.ConnectionStrategy mode = input.readEnum(RemoteConnectionStrategy.ConnectionStrategy.class); + modeInfo = mode.getReader().read(input); + initialConnectionTimeout = input.readTimeValue(); + clusterAlias = input.readString(); + skipUnavailable = input.readBoolean(); + } else { + List seedNodes = Arrays.asList(input.readStringArray()); + int connectionsPerCluster = input.readVInt(); + initialConnectionTimeout = input.readTimeValue(); + int numNodesConnected = input.readVInt(); + clusterAlias = input.readString(); + skipUnavailable = input.readBoolean(); + modeInfo = new SniffConnectionStrategy.SniffModeInfo(seedNodes, connectionsPerCluster, numNodesConnected); + } } - public int getNumNodesConnected() { - return numNodesConnected; + public boolean isConnected() { + return modeInfo.isConnected(); } public String getClusterAlias() { return clusterAlias; } - public boolean isSkipUnavailable() { - return skipUnavailable; - } - @Override public void writeTo(StreamOutput out) throws IOException { - out.writeStringArray(seedNodes.toArray(new String[0])); - out.writeVInt(connectionsPerCluster); - out.writeTimeValue(initialConnectionTimeout); - out.writeVInt(numNodesConnected); + // TODO: Change to 7.6 after backport + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeEnum(modeInfo.modeType()); + out.writeTimeValue(initialConnectionTimeout); + } else { + if (modeInfo.modeType() == RemoteConnectionStrategy.ConnectionStrategy.SNIFF) { + SniffConnectionStrategy.SniffModeInfo sniffInfo = (SniffConnectionStrategy.SniffModeInfo) this.modeInfo; + out.writeStringArray(sniffInfo.seedNodes.toArray(new String[0])); + out.writeVInt(sniffInfo.maxConnectionsPerCluster); + out.writeTimeValue(initialConnectionTimeout); + out.writeVInt(sniffInfo.numNodesConnected); + } else { + out.writeStringArray(new String[0]); + out.writeVInt(0); + out.writeTimeValue(initialConnectionTimeout); + out.writeVInt(0); + } + } out.writeString(clusterAlias); out.writeBoolean(skipUnavailable); } @@ -101,14 +105,9 @@ public void writeTo(StreamOutput out) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(clusterAlias); { - builder.startArray("seeds"); - for (String addr : seedNodes) { - builder.value(addr); - } - builder.endArray(); - builder.field("connected", numNodesConnected > 0); - builder.field("num_nodes_connected", numNodesConnected); - builder.field("max_connections_per_cluster", connectionsPerCluster); + builder.field("connected", modeInfo.isConnected()); + builder.field("mode", modeInfo.modeName()); + modeInfo.toXContent(builder, params); builder.field("initial_connect_timeout", initialConnectionTimeout); builder.field("skip_unavailable", skipUnavailable); } @@ -121,18 +120,23 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; RemoteConnectionInfo that = (RemoteConnectionInfo) o; - return connectionsPerCluster == that.connectionsPerCluster && - numNodesConnected == that.numNodesConnected && - Objects.equals(seedNodes, that.seedNodes) && + return skipUnavailable == that.skipUnavailable && + Objects.equals(modeInfo, that.modeInfo) && Objects.equals(initialConnectionTimeout, that.initialConnectionTimeout) && - Objects.equals(clusterAlias, that.clusterAlias) && - skipUnavailable == that.skipUnavailable; + Objects.equals(clusterAlias, that.clusterAlias); } @Override public int hashCode() { - return Objects.hash(seedNodes, connectionsPerCluster, initialConnectionTimeout, - numNodesConnected, clusterAlias, skipUnavailable); + return Objects.hash(modeInfo, initialConnectionTimeout, clusterAlias, skipUnavailable); } + public interface ModeInfo extends ToXContentFragment, Writeable { + + boolean isConnected(); + + String modeName(); + + RemoteConnectionStrategy.ConnectionStrategy modeType(); + } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index d8a459a79a56e..2a316246da2e3 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -57,20 +58,39 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionListener, Closeable { enum ConnectionStrategy { - SNIFF(SniffConnectionStrategy.CHANNELS_PER_CONNECTION, SniffConnectionStrategy::enablementSettings), - SIMPLE(SimpleConnectionStrategy.CHANNELS_PER_CONNECTION, SimpleConnectionStrategy::enablementSettings); + SNIFF(SniffConnectionStrategy.CHANNELS_PER_CONNECTION, SniffConnectionStrategy::enablementSettings, + SniffConnectionStrategy.infoReader()) { + @Override + public String toString() { + return "sniff"; + } + }, + SIMPLE(SimpleConnectionStrategy.CHANNELS_PER_CONNECTION, SimpleConnectionStrategy::enablementSettings, + SimpleConnectionStrategy.infoReader()) { + @Override + public String toString() { + return "simple"; + } + }; private final int numberOfChannels; private final Supplier>> enablementSettings; + private final Writeable.Reader reader; - ConnectionStrategy(int numberOfChannels, Supplier>> enablementSettings) { + ConnectionStrategy(int numberOfChannels, Supplier>> enablementSettings, + Writeable.Reader reader) { this.numberOfChannels = numberOfChannels; this.enablementSettings = enablementSettings; + this.reader = reader; } public int getNumberOfChannels() { return numberOfChannels; } + + public Writeable.Reader getReader() { + return reader; + } } public static final Setting.AffixSetting REMOTE_CONNECTION_MODE = Setting.affixKeySetting( @@ -310,6 +330,8 @@ boolean assertNoRunningConnections() { protected abstract void connectImpl(ActionListener listener); + protected abstract RemoteConnectionInfo.ModeInfo getModeInfo(); + private List> getAndClearListeners() { final List> result; synchronized (mutex) { diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index 890cdaf25387b..b44b45009f6ca 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -26,14 +26,21 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.common.xcontent.XContentBuilder; +import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -131,6 +138,10 @@ static Stream> enablementSettings() { return Stream.of(SimpleConnectionStrategy.REMOTE_CLUSTER_ADDRESSES); } + static Writeable.Reader infoReader() { + return SimpleModeInfo::new; + } + @Override protected boolean shouldOpenMoreConnections() { return connectionManager.size() < maxNumConnections; @@ -153,6 +164,11 @@ protected void connectImpl(ActionListener listener) { performSimpleConnectionProcess(listener); } + @Override + public RemoteConnectionInfo.ModeInfo getModeInfo() { + return new SimpleModeInfo(configuredAddresses, maxNumConnections, connectionManager.size()); + } + private void performSimpleConnectionProcess(ActionListener listener) { openConnections(listener, 1); } @@ -238,4 +254,72 @@ private boolean addressesChanged(final List oldAddresses, final List newSeeds = new HashSet<>(newAddresses); return oldSeeds.equals(newSeeds) == false; } + + private static class SimpleModeInfo implements RemoteConnectionInfo.ModeInfo { + + private final List addresses; + private final int maxSocketConnections; + private final int numSocketsConnected; + + private SimpleModeInfo(List addresses, int maxSocketConnections, int numSocketsConnected) { + this.addresses = addresses; + this.maxSocketConnections = maxSocketConnections; + this.numSocketsConnected = numSocketsConnected; + } + + private SimpleModeInfo(StreamInput input) throws IOException { + addresses = Arrays.asList(input.readStringArray()); + maxSocketConnections = input.readVInt(); + numSocketsConnected = input.readVInt(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray("addresses"); + for (String address : addresses) { + builder.value(address); + } + builder.endArray(); + builder.field("num_sockets_connected", numSocketsConnected); + builder.field("max_socket_connections", maxSocketConnections); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeStringArray(addresses.toArray(new String[0])); + out.writeVInt(maxSocketConnections); + out.writeVInt(numSocketsConnected); + } + + @Override + public boolean isConnected() { + return false; + } + + @Override + public String modeName() { + return "simple"; + } + + @Override + public RemoteConnectionStrategy.ConnectionStrategy modeType() { + return RemoteConnectionStrategy.ConnectionStrategy.SIMPLE; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SimpleModeInfo simple = (SimpleModeInfo) o; + return maxSocketConnections == simple.maxSocketConnections && + numSocketsConnected == simple.numSocketsConnected && + Objects.equals(addresses, simple.addresses); + } + + @Override + public int hashCode() { + return Objects.hash(addresses, maxSocketConnections, numSocketsConnected); + } + } } diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 9ec0f4afe9997..edcca75ced19a 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -36,15 +36,19 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -193,6 +197,10 @@ static Stream> enablementSettings() { return Stream.of(SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS, SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS_OLD); } + static Writeable.Reader infoReader() { + return null; + } + @Override protected boolean shouldOpenMoreConnections() { return connectionManager.size() < maxNumRemoteConnections; @@ -217,6 +225,11 @@ protected void connectImpl(ActionListener listener) { collectRemoteNodes(seedNodes.iterator(), listener); } + @Override + protected RemoteConnectionInfo.ModeInfo getModeInfo() { + return null; + } + private void collectRemoteNodes(Iterator> seedNodes, ActionListener listener) { if (Thread.currentThread().isInterrupted()) { listener.onFailure(new InterruptedException("remote connect thread got interrupted")); @@ -469,4 +482,72 @@ private boolean proxyChanged(String oldProxy, String newProxy) { return Objects.equals(oldProxy, newProxy) == false; } + + static class SniffModeInfo implements RemoteConnectionInfo.ModeInfo { + + final List seedNodes; + final int maxConnectionsPerCluster; + final int numNodesConnected; + + SniffModeInfo(List seedNodes, int maxConnectionsPerCluster, int numNodesConnected) { + this.seedNodes = seedNodes; + this.maxConnectionsPerCluster = maxConnectionsPerCluster; + this.numNodesConnected = numNodesConnected; + } + + private SniffModeInfo(StreamInput input) throws IOException { + seedNodes = Arrays.asList(input.readStringArray()); + maxConnectionsPerCluster = input.readVInt(); + numNodesConnected = input.readVInt(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray("seeds"); + for (String address : seedNodes) { + builder.value(address); + } + builder.endArray(); + builder.field("num_nodes_connected", numNodesConnected); + builder.field("max_connections_per_cluster", maxConnectionsPerCluster); + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeStringArray(seedNodes.toArray(new String[0])); + out.writeVInt(maxConnectionsPerCluster); + out.writeVInt(numNodesConnected); + } + + @Override + public boolean isConnected() { + return numNodesConnected > 0; + } + + @Override + public String modeName() { + return "sniff"; + } + + @Override + public RemoteConnectionStrategy.ConnectionStrategy modeType() { + return RemoteConnectionStrategy.ConnectionStrategy.SNIFF; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SniffModeInfo sniff = (SniffModeInfo) o; + return maxConnectionsPerCluster == sniff.maxConnectionsPerCluster && + numNodesConnected == sniff.numNodesConnected && + Objects.equals(seedNodes, sniff.seedNodes); + } + + @Override + public int hashCode() { + return Objects.hash(seedNodes, maxConnectionsPerCluster, numNodesConnected); + } + } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index d74a8daa98d61..78b70860cdfda 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -44,9 +44,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.mocksocket.MockServerSocket; @@ -309,71 +306,72 @@ public void run() { } } - public void testGetConnectionInfo() throws Exception { - List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService transport1 = startTransport("seed_node", knownNodes, Version.CURRENT); - MockTransportService transport2 = startTransport("seed_node_1", knownNodes, Version.CURRENT); - MockTransportService transport3 = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { - DiscoveryNode node1 = transport1.getLocalDiscoNode(); - DiscoveryNode node2 = transport3.getLocalDiscoNode(); - DiscoveryNode node3 = transport2.getLocalDiscoNode(); - knownNodes.add(transport1.getLocalDiscoNode()); - knownNodes.add(transport3.getLocalDiscoNode()); - knownNodes.add(transport2.getLocalDiscoNode()); - Collections.shuffle(knownNodes, random()); - List seedNodes = addresses(node3, node1, node2); - Collections.shuffle(seedNodes, random()); - - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - int maxNumConnections = randomIntBetween(1, 5); - String clusterAlias = "test-cluster"; - Settings settings = Settings.builder().put(buildSniffSettings(clusterAlias, seedNodes)) - .put(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER.getKey(), maxNumConnections).build(); - try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { - // test no nodes connected - RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); - assertNotNull(remoteConnectionInfo); - assertEquals(0, remoteConnectionInfo.numNodesConnected); - assertEquals(3, remoteConnectionInfo.seedNodes.size()); - assertEquals(maxNumConnections, remoteConnectionInfo.connectionsPerCluster); - assertEquals(clusterAlias, remoteConnectionInfo.clusterAlias); - } - } - } - } - - public void testRemoteConnectionInfo() throws IOException { - RemoteConnectionInfo stats = - new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), false); - assertSerialization(stats); - - RemoteConnectionInfo stats1 = - new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 4, TimeValue.timeValueMinutes(30), true); - assertSerialization(stats1); - assertNotEquals(stats, stats1); - - stats1 = new RemoteConnectionInfo("test_cluster_1", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), false); - assertSerialization(stats1); - assertNotEquals(stats, stats1); - - stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:15"), 4, 3, TimeValue.timeValueMinutes(30), false); - assertSerialization(stats1); - assertNotEquals(stats, stats1); - - stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), true); - assertSerialization(stats1); - assertNotEquals(stats, stats1); - - stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(325), true); - assertSerialization(stats1); - assertNotEquals(stats, stats1); - - stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 5, 3, TimeValue.timeValueMinutes(30), false); - assertSerialization(stats1); - assertNotEquals(stats, stats1); - } +// public void testGetConnectionInfo() throws Exception { +// List knownNodes = new CopyOnWriteArrayList<>(); +// try (MockTransportService transport1 = startTransport("seed_node", knownNodes, Version.CURRENT); +// MockTransportService transport2 = startTransport("seed_node_1", knownNodes, Version.CURRENT); +// MockTransportService transport3 = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { +// DiscoveryNode node1 = transport1.getLocalDiscoNode(); +// DiscoveryNode node2 = transport3.getLocalDiscoNode(); +// DiscoveryNode node3 = transport2.getLocalDiscoNode(); +// knownNodes.add(transport1.getLocalDiscoNode()); +// knownNodes.add(transport3.getLocalDiscoNode()); +// knownNodes.add(transport2.getLocalDiscoNode()); +// Collections.shuffle(knownNodes, random()); +// List seedNodes = addresses(node3, node1, node2); +// Collections.shuffle(seedNodes, random()); +// +// try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, +// threadPool, null)) { +// service.start(); +// service.acceptIncomingRequests(); +// int maxNumConnections = randomIntBetween(1, 5); +// String clusterAlias = "test-cluster"; +// Settings settings = Settings.builder().put(buildSniffSettings(clusterAlias, seedNodes)) +// .put(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER.getKey(), maxNumConnections).build(); +// try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { +// // test no nodes connected +// RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); +// assertNotNull(remoteConnectionInfo); +// assertEquals(0, remoteConnectionInfo.numNodesConnected); +// assertEquals(3, remoteConnectionInfo.seedNodes.size()); +// assertEquals(maxNumConnections, remoteConnectionInfo.connectionsPerCluster); +// assertEquals(clusterAlias, remoteConnectionInfo.clusterAlias); +// } +// } +// } +// } +// +// public void testRemoteConnectionInfo() throws IOException { +// RemoteConnectionInfo stats = +// new RemoteConnectionInfo("test_cluster", null, Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), false); +// assertSerialization(stats); +// +// RemoteConnectionInfo stats1 = +// new RemoteConnectionInfo("test_cluster", null, Arrays.asList("seed:1"), 4, 4, TimeValue.timeValueMinutes(30), true); +// assertSerialization(stats1); +// assertNotEquals(stats, stats1); +// +// stats1 = new RemoteConnectionInfo("test_cluster_1", null, Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), false); +// assertSerialization(stats1); +// assertNotEquals(stats, stats1); +// +// stats1 = new RemoteConnectionInfo("test_cluster", null, Arrays.asList("seed:15"), 4, 3, TimeValue.timeValueMinutes(30), false); +// assertSerialization(stats1); +// assertNotEquals(stats, stats1); +// +// stats1 = new RemoteConnectionInfo("test_cluster", null, Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), true); +// assertSerialization(stats1); +// assertNotEquals(stats, stats1); +// +// stats1 = new RemoteConnectionInfo("test_cluster", null, Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(325), true); +// assertSerialization(stats1); +// assertNotEquals(stats, stats1); +// +// stats1 = new RemoteConnectionInfo("test_cluster", null, Arrays.asList("seed:1"), 5, 3, TimeValue.timeValueMinutes(30), false); +// assertSerialization(stats1); +// assertNotEquals(stats, stats1); +// } private static RemoteConnectionInfo assertSerialization(RemoteConnectionInfo info) throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { @@ -388,29 +386,29 @@ private static RemoteConnectionInfo assertSerialization(RemoteConnectionInfo inf } } - public void testRenderConnectionInfoXContent() throws IOException { - RemoteConnectionInfo stats = - new RemoteConnectionInfo("test_cluster", Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), true); - stats = assertSerialization(stats); - XContentBuilder builder = XContentFactory.jsonBuilder(); - builder.startObject(); - stats.toXContent(builder, null); - builder.endObject(); - assertEquals("{\"test_cluster\":{\"seeds\":[\"seed:1\"],\"connected\":true," + - "\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"," + - "\"skip_unavailable\":true}}", Strings.toString(builder)); - - stats = new RemoteConnectionInfo( - "some_other_cluster", Arrays.asList("seed:1", "seed:2"), 2, 0, TimeValue.timeValueSeconds(30), false); - stats = assertSerialization(stats); - builder = XContentFactory.jsonBuilder(); - builder.startObject(); - stats.toXContent(builder, null); - builder.endObject(); - assertEquals("{\"some_other_cluster\":{\"seeds\":[\"seed:1\",\"seed:2\"]," - + "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"," + - "\"skip_unavailable\":false}}", Strings.toString(builder)); - } +// public void testRenderConnectionInfoXContent() throws IOException { +// RemoteConnectionInfo stats = +// new RemoteConnectionInfo("test_cluster", null, Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), true); +// stats = assertSerialization(stats); +// XContentBuilder builder = XContentFactory.jsonBuilder(); +// builder.startObject(); +// stats.toXContent(builder, null); +// builder.endObject(); +// assertEquals("{\"test_cluster\":{\"seeds\":[\"seed:1\"],\"connected\":true," + +// "\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"," + +// "\"skip_unavailable\":true}}", Strings.toString(builder)); +// +// stats = new RemoteConnectionInfo( +// "some_other_cluster", null, Arrays.asList("seed:1", "seed:2"), 2, 0, TimeValue.timeValueSeconds(30), false); +// stats = assertSerialization(stats); +// builder = XContentFactory.jsonBuilder(); +// builder.startObject(); +// stats.toXContent(builder, null); +// builder.endObject(); +// assertEquals("{\"some_other_cluster\":{\"seeds\":[\"seed:1\",\"seed:2\"]," +// + "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"," + +// "\"skip_unavailable\":false}}", Strings.toString(builder)); +// } public void testCollectNodes() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 5ea54c7356b94..2c6fc691ec5bd 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -111,5 +111,10 @@ protected boolean shouldOpenMoreConnections() { protected void connectImpl(ActionListener listener) { } + + @Override + protected RemoteConnectionInfo.ModeInfo getModeInfo() { + return null; + } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index 3a07ef9aa8852..cbfd078452368 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -69,7 +69,7 @@ public void setupLocalRemote() throws Exception { List infos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos(); assertThat(infos.size(), equalTo(1)); - assertThat(infos.get(0).getNumNodesConnected(), equalTo(1)); + assertTrue(infos.get(0).isConnected()); } @Before diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java index ed3d1abb35770..88370aed0db71 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java @@ -23,7 +23,6 @@ import static java.util.Collections.singletonMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class RestartIndexFollowingIT extends CcrIntegTestCase { @@ -96,7 +95,7 @@ private void setupRemoteCluster() throws Exception { List infos = followerClient().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos(); assertThat(infos.size(), equalTo(1)); - assertThat(infos.get(0).getNumNodesConnected(), greaterThanOrEqualTo(1)); + assertTrue(infos.get(0).isConnected()); } private void cleanRemoteCluster() throws Exception { From e4fc39ff08ebda4dba855760ec58dba2bff83003 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 6 Dec 2019 17:43:05 -0700 Subject: [PATCH 2/4] WIP --- .../transport/RemoteClusterConnection.java | 15 +- .../transport/RemoteConnectionInfo.java | 1 + .../transport/RemoteConnectionStrategy.java | 10 +- .../transport/SimpleConnectionStrategy.java | 6 +- .../transport/SniffConnectionStrategy.java | 4 +- .../RemoteClusterConnectionTests.java | 193 ++++++++++-------- 6 files changed, 116 insertions(+), 113 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 4535aa8481f09..ae697bba95b49 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -205,20 +205,7 @@ boolean isNodeConnected(final DiscoveryNode node) { * Get the information about remote nodes to be rendered on {@code _remote/info} requests. */ public RemoteConnectionInfo getConnectionInfo() { - if (connectionStrategy instanceof SniffConnectionStrategy) { - SniffConnectionStrategy sniffStrategy = (SniffConnectionStrategy) this.connectionStrategy; - return new RemoteConnectionInfo( - clusterAlias, - null, - initialConnectionTimeout, - skipUnavailable); - } else { - return new RemoteConnectionInfo( - clusterAlias, - null, - initialConnectionTimeout, - skipUnavailable); - } + return new RemoteConnectionInfo(clusterAlias, connectionStrategy.getModeInfo(), initialConnectionTimeout, skipUnavailable); } int getNumNodesConnected() { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java index 9ae214395d9ed..e721a0b617fd1 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java @@ -82,6 +82,7 @@ public void writeTo(StreamOutput out) throws IOException { // TODO: Change to 7.6 after backport if (out.getVersion().onOrAfter(Version.V_8_0_0)) { out.writeEnum(modeInfo.modeType()); + modeInfo.writeTo(out); out.writeTimeValue(initialConnectionTimeout); } else { if (modeInfo.modeType() == RemoteConnectionStrategy.ConnectionStrategy.SNIFF) { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 2a316246da2e3..3d994f35de224 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -59,14 +59,14 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis enum ConnectionStrategy { SNIFF(SniffConnectionStrategy.CHANNELS_PER_CONNECTION, SniffConnectionStrategy::enablementSettings, - SniffConnectionStrategy.infoReader()) { + SniffConnectionStrategy::infoReader) { @Override public String toString() { return "sniff"; } }, SIMPLE(SimpleConnectionStrategy.CHANNELS_PER_CONNECTION, SimpleConnectionStrategy::enablementSettings, - SimpleConnectionStrategy.infoReader()) { + SimpleConnectionStrategy::infoReader) { @Override public String toString() { return "simple"; @@ -75,10 +75,10 @@ public String toString() { private final int numberOfChannels; private final Supplier>> enablementSettings; - private final Writeable.Reader reader; + private final Supplier> reader; ConnectionStrategy(int numberOfChannels, Supplier>> enablementSettings, - Writeable.Reader reader) { + Supplier> reader) { this.numberOfChannels = numberOfChannels; this.enablementSettings = enablementSettings; this.reader = reader; @@ -89,7 +89,7 @@ public int getNumberOfChannels() { } public Writeable.Reader getReader() { - return reader; + return reader.get(); } } diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index b44b45009f6ca..0250ff73e4e8c 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -255,13 +255,13 @@ private boolean addressesChanged(final List oldAddresses, final List addresses; private final int maxSocketConnections; private final int numSocketsConnected; - private SimpleModeInfo(List addresses, int maxSocketConnections, int numSocketsConnected) { + SimpleModeInfo(List addresses, int maxSocketConnections, int numSocketsConnected) { this.addresses = addresses; this.maxSocketConnections = maxSocketConnections; this.numSocketsConnected = numSocketsConnected; @@ -294,7 +294,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public boolean isConnected() { - return false; + return numSocketsConnected > 0; } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index edcca75ced19a..0028c813e7f49 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -198,7 +198,7 @@ static Stream> enablementSettings() { } static Writeable.Reader infoReader() { - return null; + return SniffModeInfo::new; } @Override @@ -227,7 +227,7 @@ protected void connectImpl(ActionListener listener) { @Override protected RemoteConnectionInfo.ModeInfo getModeInfo() { - return null; + return new SniffModeInfo(configuredSeedNodes, maxNumRemoteConnections, connectionManager.size()); } private void collectRemoteNodes(Iterator> seedNodes, ActionListener listener) { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 78b70860cdfda..934cf81a503ea 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -44,6 +44,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.mocksocket.MockServerSocket; @@ -306,72 +309,78 @@ public void run() { } } -// public void testGetConnectionInfo() throws Exception { -// List knownNodes = new CopyOnWriteArrayList<>(); -// try (MockTransportService transport1 = startTransport("seed_node", knownNodes, Version.CURRENT); -// MockTransportService transport2 = startTransport("seed_node_1", knownNodes, Version.CURRENT); -// MockTransportService transport3 = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { -// DiscoveryNode node1 = transport1.getLocalDiscoNode(); -// DiscoveryNode node2 = transport3.getLocalDiscoNode(); -// DiscoveryNode node3 = transport2.getLocalDiscoNode(); -// knownNodes.add(transport1.getLocalDiscoNode()); -// knownNodes.add(transport3.getLocalDiscoNode()); -// knownNodes.add(transport2.getLocalDiscoNode()); -// Collections.shuffle(knownNodes, random()); -// List seedNodes = addresses(node3, node1, node2); -// Collections.shuffle(seedNodes, random()); -// -// try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, -// threadPool, null)) { -// service.start(); -// service.acceptIncomingRequests(); -// int maxNumConnections = randomIntBetween(1, 5); -// String clusterAlias = "test-cluster"; -// Settings settings = Settings.builder().put(buildSniffSettings(clusterAlias, seedNodes)) -// .put(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER.getKey(), maxNumConnections).build(); -// try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { -// // test no nodes connected -// RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); -// assertNotNull(remoteConnectionInfo); -// assertEquals(0, remoteConnectionInfo.numNodesConnected); -// assertEquals(3, remoteConnectionInfo.seedNodes.size()); -// assertEquals(maxNumConnections, remoteConnectionInfo.connectionsPerCluster); -// assertEquals(clusterAlias, remoteConnectionInfo.clusterAlias); -// } -// } -// } -// } -// -// public void testRemoteConnectionInfo() throws IOException { -// RemoteConnectionInfo stats = -// new RemoteConnectionInfo("test_cluster", null, Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), false); -// assertSerialization(stats); -// -// RemoteConnectionInfo stats1 = -// new RemoteConnectionInfo("test_cluster", null, Arrays.asList("seed:1"), 4, 4, TimeValue.timeValueMinutes(30), true); -// assertSerialization(stats1); -// assertNotEquals(stats, stats1); -// -// stats1 = new RemoteConnectionInfo("test_cluster_1", null, Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), false); -// assertSerialization(stats1); -// assertNotEquals(stats, stats1); -// -// stats1 = new RemoteConnectionInfo("test_cluster", null, Arrays.asList("seed:15"), 4, 3, TimeValue.timeValueMinutes(30), false); -// assertSerialization(stats1); -// assertNotEquals(stats, stats1); -// -// stats1 = new RemoteConnectionInfo("test_cluster", null, Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), true); -// assertSerialization(stats1); -// assertNotEquals(stats, stats1); -// -// stats1 = new RemoteConnectionInfo("test_cluster", null, Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(325), true); -// assertSerialization(stats1); -// assertNotEquals(stats, stats1); -// -// stats1 = new RemoteConnectionInfo("test_cluster", null, Arrays.asList("seed:1"), 5, 3, TimeValue.timeValueMinutes(30), false); -// assertSerialization(stats1); -// assertNotEquals(stats, stats1); -// } + public void testGetConnectionInfo() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService transport1 = startTransport("seed_node", knownNodes, Version.CURRENT); + MockTransportService transport2 = startTransport("seed_node_1", knownNodes, Version.CURRENT); + MockTransportService transport3 = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { + DiscoveryNode node1 = transport1.getLocalDiscoNode(); + DiscoveryNode node2 = transport3.getLocalDiscoNode(); + DiscoveryNode node3 = transport2.getLocalDiscoNode(); + knownNodes.add(transport1.getLocalDiscoNode()); + knownNodes.add(transport3.getLocalDiscoNode()); + knownNodes.add(transport2.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + List seedNodes = addresses(node3, node1, node2); + Collections.shuffle(seedNodes, random()); + + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, + threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + int maxNumConnections = randomIntBetween(1, 5); + String clusterAlias = "test-cluster"; + Settings settings = Settings.builder().put(buildSniffSettings(clusterAlias, seedNodes)) + .put(SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER.getKey(), maxNumConnections).build(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(settings, clusterAlias, service)) { + // test no nodes connected + RemoteConnectionInfo remoteConnectionInfo = assertSerialization(connection.getConnectionInfo()); + assertNotNull(remoteConnectionInfo); + SniffConnectionStrategy.SniffModeInfo sniffInfo = (SniffConnectionStrategy.SniffModeInfo) remoteConnectionInfo.modeInfo; + assertEquals(0, sniffInfo.numNodesConnected); + assertEquals(3, sniffInfo.seedNodes.size()); + assertEquals(maxNumConnections, sniffInfo.maxConnectionsPerCluster); + assertEquals(clusterAlias, remoteConnectionInfo.clusterAlias); + } + } + } + } + + public void testRemoteConnectionInfo() throws IOException { + List remoteAddresses = Collections.singletonList("seed:1"); + + RemoteConnectionInfo.ModeInfo modeInfo1; + RemoteConnectionInfo.ModeInfo modeInfo2; + + if (randomBoolean()) { + modeInfo1 = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 4, 4); + modeInfo2 = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 4, 3); + } else { + modeInfo1 = new SimpleConnectionStrategy.SimpleModeInfo(remoteAddresses, 18, 18); + modeInfo2 = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 18, 17); + } + + RemoteConnectionInfo stats = + new RemoteConnectionInfo("test_cluster", modeInfo1, TimeValue.timeValueMinutes(30), false); + assertSerialization(stats); + + RemoteConnectionInfo stats1 = + new RemoteConnectionInfo("test_cluster", modeInfo1, TimeValue.timeValueMinutes(30), true); + assertSerialization(stats1); + assertNotEquals(stats, stats1); + + stats1 = new RemoteConnectionInfo("test_cluster_1", modeInfo1, TimeValue.timeValueMinutes(30), false); + assertSerialization(stats1); + assertNotEquals(stats, stats1); + + stats1 = new RemoteConnectionInfo("test_cluster", modeInfo1, TimeValue.timeValueMinutes(325), false); + assertSerialization(stats1); + assertNotEquals(stats, stats1); + + stats1 = new RemoteConnectionInfo("test_cluster", modeInfo2, TimeValue.timeValueMinutes(30), false); + assertSerialization(stats1); + assertNotEquals(stats, stats1); + } private static RemoteConnectionInfo assertSerialization(RemoteConnectionInfo info) throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { @@ -386,29 +395,35 @@ private static RemoteConnectionInfo assertSerialization(RemoteConnectionInfo inf } } -// public void testRenderConnectionInfoXContent() throws IOException { -// RemoteConnectionInfo stats = -// new RemoteConnectionInfo("test_cluster", null, Arrays.asList("seed:1"), 4, 3, TimeValue.timeValueMinutes(30), true); -// stats = assertSerialization(stats); -// XContentBuilder builder = XContentFactory.jsonBuilder(); -// builder.startObject(); -// stats.toXContent(builder, null); -// builder.endObject(); -// assertEquals("{\"test_cluster\":{\"seeds\":[\"seed:1\"],\"connected\":true," + -// "\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"," + -// "\"skip_unavailable\":true}}", Strings.toString(builder)); -// -// stats = new RemoteConnectionInfo( -// "some_other_cluster", null, Arrays.asList("seed:1", "seed:2"), 2, 0, TimeValue.timeValueSeconds(30), false); -// stats = assertSerialization(stats); -// builder = XContentFactory.jsonBuilder(); -// builder.startObject(); -// stats.toXContent(builder, null); -// builder.endObject(); -// assertEquals("{\"some_other_cluster\":{\"seeds\":[\"seed:1\",\"seed:2\"]," -// + "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"," + -// "\"skip_unavailable\":false}}", Strings.toString(builder)); -// } + public void testRenderConnectionInfoXContent() throws IOException { + List remoteAddresses = Arrays.asList("seed:1", "seed:2"); + + RemoteConnectionInfo.ModeInfo modeInfo; + + boolean sniff = randomBoolean(); + if (sniff) { + modeInfo = new SniffConnectionStrategy.SniffModeInfo(remoteAddresses, 3, 2); + } else { + modeInfo = new SimpleConnectionStrategy.SimpleModeInfo(remoteAddresses, 18, 16); + } + + RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster", modeInfo, TimeValue.timeValueMinutes(30), true); + stats = assertSerialization(stats); + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + stats.toXContent(builder, null); + builder.endObject(); + + if (sniff) { + assertEquals("{\"test_cluster\":{\"connected\":true,\"mode\":\"sniff\",\"seeds\":[\"seed:1\",\"seed:2\"]," + + "\"num_nodes_connected\":2,\"max_connections_per_cluster\":3,\"initial_connect_timeout\":\"30m\"," + + "\"skip_unavailable\":true}}", Strings.toString(builder)); + } else { + assertEquals("{\"test_cluster\":{\"connected\":true,\"mode\":\"simple\",\"addresses\":[\"seed:1\",\"seed:2\"]," + + "\"num_sockets_connected\":16,\"max_socket_connections\":18,\"initial_connect_timeout\":\"30m\"," + + "\"skip_unavailable\":true}}", Strings.toString(builder)); + } + } public void testCollectNodes() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); From 99502a46239e9e5e6167d2f10fc1a26d339187f6 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 10 Dec 2019 18:06:36 -0700 Subject: [PATCH 3/4] Fix test --- docs/reference/ccr/getting-started.asciidoc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/reference/ccr/getting-started.asciidoc b/docs/reference/ccr/getting-started.asciidoc index 41d013f62f3a7..0793051646cfe 100644 --- a/docs/reference/ccr/getting-started.asciidoc +++ b/docs/reference/ccr/getting-started.asciidoc @@ -135,7 +135,8 @@ remote cluster. "num_nodes_connected" : 1, <2> "max_connections_per_cluster" : 3, "initial_connect_timeout" : "30s", - "skip_unavailable" : false + "skip_unavailable" : false, + "mode" : "sniff" } } -------------------------------------------------- @@ -146,7 +147,7 @@ remote cluster. alias `leader` <2> This shows the number of nodes in the remote cluster the local cluster is connected to. - + Alternatively, you can manage remote clusters on the *Management / Elasticsearch / Remote Clusters* page in {kib}: From 45612eb96e9972ea97339270701f9530fee32776 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 12 Dec 2019 18:06:50 -0700 Subject: [PATCH 4/4] Implement tests --- ...l => 15_connection_mode_configuration.yml} | 0 .../test/multi_cluster/20_info.yml | 47 ++++++++++++++++--- 2 files changed, 41 insertions(+), 6 deletions(-) rename qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/{100_connection_mode_configuration.yml => 15_connection_mode_configuration.yml} (100%) diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_connection_mode_configuration.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/15_connection_mode_configuration.yml similarity index 100% rename from qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/100_connection_mode_configuration.yml rename to qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/15_connection_mode_configuration.yml diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml index 59657e2012c8a..761526a7bea60 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml @@ -1,5 +1,5 @@ --- -"Fetch remote cluster info for existing cluster": +"Fetch remote cluster sniff info for existing cluster": - do: cluster.remote_info: {} @@ -7,6 +7,7 @@ - match: { my_remote_cluster.num_nodes_connected: 1} - match: { my_remote_cluster.max_connections_per_cluster: 1} - match: { my_remote_cluster.initial_connect_timeout: "30s" } + - match: { my_remote_cluster.mode: "sniff" } --- "Add transient remote cluster based on the preset cluster and check remote info": @@ -21,9 +22,13 @@ flat_settings: true body: transient: - cluster.remote.test_remote_cluster.seeds: $remote_ip + cluster.remote.test_remote_cluster.mode: "sniff" + cluster.remote.test_remote_cluster.sniff.node_connections: "2" + cluster.remote.test_remote_cluster.sniff.seeds: $remote_ip - - match: {transient: {cluster.remote.test_remote_cluster.seeds: $remote_ip}} + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "sniff"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.node_connections: "2"} + - match: {transient.cluster\.remote\.test_remote_cluster\.sniff\.seeds: $remote_ip} # we do another search here since this will enforce the connection to be established # otherwise the cluster might not have been connected yet. @@ -45,19 +50,49 @@ - match: { my_remote_cluster.seeds.0: $remote_ip } - match: { my_remote_cluster.num_nodes_connected: 1} - - match: { test_remote_cluster.num_nodes_connected: 1} + - gt: { test_remote_cluster.num_nodes_connected: 0} - match: { my_remote_cluster.max_connections_per_cluster: 1} - - match: { test_remote_cluster.max_connections_per_cluster: 1} + - match: { test_remote_cluster.max_connections_per_cluster: 2} - match: { my_remote_cluster.initial_connect_timeout: "30s" } - match: { test_remote_cluster.initial_connect_timeout: "30s" } + - match: { my_remote_cluster.mode: "sniff" } + - match: { test_remote_cluster.mode: "sniff" } + + - do: + cluster.put_settings: + flat_settings: true + body: + transient: + cluster.remote.test_remote_cluster.mode: "simple" + cluster.remote.test_remote_cluster.sniff.seeds: null + cluster.remote.test_remote_cluster.sniff.node_connections: null + cluster.remote.test_remote_cluster.simple.socket_connections: "10" + cluster.remote.test_remote_cluster.simple.addresses: $remote_ip + + - match: {transient.cluster\.remote\.test_remote_cluster\.mode: "simple"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.socket_connections: "10"} + - match: {transient.cluster\.remote\.test_remote_cluster\.simple\.addresses: $remote_ip} + + - do: + cluster.remote_info: {} + + - match: { test_remote_cluster.connected: true } + - match: { test_remote_cluster.addresses.0: $remote_ip } + - gt: { test_remote_cluster.num_sockets_connected: 0} + - match: { test_remote_cluster.max_socket_connections: 10} + - match: { test_remote_cluster.initial_connect_timeout: "30s" } + - match: { test_remote_cluster.mode: "simple" } + - do: cluster.put_settings: body: transient: - cluster.remote.test_remote_cluster.seeds: null + cluster.remote.test_remote_cluster.mode: null + cluster.remote.test_remote_cluster.simple.socket_connections: null + cluster.remote.test_remote_cluster.simple.addresses: null --- "skip_unavailable is returned as part of _remote/info response":