Skip to content

Commit 1f3eb06

Browse files
authored
Add infrastructure to manage network connections outside of Transport/TransportService (#22194)
Some expert users like UnicastZenPing today establish real connections to nodes during their ping phase that can be used by other parts of the system. Yet, this is potentially dangerous and undesirable unless the nodes have been fully verified and should be connected to in the case of a cluster state update or if we join a newly elected master. For use-cases like this, this change adds the infrastructure to manually handle connections that are not publicly available on the node, i.e. should not be managed by `Transport`/`TransportService`.
1 parent 0b338bf commit 1f3eb06

File tree

5 files changed

+102
-16
lines changed

5 files changed

+102
-16
lines changed

core/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,9 @@ public List<String> getLocalAddresses() {
290290
return transport.getLocalAddresses();
291291
}
292292

293+
/**
294+
* Returns <code>true</code> iff the given node is already connected.
295+
*/
293296
public boolean nodeConnected(DiscoveryNode node) {
294297
return node.equals(localNode) || transport.nodeConnected(node);
295298
}
@@ -311,6 +314,20 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
311314
transport.connectToNode(node, connectionProfile);
312315
}
313316

317+
/**
318+
* Establishes and returns a new connection to the given node. The connection is NOT maintained by this service, it's the caller's
319+
* responsibility to close the connection once it goes out of scope.
320+
* @param node the node to connect to
321+
* @param profile the connection profile to use
322+
*/
323+
public Transport.Connection openConnection(final DiscoveryNode node, ConnectionProfile profile) throws IOException {
324+
if (node.equals(localNode)) {
325+
return localNodeConnection;
326+
} else {
327+
return transport.openConnection(node, profile);
328+
}
329+
}
330+
314331
/**
315332
* Lightly connect to the specified node, returning updated node
316333
* information. The handshake will fail if the cluster name on the
@@ -337,7 +354,19 @@ public DiscoveryNode connectToNodeAndHandshake(
337354
return handshakeNode;
338355
}
339356

340-
private DiscoveryNode handshake(
357+
/**
358+
* Executes a high-level handshake using the given connection
359+
* and returns the discovery node of the node the connection
360+
* was established with. The handshake will fail if the cluster
361+
* name on the target node mismatches the local cluster name.
362+
*
363+
* @param connection the connection to a specific node
364+
* @param handshakeTimeout handshake timeout
365+
* @return the connected node
366+
* @throws ConnectTransportException if the connection failed
367+
* @throws IllegalStateException if the handshake failed
368+
*/
369+
public DiscoveryNode handshake(
341370
final Transport.Connection connection,
342371
final long handshakeTimeout) throws ConnectTransportException {
343372
final HandshakeResponse response;
@@ -465,7 +494,7 @@ public final <T extends TransportResponse> void sendRequest(final DiscoveryNode
465494
}
466495
}
467496

