Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,9 @@ void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.nodes(true);
// here we pass on the connection since we can only close it once the sendRequest returns; otherwise,
// due to the async nature (it will return before it's actually sent), this can cause the request to fail
// due to an already closed connection.
transportService.sendRequest(connection,
ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
new SniffClusterStateResponseHandler(transportService, connection, listener, seedNodes,
Expand Down Expand Up @@ -443,24 +446,30 @@ public ClusterStateResponse newInstance() {
@Override
public void handleResponse(ClusterStateResponse response) {
try {
cancellableThreads.executeIO(() -> {
DiscoveryNodes nodes = response.getState().nodes();
Iterable<DiscoveryNode> nodesIter = nodes.getNodes()::valuesIt;
for (DiscoveryNode node : nodesIter) {
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
try {
transportService.connectToNode(node, remoteProfile); // noop if node is connected
connectedNodes.add(node);
} catch (ConnectTransportException | IllegalStateException ex) {
// ISE if we fail the handshake with a version-incompatible node
// fair enough, we can't connect - just move on
logger.debug((Supplier<?>)
() -> new ParameterizedMessage("failed to connect to node {}", node), ex);
try (Closeable theConnection = connection) { // the connection is unused - see comment in #collectRemoteNodes
// we have to close this connection before we notify listeners - this is mainly needed for test correctness
// since if we do it afterwards we might fail assertions that check if all high level connections are closed.
// From a code correctness perspective we could also close it afterwards. This try-with-resources block will
// maintain the possible exceptions thrown from within the try block and suppress the ones that are possibly thrown
// by closing the connection
cancellableThreads.executeIO(() -> {
DiscoveryNodes nodes = response.getState().nodes();
Iterable<DiscoveryNode> nodesIter = nodes.getNodes()::valuesIt;
for (DiscoveryNode node : nodesIter) {
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
try {
transportService.connectToNode(node, remoteProfile); // noop if node is connected
connectedNodes.add(node);
} catch (ConnectTransportException | IllegalStateException ex) {
// ISE if we fail the handshake with a version-incompatible node
// fair enough, we can't connect - just move on
logger.debug((Supplier<?>)
() -> new ParameterizedMessage("failed to connect to node {}", node), ex);
}
}
}
}
});
connection.close();
});
}
listener.onResponse(null);
} catch (CancellableThreads.ExecutionCancelledException ex) {
listener.onFailure(ex); // we got canceled - fail the listener and step out
Expand All @@ -469,9 +478,6 @@ public void handleResponse(ClusterStateResponse response) {
() -> new ParameterizedMessage("fetching nodes from external cluster {} failed",
clusterAlias), ex);
collectRemoteNodes(seedNodes, transportService, listener);
} finally {
// just to make sure we don't leak anything we close the connection here again even if we managed to do so before
IOUtils.closeWhileHandlingException(connection);
}
}

Expand Down