Skip to content

Commit fafb3d0

Browse files
committed
Add infrastructure to manage network connections outside of Transport/TransportService (#22194)
Some expert users like UnicastZenPing today establishes real connections to nodes during it's ping phase that can be used by other parts of the system. Yet, this is potentially dangerous and undesirable unless the nodes have been fully verified and should be connected to in the case of a cluster state update or if we join a newly elected master. For use-cases like this, this change adds the infrastructure to manually handle connections that are not publicly available on the node ie. should not be managed by `Transport`/`TransportSerivce`
1 parent 39cbcf8 commit fafb3d0

File tree

5 files changed

+103
-16
lines changed

5 files changed

+103
-16
lines changed

core/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,9 @@ public List<String> getLocalAddresses() {
294294
return transport.getLocalAddresses();
295295
}
296296

297+
/**
298+
* Returns <code>true</code> iff the given node is already connected.
299+
*/
297300
public boolean nodeConnected(DiscoveryNode node) {
298301
return node.equals(localNode) || transport.nodeConnected(node);
299302
}
@@ -315,6 +318,20 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
315318
transport.connectToNode(node, connectionProfile);
316319
}
317320

321+
/**
322+
* Establishes and returns a new connection to the given node. The connection is NOT maintained by this service, it's the callers
323+
* responsibility to close the connection once it goes out of scope.
324+
* @param node the node to connect to
325+
* @param profile the connection profile to use
326+
*/
327+
public Transport.Connection openConnection(final DiscoveryNode node, ConnectionProfile profile) throws IOException {
328+
if (node.equals(localNode)) {
329+
return localNodeConnection;
330+
} else {
331+
return transport.openConnection(node, profile);
332+
}
333+
}
334+
318335
/**
319336
* Lightly connect to the specified node, returning updated node
320337
* information. The handshake will fail if the cluster name on the
@@ -341,7 +358,19 @@ public DiscoveryNode connectToNodeAndHandshake(
341358
return handshakeNode;
342359
}
343360

344-
private DiscoveryNode handshake(
361+
/**
362+
* Executes a high-level handshake using the given connection
363+
* and returns the discovery node of the node the connection
364+
* was established with. The handshake will fail if the cluster
365+
* name on the target node mismatches the local cluster name.
366+
*
367+
* @param connection the connection to a specific node
368+
* @param handshakeTimeout handshake timeout
369+
* @return the connected node
370+
* @throws ConnectTransportException if the connection failed
371+
* @throws IllegalStateException if the handshake failed
372+
*/
373+
public DiscoveryNode handshake(
345374
final Transport.Connection connection,
346375
final long handshakeTimeout) throws ConnectTransportException {
347376
final HandshakeResponse response;
@@ -474,7 +503,7 @@ public final <T extends TransportResponse> void sendRequest(final DiscoveryNode
474503
}
475504
}
476505

477-
final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
506+
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
478507
final TransportRequest request,
479508
final TransportRequestOptions options,
480509
TransportResponseHandler<T> handler) {
@@ -486,7 +515,7 @@ final <T extends TransportResponse> void sendRequest(final Transport.Connection
486515
* Returns either a real transport connection or a local node connection if we are using the local node optimization.
487516
* @throws NodeNotConnectedException if the given node is not connected
488517
*/
489-
private Transport.Connection getConnection(DiscoveryNode node) {
518+
public Transport.Connection getConnection(DiscoveryNode node) {
490519
if (Objects.requireNonNull(node, "node must be non-null").equals(localNode)) {
491520
return localNodeConnection;
492521
} else {

core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.test.ESTestCase;
4242
import org.elasticsearch.test.VersionUtils;
4343
import org.elasticsearch.test.junit.annotations.TestLogging;
44+
import org.elasticsearch.test.transport.MockTransportService;
4445
import org.elasticsearch.threadpool.TestThreadPool;
4546
import org.elasticsearch.threadpool.ThreadPool;
4647
import org.elasticsearch.transport.MockTcpTransport;
@@ -573,7 +574,7 @@ private NetworkHandle startServices(
573574
final BiFunction<Settings, Version, Transport> supplier) {
574575
final Transport transport = supplier.apply(settings, version);
575576
final TransportService transportService =
576-
new TransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
577+
new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
577578
transportService.start();
578579
transportService.acceptIncomingRequests();
579580
final ConcurrentMap<TransportAddress, AtomicInteger> counters = ConcurrentCollections.newConcurrentMap();

core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,24 @@ public void testConnectToNodeLight() throws IOException {
113113
emptyMap(),
114114
emptySet(),
115115
Version.CURRENT.minimumCompatibilityVersion());
116+
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)){
117+
DiscoveryNode connectedNode = handleA.transportService.handshake(connection, timeout);
118+
assertNotNull(connectedNode);
119+
// the name and version should be updated
120+
assertEquals(connectedNode.getName(), "TS_B");
121+
assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion());
122+
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
123+
}
124+
116125
DiscoveryNode connectedNode =
117-
handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout);
126+
handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout);
118127
assertNotNull(connectedNode);
119128

120129
// the name and version should be updated
121130
assertEquals(connectedNode.getName(), "TS_B");
122131
assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion());
123132
assertTrue(handleA.transportService.nodeConnected(discoveryNode));
133+
124134
}
125135

