From 9efa296a9f015dfb5a83a33470d320b830b84c10 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 2 Oct 2019 14:23:05 -0600 Subject: [PATCH 1/5] WIP --- .../transport/RemoteConnectionStrategy.java | 35 +++- .../transport/SimpleConnectionStrategy.java | 183 ++++++++++++++++++ .../transport/SniffConnectionStrategy.java | 33 +--- 3 files changed, 220 insertions(+), 31 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 191484deff2f5..42062284ed084 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -23,8 +23,10 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.threadpool.ThreadPool; @@ -36,20 +38,24 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; public abstract class RemoteConnectionStrategy implements TransportConnectionListener, Closeable { - protected static final Logger logger = LogManager.getLogger(RemoteConnectionStrategy.class); + private static final Logger logger = LogManager.getLogger(RemoteConnectionStrategy.class); private static final int MAX_LISTENERS = 100; private final AtomicBoolean closed = new AtomicBoolean(false); private final Object mutex = new Object(); - private final ThreadPool threadPool; - protected final RemoteConnectionManager connectionManager; private List> listeners = new ArrayList<>(); - RemoteConnectionStrategy(ThreadPool threadPool, RemoteConnectionManager connectionManager) { - this.threadPool = threadPool; + protected final TransportService transportService; + protected final RemoteConnectionManager connectionManager; + protected final String clusterAlias; + + RemoteConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager) { + this.clusterAlias = clusterAlias; + this.transportService = transportService; this.connectionManager = connectionManager; connectionManager.getConnectionManager().addListener(this); } @@ -61,7 +67,7 @@ public abstract class RemoteConnectionStrategy implements TransportConnectionLis void connect(ActionListener connectListener) { boolean runConnect = false; final ActionListener listener = - ContextPreservingActionListener.wrapPreservingContext(connectListener, threadPool.getThreadContext()); + ContextPreservingActionListener.wrapPreservingContext(connectListener, transportService.getThreadPool().getThreadContext()); boolean closed; synchronized (mutex) { closed = this.closed.get(); @@ -83,7 +89,7 @@ void connect(ActionListener connectListener) { return; } if (runConnect) { - ExecutorService executor = threadPool.executor(ThreadPool.Names.MANAGEMENT); + ExecutorService executor = transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT); executor.submit(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -161,4 +167,19 @@ private List> getAndClearListeners() { } return result; } + + static Predicate getRemoteClusterNamePredicate(SetOnce remoteClusterName) { + return new Predicate<>() { + @Override + public boolean test(ClusterName c) { + return remoteClusterName.get() == null || c.equals(remoteClusterName.get()); + } + + @Override + public String toString() { + return remoteClusterName.get() == null ? "any cluster name" + : "expected remote cluster name [" + remoteClusterName.get().value() + "]"; + } + }; + } } diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java new file mode 100644 index 0000000000000..564a4611822dc --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -0,0 +1,183 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.StepListener; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.core.internal.io.IOUtils; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class SimpleConnectionStrategy extends RemoteConnectionStrategy { + + private static final Logger logger = LogManager.getLogger(SimpleConnectionStrategy.class); + + private final int maxNumRemoteConnections; + private final AtomicLong counter = new AtomicLong(); + private final List> addresses; + private final SetOnce remoteClusterName = new SetOnce<>(); + + SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, + int maxNumRemoteConnections, List> addresses) { + super(clusterAlias, transportService, connectionManager); + this.maxNumRemoteConnections = maxNumRemoteConnections; + this.addresses = addresses; + } + + @Override + protected boolean shouldOpenMoreConnections() { + return connectionManager.size() < maxNumRemoteConnections; + } + + @Override + protected void connectImpl(ActionListener listener) { + performSimpleConnectionProcess(addresses.iterator(), listener); + } + + private void performSimpleConnectionProcess(Iterator> addressIter, ActionListener listener) { + final Consumer onFailure = e -> { + if (e instanceof ConnectTransportException || e instanceof IOException || e instanceof IllegalStateException) { + // ISE if we fail the handshake with an version incompatible node + if (addressIter.hasNext()) { + logger.debug(() -> new ParameterizedMessage( + "handshaking with external cluster [{}] failed moving to next address", clusterAlias), e); + performSimpleConnectionProcess(addressIter, listener); + return; + } + } + logger.warn(() -> new ParameterizedMessage("handshaking with external cluster [{}] failed", clusterAlias), e); + listener.onFailure(e); + }; + + + final StepListener handshakeStep = new StepListener<>(); + + if (remoteClusterName.get() == null) { + final StepListener openConnectionStep = new StepListener<>(); + final ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG); + TransportAddress address = addressIter.next().get(); + DiscoveryNode handshakeNode = new DiscoveryNode(clusterAlias + "#" + address, address, + Version.CURRENT.minimumCompatibilityVersion()); + connectionManager.openConnection(handshakeNode, profile, openConnectionStep); + + openConnectionStep.whenComplete(connection -> { + ConnectionProfile connectionProfile = connectionManager.getConnectionManager().getConnectionProfile(); + transportService.handshake(connection, connectionProfile.getHandshakeTimeout().millis(), + getRemoteClusterNamePredicate(remoteClusterName), new ActionListener<>() { + @Override + public void onResponse(TransportService.HandshakeResponse handshakeResponse) { + IOUtils.closeWhileHandlingException(connection); + handshakeStep.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + IOUtils.closeWhileHandlingException(connection); + handshakeStep.onFailure(e); + } + }); + }, onFailure); + } else { + handshakeStep.onResponse(null); + } + + handshakeStep.whenComplete(v -> openConnections(listener, 1), onFailure); + + } + + private void openConnections(ActionListener finished, int attemptNumber) { + if (attemptNumber <= 3) { + List resolved = addresses.stream().map(Supplier::get).collect(Collectors.toList()); + + int remaining = maxNumRemoteConnections - connectionManager.size(); + ActionListener compositeListener = new ActionListener<>() { + + private final AtomicInteger successfulConnections = new AtomicInteger(0); + private final CountDown countDown = new CountDown(remaining); + + @Override + public void onResponse(Void v) { + successfulConnections.incrementAndGet(); + if (countDown.countDown()) { + if (shouldOpenMoreConnections()) { + openConnections(finished, attemptNumber + 1); + } else { + finished.onResponse(v); + } + } + } + + @Override + public void onFailure(Exception e) { + if (countDown.countDown()) { + openConnections(finished, attemptNumber + 1); + } + } + }; + + + for (int i = 0; i < remaining; ++i) { + TransportAddress address = nextAddress(resolved); + DiscoveryNode node = new DiscoveryNode(clusterAlias + "#" + address, address, Version.CURRENT.minimumCompatibilityVersion()); + connectionManager.connectToNode(node, null, (connection, profile, listener1) -> listener1.onResponse(null), + new ActionListener<>() { + @Override + public void onResponse(Void v) { + compositeListener.onResponse(v); + } + + @Override + public void onFailure(Exception e) { + logger.debug(() -> new ParameterizedMessage("failed to open remote connection to address {}", address), e); + compositeListener.onFailure(e); + } + }); + } + } else { + if (connectionManager.size() == 0) { + finished.onFailure(new IllegalStateException("Unable to open any simple connections to remote cluster")); + } else { + finished.onResponse(null); + } + } + } + + private TransportAddress nextAddress(List resolvedAddresses) { + long curr; + while ((curr = counter.incrementAndGet()) == Long.MIN_VALUE) ; + return resolvedAddresses.get(Math.floorMod(curr, resolvedAddresses.size())); + } +} diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index ce820b744bda8..5a62da7edfcd6 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -19,6 +19,8 @@ package org.elasticsearch.transport; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; @@ -45,9 +47,9 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { - private final String clusterAlias; + private static final Logger logger = LogManager.getLogger(SniffConnectionStrategy.class); + private final List>> seedNodes; - private final TransportService transportService; private final int maxNumRemoteConnections; private final Predicate nodePredicate; private final SetOnce remoteClusterName = new SetOnce<>(); @@ -56,9 +58,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy { SniffConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, String proxyAddress, int maxNumRemoteConnections, Predicate nodePredicate, List>> seedNodes) { - super(transportService.getThreadPool(), connectionManager); - this.clusterAlias = clusterAlias; - this.transportService = transportService; + super(clusterAlias, transportService, connectionManager); this.proxyAddress = proxyAddress; this.maxNumRemoteConnections = maxNumRemoteConnections; this.nodePredicate = nodePredicate; @@ -109,15 +109,15 @@ private void collectRemoteNodes(Iterator> seedNodes, Act onFailure.accept(e); } - final StepListener handShakeStep = new StepListener<>(); + final StepListener handshakeStep = new StepListener<>(); openConnectionStep.whenComplete(connection -> { ConnectionProfile connectionProfile = connectionManager.getConnectionManager().getConnectionProfile(); transportService.handshake(connection, connectionProfile.getHandshakeTimeout().millis(), - getRemoteClusterNamePredicate(), handShakeStep); + getRemoteClusterNamePredicate(remoteClusterName), handshakeStep); }, onFailure); final StepListener fullConnectionStep = new StepListener<>(); - handShakeStep.whenComplete(handshakeResponse -> { + handshakeStep.whenComplete(handshakeResponse -> { final DiscoveryNode handshakeNode = maybeAddProxyAddress(proxyAddress, handshakeResponse.getDiscoveryNode()); if (nodePredicate.test(handshakeNode) && shouldOpenMoreConnections()) { @@ -135,7 +135,7 @@ private void collectRemoteNodes(Iterator> seedNodes, Act fullConnectionStep.whenComplete(aVoid -> { if (remoteClusterName.get() == null) { - TransportService.HandshakeResponse handshakeResponse = handShakeStep.result(); + TransportService.HandshakeResponse handshakeResponse = handshakeStep.result(); assert handshakeResponse.getClusterName().value() != null; remoteClusterName.set(handshakeResponse.getClusterName()); } @@ -246,21 +246,6 @@ public String executor() { } } - private Predicate getRemoteClusterNamePredicate() { - return new Predicate<>() { - @Override - public boolean test(ClusterName c) { - return remoteClusterName.get() == null || c.equals(remoteClusterName.get()); - } - - @Override - public String toString() { - return remoteClusterName.get() == null ? "any cluster name" - : "expected remote cluster name [" + remoteClusterName.get().value() + "]"; - } - }; - } - private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) { if (proxyAddress == null || proxyAddress.isEmpty()) { return node; From 84d3f810016aa3ae5ed501bc4379ade7a1676f01 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 2 Oct 2019 17:44:21 -0600 Subject: [PATCH 2/5] Tests --- .../transport/ConnectionManager.java | 6 + .../transport/RemoteConnectionStrategy.java | 9 +- .../transport/SimpleConnectionStrategy.java | 20 +- .../transport/SniffConnectionStrategy.java | 2 +- .../transport/TransportService.java | 12 + .../SimpleConnectionStrategyTests.java | 253 ++++++++++++++++++ 6 files changed, 292 insertions(+), 10 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index c11afa088aa53..110053bcee77b 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -31,8 +31,10 @@ import org.elasticsearch.core.internal.io.IOUtils; import java.io.Closeable; +import java.util.Collections; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -216,6 +218,10 @@ public int size() { return connectedNodes.size(); } + public Set getAllConnectedNodes() { + return Collections.unmodifiableSet(connectedNodes.keySet()); + } + @Override public void close() { internalClose(true); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 42062284ed084..53dfb1b1c3176 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -23,7 +23,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.cluster.ClusterName; @@ -168,17 +167,17 @@ private List> getAndClearListeners() { return result; } - static Predicate getRemoteClusterNamePredicate(SetOnce remoteClusterName) { + static Predicate getRemoteClusterNamePredicate(ClusterName remoteClusterName) { return new Predicate<>() { @Override public boolean test(ClusterName c) { - return remoteClusterName.get() == null || c.equals(remoteClusterName.get()); + return remoteClusterName == null || c.equals(remoteClusterName); } @Override public String toString() { - return remoteClusterName.get() == null ? "any cluster name" - : "expected remote cluster name [" + remoteClusterName.get().value() + "]"; + return remoteClusterName == null ? "any cluster name" + : "expected remote cluster name [" + remoteClusterName.value() + "]"; } }; } diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index 564a4611822dc..eac3b71dfd383 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -49,12 +49,18 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { private final AtomicLong counter = new AtomicLong(); private final List> addresses; private final SetOnce remoteClusterName = new SetOnce<>(); + private final ConnectionProfile profile; SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, int maxNumRemoteConnections, List> addresses) { super(clusterAlias, transportService, connectionManager); this.maxNumRemoteConnections = maxNumRemoteConnections; this.addresses = addresses; + // TODO: Move into the ConnectionManager + this.profile = new ConnectionProfile.Builder() + .addConnections(1, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) + .addConnections(0, TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY) + .build(); } @Override @@ -96,9 +102,13 @@ private void performSimpleConnectionProcess(Iterator> openConnectionStep.whenComplete(connection -> { ConnectionProfile connectionProfile = connectionManager.getConnectionManager().getConnectionProfile(); transportService.handshake(connection, connectionProfile.getHandshakeTimeout().millis(), - getRemoteClusterNamePredicate(remoteClusterName), new ActionListener<>() { + getRemoteClusterNamePredicate(remoteClusterName.get()), new ActionListener<>() { @Override public void onResponse(TransportService.HandshakeResponse handshakeResponse) { + if (remoteClusterName.get() == null) { + assert handshakeResponse.getClusterName().value() != null; + remoteClusterName.set(handshakeResponse.getClusterName()); + } IOUtils.closeWhileHandlingException(connection); handshakeStep.onResponse(null); } @@ -151,9 +161,11 @@ public void onFailure(Exception e) { for (int i = 0; i < remaining; ++i) { TransportAddress address = nextAddress(resolved); - DiscoveryNode node = new DiscoveryNode(clusterAlias + "#" + address, address, Version.CURRENT.minimumCompatibilityVersion()); - connectionManager.connectToNode(node, null, (connection, profile, listener1) -> listener1.onResponse(null), - new ActionListener<>() { + String id = clusterAlias + "#" + address; + DiscoveryNode node = new DiscoveryNode(id, address, Version.CURRENT.minimumCompatibilityVersion()); + + ConnectionManager.ConnectionValidator validator = transportService.clusterNameOnlyValidator(node, remoteClusterName.get()); + connectionManager.connectToNode(node, profile, validator, new ActionListener<>() { @Override public void onResponse(Void v) { compositeListener.onResponse(v); diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 5a62da7edfcd6..165c15df41e63 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -113,7 +113,7 @@ private void collectRemoteNodes(Iterator> seedNodes, Act openConnectionStep.whenComplete(connection -> { ConnectionProfile connectionProfile = connectionManager.getConnectionManager().getConnectionProfile(); transportService.handshake(connection, connectionProfile.getHandshakeTimeout().millis(), - getRemoteClusterNamePredicate(remoteClusterName), handshakeStep); + getRemoteClusterNamePredicate(remoteClusterName.get()), handshakeStep); }, onFailure); final StepListener fullConnectionStep = new StepListener<>(); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index ddd7a0d4cab19..a8b87828b02a9 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -374,6 +374,18 @@ public ConnectionManager.ConnectionValidator connectionValidator(DiscoveryNode n }; } + public ConnectionManager.ConnectionValidator clusterNameOnlyValidator(DiscoveryNode node, ClusterName clusterName) { + return (newConnection, actualProfile, listener) -> { + handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, ActionListener.map(listener, resp -> { + ClusterName remote = resp.clusterName; + if (clusterName.equals(remote) == false) { + throw new ConnectTransportException(node, "handshake failed. unexpected remote cluster name " + remote); + } + return null; + })); + }; + } + /** * Establishes and returns a new connection to the given node. The connection is NOT maintained by this service, it's the callers * responsibility to close the connection once it goes out of scope. diff --git a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java new file mode 100644 index 0000000000000..6a68337bd3029 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java @@ -0,0 +1,253 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class SimpleConnectionStrategyTests extends ESTestCase { + + private final String clusterAlias = "cluster-alias"; + private final ConnectionProfile profile = RemoteClusterService.buildConnectionProfileFromSettings(Settings.EMPTY, "cluster"); + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + private MockTransportService startTransport(String id, Version version) { + return startTransport(id, version, Settings.EMPTY); + } + + public MockTransportService startTransport(final String id, final Version version, final Settings settings) { + boolean success = false; + final Settings s = Settings.builder() + .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterAlias) + .put("node.name", id) + .put(settings) + .build(); + MockTransportService newService = MockTransportService.createNewService(settings, version, threadPool); + try { + newService.start(); + newService.acceptIncomingRequests(); + success = true; + return newService; + } finally { + if (success == false) { + newService.close(); + } + } + } + + public void testSimpleStrategyWillOpenExpectedNumberOfConnectionsToAddresses() { + try (MockTransportService transport1 = startTransport("node1", Version.CURRENT); + MockTransportService transport2 = startTransport("node2", Version.CURRENT)) { + TransportAddress address1 = transport1.boundAddress().publishAddress(); + TransportAddress address2 = transport2.boundAddress().publishAddress(); + + try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { + localService.start(); + localService.acceptIncomingRequests(); + + ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + int numOfConnections = randomIntBetween(4, 8); + try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager, + numOfConnections, addresses(address1, address2))) { + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + connectFuture.actionGet(); + + assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + assertEquals(numOfConnections, connectionManager.size()); + assertTrue(strategy.assertNoRunningConnections()); + } + } + } + } + + public void testSimpleStrategyWillOpenNewConnectionsOnDisconnect() throws Exception { + try (MockTransportService transport1 = startTransport("node1", Version.CURRENT); + MockTransportService transport2 = startTransport("node2", Version.CURRENT)) { + TransportAddress address1 = transport1.boundAddress().publishAddress(); + TransportAddress address2 = transport2.boundAddress().publishAddress(); + + try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { + localService.start(); + localService.acceptIncomingRequests(); + + ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + int numOfConnections = randomIntBetween(4, 8); + try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager, + numOfConnections, addresses(address1, address2))) { + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + connectFuture.actionGet(); + + assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + long initialConnectionsToTransport2 = connectionManager.getAllConnectedNodes().stream() + .filter(n -> n.getAddress().equals(address2)) + .count(); + assertNotEquals(0, initialConnectionsToTransport2); + assertEquals(numOfConnections, connectionManager.size()); + assertTrue(strategy.assertNoRunningConnections()); + + transport1.close(); + + assertBusy(() -> { + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + // More connections now pointing to transport2 + long finalConnectionsToTransport2 = connectionManager.getAllConnectedNodes().stream() + .filter(n -> n.getAddress().equals(address2)) + .count(); + assertTrue(finalConnectionsToTransport2 > initialConnectionsToTransport2); + assertTrue(strategy.assertNoRunningConnections()); + }); + } + } + } + } + + public void testConnectWithSingleIncompatibleNode() { + Version incompatibleVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion(); + try (MockTransportService transport1 = startTransport("compatible-node", Version.CURRENT); + MockTransportService transport2 = startTransport("incompatible-node", incompatibleVersion)) { + TransportAddress address1 = transport1.boundAddress().publishAddress(); + TransportAddress address2 = transport2.boundAddress().publishAddress(); + + try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { + localService.start(); + localService.acceptIncomingRequests(); + + ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + int numOfConnections = randomIntBetween(4, 8); + try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager, + numOfConnections, addresses(address1, address2))) { + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + connectFuture.actionGet(); + + assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + assertTrue(strategy.assertNoRunningConnections()); + } + } + } + } + + public void testConnectFailsWithIncompatibleNodes() { + Version incompatibleVersion = Version.CURRENT.minimumCompatibilityVersion().minimumCompatibilityVersion(); + try (MockTransportService transport1 = startTransport("incompatible-node", incompatibleVersion)) { + TransportAddress address1 = transport1.boundAddress().publishAddress(); + + try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { + localService.start(); + localService.acceptIncomingRequests(); + + ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + int numOfConnections = randomIntBetween(4, 8); + try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager, + numOfConnections, addresses(address1))) { + + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + expectThrows(Exception.class, connectFuture::actionGet); + + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertEquals(0, connectionManager.size()); + assertTrue(strategy.assertNoRunningConnections()); + } + } + } + } + + public void testClusterNameValidationPreventConnectingToDifferentClusters() throws Exception { + Settings otherSettings = Settings.builder().put("cluster.name", "otherCluster").build(); + + try (MockTransportService transport1 = startTransport("cluster1", Version.CURRENT); + MockTransportService transport2 = startTransport("cluster2", Version.CURRENT, otherSettings)) { + TransportAddress address1 = transport1.boundAddress().publishAddress(); + TransportAddress address2 = transport2.boundAddress().publishAddress(); + + try (MockTransportService localService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool)) { + localService.start(); + localService.acceptIncomingRequests(); + + ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); + int numOfConnections = randomIntBetween(4, 8); + try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); + SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager, + numOfConnections, addresses(address1, address2))) { + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + + PlainActionFuture connectFuture = PlainActionFuture.newFuture(); + strategy.connect(connectFuture); + connectFuture.actionGet(); + + assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + assertTrue(strategy.assertNoRunningConnections()); + } + } + } + } + + private static List> addresses(final TransportAddress... addresses) { + if (addresses.length == 0) { + return Collections.emptyList(); + } else if (addresses.length == 1) { + return Collections.singletonList(() -> addresses[0]); + } else { + return Arrays.stream(addresses) + .map(s -> (Supplier) () -> s) + .collect(Collectors.toList()); + } + } +} From 74a5748e8471d2fc5afcaaf084528edb03e354f4 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 7 Oct 2019 13:29:47 -0600 Subject: [PATCH 3/5] Review changes --- .../transport/SimpleConnectionStrategy.java | 9 +++++++-- .../transport/SimpleConnectionStrategyTests.java | 10 +--------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index eac3b71dfd383..40637f1c13839 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -43,6 +43,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { + private static final int MAX_CONNECT_ATTEMPTS_PER_RUN = 3; private static final Logger logger = LogManager.getLogger(SimpleConnectionStrategy.class); private final int maxNumRemoteConnections; @@ -55,6 +56,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { int maxNumRemoteConnections, List> addresses) { super(clusterAlias, transportService, connectionManager); this.maxNumRemoteConnections = maxNumRemoteConnections; + assert addresses.isEmpty() == false : "Cannot use simple connection strategy with no configured addresses"; this.addresses = addresses; // TODO: Move into the ConnectionManager this.profile = new ConnectionProfile.Builder() @@ -129,7 +131,7 @@ public void onFailure(Exception e) { } private void openConnections(ActionListener finished, int attemptNumber) { - if (attemptNumber <= 3) { + if (attemptNumber <= MAX_CONNECT_ATTEMPTS_PER_RUN) { List resolved = addresses.stream().map(Supplier::get).collect(Collectors.toList()); int remaining = maxNumRemoteConnections - connectionManager.size(); @@ -179,9 +181,12 @@ public void onFailure(Exception e) { }); } } else { - if (connectionManager.size() == 0) { + int openConnections = connectionManager.size(); + if (openConnections == 0) { finished.onFailure(new IllegalStateException("Unable to open any simple connections to remote cluster")); } else { + logger.trace("unable to open maximum number of connections [opened: {}, maximum: {}]", openConnections, + maxNumRemoteConnections); finished.onResponse(null); } } diff --git a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java index 6a68337bd3029..0a813633fe91b 100644 --- a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java @@ -240,14 +240,6 @@ numOfConnections, addresses(address1, address2))) { } private static List> addresses(final TransportAddress... addresses) { - if (addresses.length == 0) { - return Collections.emptyList(); - } else if (addresses.length == 1) { - return Collections.singletonList(() -> addresses[0]); - } else { - return Arrays.stream(addresses) - .map(s -> (Supplier) () -> s) - .collect(Collectors.toList()); - } + return Arrays.stream(addresses).map(s -> (Supplier) () -> s).collect(Collectors.toList()); } } From ba2fc95d0aa83408e168a0844349268bd261118a Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 7 Oct 2019 16:19:33 -0600 Subject: [PATCH 4/5] Review changes --- .../transport/SimpleConnectionStrategy.java | 102 ++++++------------ .../transport/TransportService.java | 12 --- .../SimpleConnectionStrategyTests.java | 35 ++++-- 3 files changed, 61 insertions(+), 88 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index 40637f1c13839..a8350d09f9527 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -22,22 +22,18 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.StepListener; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.CountDown; -import org.elasticsearch.core.internal.io.IOUtils; -import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -47,9 +43,9 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { private static final Logger logger = LogManager.getLogger(SimpleConnectionStrategy.class); private final int maxNumRemoteConnections; - private final AtomicLong counter = new AtomicLong(); + private final AtomicLong counter = new AtomicLong(0); private final List> addresses; - private final SetOnce remoteClusterName = new SetOnce<>(); + private final AtomicReference remoteClusterName = new AtomicReference<>(); private final ConnectionProfile profile; SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, @@ -76,57 +72,7 @@ protected void connectImpl(ActionListener listener) { } private void performSimpleConnectionProcess(Iterator> addressIter, ActionListener listener) { - final Consumer onFailure = e -> { - if (e instanceof ConnectTransportException || e instanceof IOException || e instanceof IllegalStateException) { - // ISE if we fail the handshake with an version incompatible node - if (addressIter.hasNext()) { - logger.debug(() -> new ParameterizedMessage( - "handshaking with external cluster [{}] failed moving to next address", clusterAlias), e); - performSimpleConnectionProcess(addressIter, listener); - return; - } - } - logger.warn(() -> new ParameterizedMessage("handshaking with external cluster [{}] failed", clusterAlias), e); - listener.onFailure(e); - }; - - - final StepListener handshakeStep = new StepListener<>(); - - if (remoteClusterName.get() == null) { - final StepListener openConnectionStep = new StepListener<>(); - final ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG); - TransportAddress address = addressIter.next().get(); - DiscoveryNode handshakeNode = new DiscoveryNode(clusterAlias + "#" + address, address, - Version.CURRENT.minimumCompatibilityVersion()); - connectionManager.openConnection(handshakeNode, profile, openConnectionStep); - - openConnectionStep.whenComplete(connection -> { - ConnectionProfile connectionProfile = connectionManager.getConnectionManager().getConnectionProfile(); - transportService.handshake(connection, connectionProfile.getHandshakeTimeout().millis(), - getRemoteClusterNamePredicate(remoteClusterName.get()), new ActionListener<>() { - @Override - public void onResponse(TransportService.HandshakeResponse handshakeResponse) { - if (remoteClusterName.get() == null) { - assert handshakeResponse.getClusterName().value() != null; - remoteClusterName.set(handshakeResponse.getClusterName()); - } - IOUtils.closeWhileHandlingException(connection); - handshakeStep.onResponse(null); - } - - @Override - public void onFailure(Exception e) { - IOUtils.closeWhileHandlingException(connection); - handshakeStep.onFailure(e); - } - }); - }, onFailure); - } else { - handshakeStep.onResponse(null); - } - - handshakeStep.whenComplete(v -> openConnections(listener, 1), onFailure); + openConnections(listener, 1); } @@ -166,19 +112,19 @@ public void onFailure(Exception e) { String id = clusterAlias + "#" + address; DiscoveryNode node = new DiscoveryNode(id, address, Version.CURRENT.minimumCompatibilityVersion()); - ConnectionManager.ConnectionValidator validator = transportService.clusterNameOnlyValidator(node, remoteClusterName.get()); + ConnectionManager.ConnectionValidator validator = clusterNameValidator(node); connectionManager.connectToNode(node, profile, validator, new ActionListener<>() { - @Override - public void onResponse(Void v) { - compositeListener.onResponse(v); - } + @Override + public void onResponse(Void v) { + compositeListener.onResponse(v); + } - @Override - public void onFailure(Exception e) { - logger.debug(() -> new ParameterizedMessage("failed to open remote connection to address {}", address), e); - compositeListener.onFailure(e); - } - }); + @Override + public void onFailure(Exception e) { + logger.debug(() -> new ParameterizedMessage("failed to open remote connection to address {}", address), e); + compositeListener.onFailure(e); + } + }); } } else { int openConnections = connectionManager.size(); @@ -194,7 +140,23 @@ public void onFailure(Exception e) { private TransportAddress nextAddress(List resolvedAddresses) { long curr; - while ((curr = counter.incrementAndGet()) == Long.MIN_VALUE) ; + while ((curr = counter.getAndIncrement()) == Long.MIN_VALUE) ; return resolvedAddresses.get(Math.floorMod(curr, resolvedAddresses.size())); } + + private ConnectionManager.ConnectionValidator clusterNameValidator(DiscoveryNode node) { + return (newConnection, actualProfile, listener) -> + transportService.handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, + ActionListener.map(listener, resp -> { + ClusterName remote = resp.getClusterName(); + if (remoteClusterName.compareAndSet(null, remote)) { + return null; + } else { + if (remoteClusterName.get().equals(remote) == false) { + throw new ConnectTransportException(node, "handshake failed. unexpected remote cluster name " + remote); + } + return null; + } + })); + } } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index a8b87828b02a9..ddd7a0d4cab19 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -374,18 +374,6 @@ public ConnectionManager.ConnectionValidator connectionValidator(DiscoveryNode n }; } - public ConnectionManager.ConnectionValidator clusterNameOnlyValidator(DiscoveryNode node, ClusterName clusterName) { - return (newConnection, actualProfile, listener) -> { - handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, ActionListener.map(listener, resp -> { - ClusterName remote = resp.clusterName; - if (clusterName.equals(remote) == false) { - throw new ConnectTransportException(node, "handshake failed. unexpected remote cluster name " + remote); - } - return null; - })); - }; - } - /** * Establishes and returns a new connection to the given node. The connection is NOT maintained by this service, it's the callers * responsibility to close the connection once it goes out of scope. diff --git a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java index 0a813633fe91b..68e2622c040fd 100644 --- a/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/SimpleConnectionStrategyTests.java @@ -26,13 +26,14 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.test.transport.StubbableTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -160,8 +161,22 @@ public void testConnectWithSingleIncompatibleNode() { localService.start(); localService.acceptIncomingRequests(); - ConnectionManager connectionManager = new ConnectionManager(profile, localService.transport); - int numOfConnections = randomIntBetween(4, 8); + StubbableTransport stubbableTransport = new StubbableTransport(localService.transport); + ConnectionManager connectionManager = new ConnectionManager(profile, stubbableTransport); + AtomicInteger address1Attempts = new AtomicInteger(0); + AtomicInteger address2Attempts = new AtomicInteger(0); + stubbableTransport.setDefaultConnectBehavior((transport, discoveryNode, profile, listener) -> { + if (discoveryNode.getAddress().equals(address1)) { + address1Attempts.incrementAndGet(); + transport.openConnection(discoveryNode, profile, listener); + } else if (discoveryNode.getAddress().equals(address2)) { + address2Attempts.incrementAndGet(); + transport.openConnection(discoveryNode, profile, listener); + } else { + throw new AssertionError("Unexpected address"); + } + }); + int numOfConnections = 5; try (RemoteConnectionManager remoteConnectionManager = new RemoteConnectionManager(clusterAlias, connectionManager); SimpleConnectionStrategy strategy = new SimpleConnectionStrategy(clusterAlias, localService, remoteConnectionManager, numOfConnections, addresses(address1, address2))) { @@ -172,7 +187,12 @@ numOfConnections, addresses(address1, address2))) { strategy.connect(connectFuture); connectFuture.actionGet(); - assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); + assertEquals(4 ,connectionManager.size()); + assertEquals(4 ,connectionManager.getAllConnectedNodes().stream().map(n -> n.getAddress().equals(address1)).count()); + // Three attempts on first round, one attempts on second round, zero attempts on third round + assertEquals(4, address1Attempts.get()); + // Two attempts on first round, one attempt on second round, one attempt on third round + assertEquals(4, address2Attempts.get()); assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); assertTrue(strategy.assertNoRunningConnections()); } @@ -231,8 +251,11 @@ numOfConnections, addresses(address1, address2))) { strategy.connect(connectFuture); connectFuture.actionGet(); - assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))); - assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + if (connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address1))) { + assertFalse(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + } else { + assertTrue(connectionManager.getAllConnectedNodes().stream().anyMatch(n -> n.getAddress().equals(address2))); + } assertTrue(strategy.assertNoRunningConnections()); } } From 95ce8fde5b6bbc73eaebfe2eb8b6e333a179dc11 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 8 Oct 2019 11:02:12 -0600 Subject: [PATCH 5/5] Changes --- .../transport/RemoteConnectionStrategy.java | 17 ------- .../transport/SimpleConnectionStrategy.java | 47 +++++++++---------- .../transport/SniffConnectionStrategy.java | 17 ++++++- 3 files changed, 39 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 53dfb1b1c3176..d327a171920e0 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -25,7 +25,6 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; -import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.threadpool.ThreadPool; @@ -37,7 +36,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Predicate; public abstract class RemoteConnectionStrategy implements TransportConnectionListener, Closeable { @@ -166,19 +164,4 @@ private List> getAndClearListeners() { } return result; } - - static Predicate getRemoteClusterNamePredicate(ClusterName remoteClusterName) { - return new Predicate<>() { - @Override - public boolean test(ClusterName c) { - return remoteClusterName == null || c.equals(remoteClusterName); - } - - @Override - public String toString() { - return remoteClusterName == null ? "any cluster name" - : "expected remote cluster name [" + remoteClusterName.value() + "]"; - } - }; - } } diff --git a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java index a8350d09f9527..24e9e18c8dc10 100644 --- a/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SimpleConnectionStrategy.java @@ -47,6 +47,7 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { private final List> addresses; private final AtomicReference remoteClusterName = new AtomicReference<>(); private final ConnectionProfile profile; + private final ConnectionManager.ConnectionValidator clusterNameValidator; SimpleConnectionStrategy(String clusterAlias, TransportService transportService, RemoteConnectionManager connectionManager, int maxNumRemoteConnections, List> addresses) { @@ -59,6 +60,20 @@ public class SimpleConnectionStrategy extends RemoteConnectionStrategy { .addConnections(1, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING) .addConnections(0, TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY) .build(); + this.clusterNameValidator = (newConnection, actualProfile, listener) -> + transportService.handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, + ActionListener.map(listener, resp -> { + ClusterName remote = resp.getClusterName(); + if (remoteClusterName.compareAndSet(null, remote)) { + return null; + } else { + if (remoteClusterName.get().equals(remote) == false) { + DiscoveryNode node = newConnection.getNode(); + throw new ConnectTransportException(node, "handshake failed. unexpected remote cluster name " + remote); + } + return null; + } + })); } @Override @@ -73,7 +88,6 @@ protected void connectImpl(ActionListener listener) { private void performSimpleConnectionProcess(Iterator> addressIter, ActionListener listener) { openConnections(listener, 1); - } private void openConnections(ActionListener finished, int attemptNumber) { @@ -111,9 +125,8 @@ public void onFailure(Exception e) { TransportAddress address = nextAddress(resolved); String id = clusterAlias + "#" + address; DiscoveryNode node = new DiscoveryNode(id, address, Version.CURRENT.minimumCompatibilityVersion()); - - ConnectionManager.ConnectionValidator validator = clusterNameValidator(node); - connectionManager.connectToNode(node, profile, validator, new ActionListener<>() { + + connectionManager.connectToNode(node, profile, clusterNameValidator, new ActionListener<>() { @Override public void onResponse(Void v) { compositeListener.onResponse(v); @@ -121,7 +134,8 @@ public void onResponse(Void v) { @Override public void onFailure(Exception e) { - logger.debug(() -> new ParameterizedMessage("failed to open remote connection to address {}", address), e); + logger.debug(new ParameterizedMessage("failed to open remote connection [remote cluster: {}, address: {}]", + clusterAlias, address), e); compositeListener.onFailure(e); } }); @@ -129,10 +143,11 @@ public void onFailure(Exception e) { } else { int openConnections = connectionManager.size(); if (openConnections == 0) { - finished.onFailure(new IllegalStateException("Unable to open any simple connections to remote cluster")); + finished.onFailure(new IllegalStateException("Unable to open any simple connections to remote cluster [" + clusterAlias + + "]")); } else { - logger.trace("unable to open maximum number of connections [opened: {}, maximum: {}]", openConnections, - maxNumRemoteConnections); + logger.debug("unable to open maximum number of connections [remote cluster: {}, opened: {}, maximum: {}]", clusterAlias, + openConnections, maxNumRemoteConnections); finished.onResponse(null); } } @@ -143,20 +158,4 @@ private TransportAddress nextAddress(List resolvedAddresses) { while ((curr = counter.getAndIncrement()) == Long.MIN_VALUE) ; return resolvedAddresses.get(Math.floorMod(curr, resolvedAddresses.size())); } - - private ConnectionManager.ConnectionValidator clusterNameValidator(DiscoveryNode node) { - return (newConnection, actualProfile, listener) -> - transportService.handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, - ActionListener.map(listener, resp -> { - ClusterName remote = resp.getClusterName(); - if (remoteClusterName.compareAndSet(null, remote)) { - return null; - } else { - if (remoteClusterName.get().equals(remote) == false) { - throw new ConnectTransportException(node, "handshake failed. unexpected remote cluster name " + remote); - } - return null; - } - })); - } } diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index 165c15df41e63..f71ce576a3c22 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -113,7 +113,7 @@ private void collectRemoteNodes(Iterator> seedNodes, Act openConnectionStep.whenComplete(connection -> { ConnectionProfile connectionProfile = connectionManager.getConnectionManager().getConnectionProfile(); transportService.handshake(connection, connectionProfile.getHandshakeTimeout().millis(), - getRemoteClusterNamePredicate(remoteClusterName.get()), handshakeStep); + getRemoteClusterNamePredicate(), handshakeStep); }, onFailure); final StepListener fullConnectionStep = new StepListener<>(); @@ -246,6 +246,21 @@ public String executor() { } } + private Predicate getRemoteClusterNamePredicate() { + return new Predicate<>() { + @Override + public boolean test(ClusterName c) { + return remoteClusterName.get() == null || c.equals(remoteClusterName.get()); + } + + @Override + public String toString() { + return remoteClusterName.get() == null ? "any cluster name" + : "expected remote cluster name [" + remoteClusterName.get().value() + "]"; + } + }; + } + private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) { if (proxyAddress == null || proxyAddress.isEmpty()) { return node;