468-
final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
497+
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
469498
final TransportRequest request,
470499
final TransportRequestOptions options,
471500
TransportResponseHandler<T> handler) {
@@ -477,7 +506,7 @@ final <T extends TransportResponse> void sendRequest(final Transport.Connection
477506
* Returns either a real transport connection or a local node connection if we are using the local node optimization.
478507
* @throws NodeNotConnectedException if the given node is not connected
479508
*/
480-
private Transport.Connection getConnection(DiscoveryNode node) {
509+
public Transport.Connection getConnection(DiscoveryNode node) {
481510
if (Objects.requireNonNull(node, "node must be non-null").equals(localNode)) {
482511
return localNodeConnection;
483512
} else {

core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.test.ESTestCase;
4141
import org.elasticsearch.test.VersionUtils;
4242
import org.elasticsearch.test.junit.annotations.TestLogging;
43+
import org.elasticsearch.test.transport.MockTransportService;
4344
import org.elasticsearch.threadpool.TestThreadPool;
4445
import org.elasticsearch.threadpool.ThreadPool;
4546
import org.elasticsearch.transport.MockTcpTransport;
@@ -571,7 +572,7 @@ private NetworkHandle startServices(
571572
final BiFunction<Settings, Version, Transport> supplier) {
572573
final Transport transport = supplier.apply(settings, version);
573574
final TransportService transportService =
574-
new TransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
575+
new MockTransportService(settings, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
575576
transportService.start();
576577
transportService.acceptIncomingRequests();
577578
final ConcurrentMap<TransportAddress, AtomicInteger> counters = ConcurrentCollections.newConcurrentMap();

core/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,24 @@ public void testConnectToNodeLight() throws IOException {
113113
emptyMap(),
114114
emptySet(),
115115
Version.CURRENT.minimumCompatibilityVersion());
116+
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, ConnectionProfile.LIGHT_PROFILE)){
117+
DiscoveryNode connectedNode = handleA.transportService.handshake(connection, timeout);
118+
assertNotNull(connectedNode);
119+
// the name and version should be updated
120+
assertEquals(connectedNode.getName(), "TS_B");
121+
assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion());
122+
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
123+
}
124+
116125
DiscoveryNode connectedNode =
117-
handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout);
126+
handleA.transportService.connectToNodeAndHandshake(discoveryNode, timeout);
118127
assertNotNull(connectedNode);
119128

120129
// the name and version should be updated
121130
assertEquals(connectedNode.getName(), "TS_B");
122131
assertEquals(connectedNode.getVersion(), handleB.discoveryNode.getVersion());
123132
assertTrue(handleA.transportService.nodeConnected(discoveryNode));
133+
124134
}
125135

126136
public void testMismatchedClusterName() {
@@ -133,8 +143,12 @@ public void testMismatchedClusterName() {
133143
emptyMap(),
134144
emptySet(),
135145
Version.CURRENT.minimumCompatibilityVersion());
136-
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake(
137-
discoveryNode, timeout));
146+
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
147+
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode,
148+
ConnectionProfile.LIGHT_PROFILE)) {
149+
handleA.transportService.handshake(connection, timeout);
150+
}
151+
});
138152
assertThat(ex.getMessage(), containsString("handshake failed, mismatched cluster name [Cluster [b]]"));
139153
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
140154
}
@@ -150,8 +164,12 @@ public void testIncompatibleVersions() {
150164
emptyMap(),
151165
emptySet(),
152166
Version.CURRENT.minimumCompatibilityVersion());
153-
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> handleA.transportService.connectToNodeAndHandshake(
154-
discoveryNode, timeout));
167+
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
168+
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode,
169+
ConnectionProfile.LIGHT_PROFILE)) {
170+
handleA.transportService.handshake(connection, timeout);
171+
}
172+
});
155173
assertThat(ex.getMessage(), containsString("handshake failed, incompatible version"));
156174
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
157175
}

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,13 @@
5757
import java.net.UnknownHostException;
5858
import java.util.Arrays;
5959
import java.util.Collections;
60+
import java.util.HashMap;
6061
import java.util.HashSet;
6162
import java.util.List;
6263
import java.util.Map;
6364
import java.util.Queue;
6465
import java.util.Set;
66+
import java.util.concurrent.ConcurrentHashMap;
6567
import java.util.concurrent.ConcurrentMap;
6668
import java.util.concurrent.CopyOnWriteArrayList;
6769
import java.util.concurrent.LinkedBlockingDeque;
@@ -80,6 +82,7 @@
8082
*/
8183
public final class MockTransportService extends TransportService {
8284

85+
private final Map<DiscoveryNode, List<Transport.Connection>> openConnections = new HashMap<>();
8386

8487
public static class TestPlugin extends Plugin {
8588
@Override
@@ -553,9 +556,7 @@ public void stop() {
553556
}
554557

555558
@Override
556-
public void close() {
557-
transport.close();
558-
}
559+
public void close() { transport.close(); }
559560

560561
@Override
561562
public Map<String, BoundTransportAddress> profileBoundAddresses() {
@@ -701,4 +702,41 @@ public Transport getOriginalTransport() {
701702
}
702703
return transport;
703704
}
705+
706+
@Override
707+
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
708+
FilteredConnection filteredConnection = new FilteredConnection(super.openConnection(node, profile)) {
709+
final AtomicBoolean closed = new AtomicBoolean(false);
710+
@Override
711+
public void close() throws IOException {
712+
try {
713+
super.close();
714+
} finally {
715+
if (closed.compareAndSet(false, true)) {
716+
synchronized (openConnections) {
717+
List<Transport.Connection> connections = openConnections.get(node);
718+
boolean remove = connections.remove(this);
719+
assert remove;
720+
if (connections.isEmpty()) {
721+
openConnections.remove(node);
722+
}
723+
}
724+
}
725+
}
726+
727+
}
728+
};
729+
synchronized (openConnections) {
730+
List<Transport.Connection> connections = openConnections.computeIfAbsent(node,
731+
(n) -> new CopyOnWriteArrayList<>());
732+
connections.add(filteredConnection);
733+
}
734+
return filteredConnection;
735+
}
736+
737+
@Override
738+
protected void doClose() {
739+
super.doClose();
740+
assert openConnections.size() == 0 : "still open connections: " + openConnections;
741+
}
704742
}

test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1351,8 +1351,8 @@ public void handleException(TransportException exp) {
13511351
// all is well
13521352
}
13531353

1354-
try {
1355-
serviceB.connectToNodeAndHandshake(nodeA, 100);
1354+
try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){
1355+
serviceB.handshake(connection, 100);
13561356
fail("exception should be thrown");
13571357
} catch (IllegalStateException e) {
13581358
// all is well
@@ -1409,8 +1409,8 @@ public void handleException(TransportException exp) {
14091409
// all is well
14101410
}
14111411

1412-
try {
1413-
serviceB.connectToNodeAndHandshake(nodeA, 100);
1412+
try (Transport.Connection connection = serviceB.openConnection(nodeA, ConnectionProfile.LIGHT_PROFILE)){
1413+
serviceB.handshake(connection, 100);
14141414
fail("exception should be thrown");
14151415
} catch (IllegalStateException e) {
14161416
// all is well

0 commit comments

Comments
 (0)