Skip to content

Commit 2c75504

Browse files
authored
Move connection listener to ConnectionManager (#32992)
This is a followup to #31886. After that commit the TransportConnectionListener had to be propogated to both the Transport and the ConnectionManager. This commit moves that listener to completely live in the ConnectionManager. The request and response related methods are moved to a TransportMessageListener. That listener continues to live in the Transport class.
1 parent 50426a6 commit 2c75504

File tree

14 files changed

+163
-170
lines changed

14 files changed

+163
-170
lines changed

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,6 @@ protected MockTransportService build(Settings settings, Version version, Cluster
8686
return transportService;
8787
}
8888

89-
@Override
90-
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
91-
final Netty4Transport t = (Netty4Transport) transport;
92-
@SuppressWarnings("unchecked")
93-
final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection;
94-
TcpChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
95-
}
96-
9789
public void testConnectException() throws UnknownHostException {
9890
try {
9991
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),

server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ public void removeListener(TransportConnectionListener listener) {
9191
}
9292

9393
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
94-
return transport.openConnection(node, ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile));
94+
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
95+
return internalOpenConnection(node, resolvedProfile);
9596
}
9697

9798
/**
@@ -115,7 +116,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
115116
}
116117
boolean success = false;
117118
try {
118-
connection = transport.openConnection(node, resolvedProfile);
119+
connection = internalOpenConnection(node, resolvedProfile);
119120
connectionValidator.accept(connection, resolvedProfile);
120121
// we acquire a connection lock, so no way there is an existing connection
121122
connectedNodes.put(node, connection);
@@ -227,6 +228,19 @@ public void close() {
227228
}
228229
}
229230

231+
private Transport.Connection internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
232+
Transport.Connection connection = transport.openConnection(node, connectionProfile);
233+
try {
234+
connectionListener.onConnectionOpened(connection);
235+
} finally {
236+
connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection)));
237+
}
238+
if (connection.isClosed()) {
239+
throw new ConnectTransportException(node, "a channel closed while connecting");
240+
}
241+
return connection;
242+
}
243+
230244
private void ensureOpen() {
231245
if (lifecycle.started() == false) {
232246
throw new IllegalStateException("connection manager is closed");
@@ -289,6 +303,20 @@ public void onNodeConnected(DiscoveryNode node) {
289303
listener.onNodeConnected(node);
290304
}
291305
}
306+
307+
@Override
308+
public void onConnectionOpened(Transport.Connection connection) {
309+
for (TransportConnectionListener listener : listeners) {
310+
listener.onConnectionOpened(connection);
311+
}
312+
}
313+
314+
@Override
315+
public void onConnectionClosed(Transport.Connection connection) {
316+
for (TransportConnectionListener listener : listeners) {
317+
listener.onConnectionClosed(connection);
318+
}
319+
}
292320
}
293321

294322
static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 18 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
180180
protected final NetworkService networkService;
181181
protected final Set<ProfileSettings> profileSettings;
182182

183-
private final DelegatingTransportConnectionListener transportListener = new DelegatingTransportConnectionListener();
183+
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();
184184

185185
private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
186186

@@ -246,14 +246,12 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
246246
protected void doStart() {
247247
}
248248

249-
@Override
250-
public void addConnectionListener(TransportConnectionListener listener) {
251-
transportListener.listeners.add(listener);
249+
public void addMessageListener(TransportMessageListener listener) {
250+
messageListener.listeners.add(listener);
252251
}
253252

254-
@Override
255-
public boolean removeConnectionListener(TransportConnectionListener listener) {
256-
return transportListener.listeners.remove(listener);
253+
public boolean removeMessageListener(TransportMessageListener listener) {
254+
return messageListener.listeners.remove(listener);
257255
}
258256

259257
@Override
@@ -342,10 +340,6 @@ public TcpChannel channel(TransportRequestOptions.Type type) {
342340
return connectionTypeHandle.getChannel(channels);
343341
}
344342

345-
boolean allChannelsOpen() {
346-
return channels.stream().allMatch(TcpChannel::isOpen);
347-
}
348-
349343
@Override
350344
public boolean sendPing() {
351345
for (TcpChannel channel : channels) {
@@ -479,22 +473,13 @@ public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connect
479473
// underlying channels.
480474
nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
481475
final NodeChannels finalNodeChannels = nodeChannels;
482-
try {
483-
transportListener.onConnectionOpened(nodeChannels);
484-
} finally {
485-
nodeChannels.addCloseListener(ActionListener.wrap(() -> transportListener.onConnectionClosed(finalNodeChannels)));
486-
}
487476

488477
Consumer<TcpChannel> onClose = c -> {
489478
assert c.isOpen() == false : "channel is still open when onClose is called";
490479
finalNodeChannels.close();
491480
};
492481

493482
nodeChannels.channels.forEach(ch -> ch.addCloseListener(ActionListener.wrap(() -> onClose.accept(ch))));
494-
495-
if (nodeChannels.allChannelsOpen() == false) {
496-
throw new ConnectTransportException(node, "a channel closed while connecting");
497-
}
498483
success = true;
499484
return nodeChannels;
500485
} catch (ConnectTransportException e) {
@@ -895,7 +880,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel cha
895880
final TransportRequestOptions finalOptions = options;
896881
// this might be called in a different thread
897882
SendListener onRequestSent = new SendListener(channel, stream,
898-
() -> transportListener.onRequestSent(node, requestId, action, request, finalOptions), message.length());
883+
() -> messageListener.onRequestSent(node, requestId, action, request, finalOptions), message.length());
899884
internalSendMessage(channel, message, onRequestSent);
900885
addedReleaseListener = true;
901886
} finally {
@@ -949,7 +934,7 @@ public void sendErrorResponse(
949934
final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length());
950935
CompositeBytesReference message = new CompositeBytesReference(header, bytes);
951936
SendListener onResponseSent = new SendListener(channel, null,
952-
() -> transportListener.onResponseSent(requestId, action, error), message.length());
937+
() -> messageListener.onResponseSent(requestId, action, error), message.length());
953938
internalSendMessage(channel, message, onResponseSent);
954939
}
955940
}
@@ -998,7 +983,7 @@ private void sendResponse(
998983
final TransportResponseOptions finalOptions = options;
999984
// this might be called in a different thread
1000985
SendListener listener = new SendListener(channel, stream,
1001-
() -> transportListener.onResponseSent(requestId, action, response, finalOptions), message.length());
986+
() -> messageListener.onResponseSent(requestId, action, response, finalOptions), message.length());
1002987
internalSendMessage(channel, message, listener);
1003988
addedReleaseListener = true;
1004989
} finally {
@@ -1193,7 +1178,7 @@ public final void messageReceived(BytesReference reference, TcpChannel channel,
11931178
if (isHandshake) {
11941179
handler = pendingHandshakes.remove(requestId);
11951180
} else {
1196-
TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, transportListener);
1181+
TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, messageListener);
11971182
if (theHandler == null && TransportStatus.isError(status)) {
11981183
handler = pendingHandshakes.remove(requestId);
11991184
} else {
@@ -1300,7 +1285,7 @@ protected String handleRequest(TcpChannel channel, String profileName, final Str
13001285
features = Collections.emptySet();
13011286
}
13021287
final String action = stream.readString();
1303-
transportListener.onRequestReceived(requestId, action);
1288+
messageListener.onRequestReceived(requestId, action);
13041289
TransportChannel transportChannel = null;
13051290
try {
13061291
if (TransportStatus.isHandshake(status)) {
@@ -1609,69 +1594,42 @@ public ProfileSettings(Settings settings, String profileName) {
16091594
}
16101595
}
16111596

1612-
private static final class DelegatingTransportConnectionListener implements TransportConnectionListener {
1613-
private final List<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();
1597+
private static final class DelegatingTransportMessageListener implements TransportMessageListener {
1598+
1599+
private final List<TransportMessageListener> listeners = new CopyOnWriteArrayList<>();
16141600

16151601
@Override
16161602
public void onRequestReceived(long requestId, String action) {
1617-
for (TransportConnectionListener listener : listeners) {
1603+
for (TransportMessageListener listener : listeners) {
16181604
listener.onRequestReceived(requestId, action);
16191605
}
16201606
}
16211607

16221608
@Override
16231609
public void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {
1624-
for (TransportConnectionListener listener : listeners) {
1610+
for (TransportMessageListener listener : listeners) {
16251611
listener.onResponseSent(requestId, action, response, finalOptions);
16261612
}
16271613
}
16281614

16291615
@Override
16301616
public void onResponseSent(long requestId, String action, Exception error) {
1631-
for (TransportConnectionListener listener : listeners) {
1617+
for (TransportMessageListener listener : listeners) {
16321618
listener.onResponseSent(requestId, action, error);
16331619
}
16341620
}
16351621

16361622
@Override
16371623
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
16381624
TransportRequestOptions finalOptions) {
1639-
for (TransportConnectionListener listener : listeners) {
1625+
for (TransportMessageListener listener : listeners) {
16401626
listener.onRequestSent(node, requestId, action, request, finalOptions);
16411627
}
16421628
}
16431629

1644-
@Override
1645-
public void onNodeDisconnected(DiscoveryNode key) {
1646-
for (TransportConnectionListener listener : listeners) {
1647-
listener.onNodeDisconnected(key);
1648-
}
1649-
}
1650-
1651-
@Override
1652-
public void onConnectionOpened(Connection nodeChannels) {
1653-
for (TransportConnectionListener listener : listeners) {
1654-
listener.onConnectionOpened(nodeChannels);
1655-
}
1656-
}
1657-
1658-
@Override
1659-
public void onNodeConnected(DiscoveryNode node) {
1660-
for (TransportConnectionListener listener : listeners) {
1661-
listener.onNodeConnected(node);
1662-
}
1663-
}
1664-
1665-
@Override
1666-
public void onConnectionClosed(Connection nodeChannels) {
1667-
for (TransportConnectionListener listener : listeners) {
1668-
listener.onConnectionClosed(nodeChannels);
1669-
}
1670-
}
1671-
16721630
@Override
16731631
public void onResponseReceived(long requestId, ResponseContext holder) {
1674-
for (TransportConnectionListener listener : listeners) {
1632+
for (TransportMessageListener listener : listeners) {
16751633
listener.onResponseReceived(requestId, holder);
16761634
}
16771635
}

server/src/main/java/org/elasticsearch/transport/Transport.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,9 @@ public interface Transport extends LifecycleComponent {
5656
*/
5757
RequestHandlerRegistry getRequestHandler(String action);
5858

