From 86e9f39f9ceecc57aba648579a41e0f3e1afe059 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 26 Jan 2017 11:31:44 +0100 Subject: [PATCH 1/2] Improve connection closing in `RemoteClusterConnection` Some tests verify that all connections have been closed but due to the async / concurrent nature of `RemoteClusterConnection` there are situations where we notify listeners that trigger tests to finish before we actually closed all connections. The race is very very small and has no impact on the code correctness. This commit documents and improves the way we close connections to ensure tests won't fail with false positives. Closes #22803 --- .../search/RemoteClusterConnection.java | 44 +++++++++++-------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java index cce03cc4ed4a0..01d296f65c744 100644 --- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java @@ -370,6 +370,9 @@ void collectRemoteNodes(Iterator seedNodes, ClusterStateRequest request = new ClusterStateRequest(); request.clear(); request.nodes(true); + // here we pass on the connection since we can only close it once the sendRequest returns otherwise + // due to the async nature (it will return before it's actually send) this can cause the request to fail + // due to an already closed connection. 
transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, new SniffClusterStateResponseHandler(transportService, connection, listener, seedNodes, @@ -443,24 +446,30 @@ public ClusterStateResponse newInstance() { @Override public void handleResponse(ClusterStateResponse response) { try { - cancellableThreads.executeIO(() -> { - DiscoveryNodes nodes = response.getState().nodes(); - Iterable nodesIter = nodes.getNodes()::valuesIt; - for (DiscoveryNode node : nodesIter) { - if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) { - try { - transportService.connectToNode(node, remoteProfile); // noop if node is connected - connectedNodes.add(node); - } catch (ConnectTransportException | IllegalStateException ex) { - // ISE if we fail the handshake with an version incompatible node - // fair enough we can't connect just move on - logger.debug((Supplier) - () -> new ParameterizedMessage("failed to connect to node {}", node), ex); + try (Closeable theConnection = connection) { // the connection is unused - see comment in #collectRemoteNodes + // we have to close this connection before we notify listeners - this is mainly needed for test correctness + // since if we do it afterwards we might fail assertions that check if all high level connections are closed. + // from a code correctness perspective we could also close it afterwards. 
This try/with block will + // maintain the actual exceptions thrown from within the try block and suppress the ones that are possible thrown + // by closing hte connection + cancellableThreads.executeIO(() -> { + DiscoveryNodes nodes = response.getState().nodes(); + Iterable nodesIter = nodes.getNodes()::valuesIt; + for (DiscoveryNode node : nodesIter) { + if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) { + try { + transportService.connectToNode(node, remoteProfile); // noop if node is connected + connectedNodes.add(node); + } catch (ConnectTransportException | IllegalStateException ex) { + // ISE if we fail the handshake with an version incompatible node + // fair enough we can't connect just move on + logger.debug((Supplier) + () -> new ParameterizedMessage("failed to connect to node {}", node), ex); + } } } - } - }); - connection.close(); + }); + } listener.onResponse(null); } catch (CancellableThreads.ExecutionCancelledException ex) { listener.onFailure(ex); // we got canceled - fail the listener and step out @@ -469,9 +478,6 @@ public void handleResponse(ClusterStateResponse response) { () -> new ParameterizedMessage("fetching nodes from external cluster {} failed", clusterAlias), ex); collectRemoteNodes(seedNodes, transportService, listener); - } finally { - // just to make sure we don't leak anything we close the connection here again even if we managed to do so before - IOUtils.closeWhileHandlingException(connection); } } From 4a592afa3097486d8abba604825cdf589b3061a1 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 26 Jan 2017 11:50:32 +0100 Subject: [PATCH 2/2] fix typo --- .../action/search/RemoteClusterConnection.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java index 01d296f65c744..73bc4f2ee7ed3 100644 --- 
a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java @@ -371,7 +371,7 @@ void collectRemoteNodes(Iterator seedNodes, request.clear(); request.nodes(true); // here we pass on the connection since we can only close it once the sendRequest returns otherwise - // due to the async nature (it will return before it's actually send) this can cause the request to fail + // due to the async nature (it will return before it's actually sent) this can cause the request to fail // due to an already closed connection. transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY, @@ -450,8 +450,8 @@ public void handleResponse(ClusterStateResponse response) { // we have to close this connection before we notify listeners - this is mainly needed for test correctness // since if we do it afterwards we might fail assertions that check if all high level connections are closed. // from a code correctness perspective we could also close it afterwards. This try/with block will - // maintain the actual exceptions thrown from within the try block and suppress the ones that are possible thrown - // by closing hte connection + // maintain the actual exceptions thrown from within the try block and suppress the ones that are possibly thrown + // by closing the connection cancellableThreads.executeIO(() -> { DiscoveryNodes nodes = response.getState().nodes(); Iterable nodesIter = nodes.getNodes()::valuesIt;