126136
public void testMismatchedClusterName() {
@@ -133,8 +143,12 @@ public void testMismatchedClusterName() {
133143
emptyMap(),
134144
emptySet(),
135145
Version.CURRENT.minimumCompatibilityVersion());
136-
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake(
137-
discoveryNode, timeout));
146+
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
147+
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode,
148+
ConnectionProfile.LIGHT_PROFILE)) {
149+
handleA.transportService.handshake(connection, timeout);
150+
}
151+
});
138152
assertThat(ex.getMessage(), containsString("handshake failed, mismatched cluster name [Cluster [b]]"));
139153
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
140154
}
@@ -150,8 +164,12 @@ public void testIncompatibleVersions() {
150164
emptyMap(),
151165
emptySet(),
152166
Version.CURRENT.minimumCompatibilityVersion());
153-
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake(
154-
discoveryNode, timeout));
167+
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
168+
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode,
169+
ConnectionProfile.LIGHT_PROFILE)) {
170+
handleA.transportService.handshake(connection, timeout);
171+
}
172+
});
155173
assertThat(ex.getMessage(), containsString("handshake failed, incompatible version"));
156174
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
157175
}

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,13 @@
5656
import java.net.UnknownHostException;
5757
import java.util.Arrays;
5858
import java.util.Collections;
59+
import java.util.HashMap;
5960
import java.util.HashSet;
6061
import java.util.List;
6162
import java.util.Map;
6263
import java.util.Queue;
6364
import java.util.Set;
65+
import java.util.concurrent.ConcurrentHashMap;
6466
import java.util.concurrent.ConcurrentMap;
6567
import java.util.concurrent.CopyOnWriteArrayList;
6668
import java.util.concurrent.LinkedBlockingDeque;
@@ -79,6 +81,8 @@
7981
*/
8082
public final class MockTransportService extends TransportService {
8183

84+
private final Map<DiscoveryNode, List<Transport.Connection>> openConnections = new HashMap<>();
85+
8286
public static class TestPlugin extends Plugin {
8387
@Override
8488
public List<Setting<?>> getSettings() {
@@ -560,9 +564,7 @@ public void stop() {
560564
}
561565

562566
@Override
563-
public void close() {
564-
transport.close();
565-
}
567+
public void close() { transport.close(); }
566568

567569
@Override
568570
public Map<String, BoundTransportAddress> profileBoundAddresses() {
@@ -708,4 +710,41 @@ public Transport getOriginalTransport() {
708710
}
709711
return transport;
710712
}
713+
714+
@Override
715+
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
716+
FilteredConnection filteredConnection = new FilteredConnection(super.openConnection(node, profile)) {
717+
final AtomicBoolean closed = new AtomicBoolean(false);
718+
@Override
719+
public void close() throws IOException {
720+
try {
721+
super.close();
722+
} finally {
723+
if (closed.compareAndSet(false, true)) {
724+
synchronized (openConnections) {
725+
List<Transport.Connection> connections = openConnections.get(node);
726+
boolean remove = connections.remove(this);
727+
assert remove;
728+
if (connections.isEmpty()) {
729+
openConnections.remove(node);
730+
}
731+
}
732+
}
733+
}
734+
735+
}
736+
};
737+
synchronized (openConnections) {
738+
List<Transport.Connection> connections = openConnections.computeIfAbsent(node,
739+
(n) -> new CopyOnWriteArrayList<>());
740+
connections.add(filteredConnection);
741+
}
742+
return filteredConnection;
743+
}
744+
745+
@Override
746+
protected void doClose() {
747+
super.doClose();
748+
assert openConnections.size() == 0 : "still open connections: " + openConnections;
749+
}
711750
}

test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1347,8 +1347,8 @@ public void handleException(TransportException exp) {
13471347
// all is well
13481348
}
13491349

1350-
try {
1351-
serviceB.connectToNodeAndHandshake(nodeA, 100);
1350+
try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){
1351+
serviceB.handshake(connection, 100);
13521352
fail("exception should be thrown");
13531353
} catch (IllegalStateException e) {
13541354
// all is well
@@ -1405,8 +1405,8 @@ public void handleException(TransportException exp) {
14051405
// all is well
14061406
}
14071407

1408-
try {
1409-
serviceB.connectToNodeAndHandshake(nodeA, 100);
1408+
try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){
1409+
serviceB.handshake(connection, 100);
14101410
fail("exception should be thrown");
14111411
} catch (IllegalStateException e) {
14121412
// all is well

0 commit comments

Comments
 (0)