59-
/**
60-
* Adds a new event listener
61-
* @param listener the listener to add
62-
*/
63-
void addConnectionListener(TransportConnectionListener listener);
59+
void addMessageListener(TransportMessageListener listener);
6460

65-
/**
66-
* Removes an event listener
67-
* @param listener the listener to remove
68-
* @return <code>true</code> iff the listener was removed otherwise <code>false</code>
69-
*/
70-
boolean removeConnectionListener(TransportConnectionListener listener);
61+
boolean removeMessageListener(TransportMessageListener listener);
7162

7263
/**
7364
* The address the transport is bound on.
@@ -254,7 +245,7 @@ public List<ResponseContext> prune(Predicate<ResponseContext> predicate) {
254245
* sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not
255246
* found.
256247
*/
257-
public TransportResponseHandler onResponseReceived(final long requestId, TransportConnectionListener listener) {
248+
public TransportResponseHandler onResponseReceived(final long requestId, TransportMessageListener listener) {
258249
ResponseContext context = handlers.remove(requestId);
259250
listener.onResponseReceived(requestId, context);
260251
if (context == null) {

server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -28,42 +28,6 @@
2828
*/
2929
public interface TransportConnectionListener {
3030

31-
/**
32-
* Called once a request is received
33-
* @param requestId the internal request ID
34-
* @param action the request action
35-
*
36-
*/
37-
default void onRequestReceived(long requestId, String action) {}
38-
39-
/**
40-
* Called for every action response sent after the response has been passed to the underlying network implementation.
41-
* @param requestId the request ID (unique per client)
42-
* @param action the request action
43-
* @param response the response send
44-
* @param finalOptions the response options
45-
*/
46-
default void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {}
47-
48-
/***
49-
* Called for every failed action response after the response has been passed to the underlying network implementation.
50-
* @param requestId the request ID (unique per client)
51-
* @param action the request action
52-
* @param error the error sent back to the caller
53-
*/
54-
default void onResponseSent(long requestId, String action, Exception error) {}
55-
56-
/**
57-
* Called for every request sent to a server after the request has been passed to the underlying network implementation
58-
* @param node the node the request was sent to
59-
* @param requestId the internal request id
60-
* @param action the action name
61-
* @param request the actual request
62-
* @param finalOptions the request options
63-
*/
64-
default void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
65-
TransportRequestOptions finalOptions) {}
66-
6731
/**
6832
* Called once a connection was opened
6933
* @param connection the connection
@@ -76,13 +40,6 @@ default void onConnectionOpened(Transport.Connection connection) {}
7640
*/
7741
default void onConnectionClosed(Transport.Connection connection) {}
7842

79-
/**
80-
* Called for every response received
81-
* @param requestId the request id for this reponse
82-
* @param context the response context or null if the context was already processed ie. due to a timeout.
83-
*/
84-
default void onResponseReceived(long requestId, Transport.ResponseContext context) {}
85-
8643
/**
8744
* Called once a node connection is opened and registered.
8845
*/

0 commit comments

Comments
 (0)