Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.lease.Releasable;
Expand Down Expand Up @@ -218,7 +219,18 @@ public void close() {
}

private Transport.Connection internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
Transport.Connection connection = transport.openConnection(node, connectionProfile);
PlainActionFuture<Transport.Connection> future = PlainActionFuture.newFuture();
Releasable pendingConnection = transport.openConnection(node, connectionProfile, future);
Transport.Connection connection;
try {
connection = future.actionGet();
} catch (IllegalStateException e) {
// If the future was interrupted we must cancel the pending connection to avoid channels leaking
if (e.getCause() instanceof InterruptedException) {
pendingConnection.close();
}
throw e;
}
try {
connectionListener.onConnectionOpened(connection);
} finally {
Expand Down
30 changes: 10 additions & 20 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NotifyOnceListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
Expand All @@ -46,6 +45,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
Expand Down Expand Up @@ -349,34 +349,24 @@ protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile con
}

@Override
public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
Objects.requireNonNull(connectionProfile, "connection profile cannot be null");
public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Transport.Connection> listener) {
Objects.requireNonNull(profile, "connection profile cannot be null");
if (node == null) {
throw new ConnectTransportException(null, "can't open connection to a null node");
}
connectionProfile = maybeOverrideConnectionProfile(connectionProfile);
ConnectionProfile finalProfile = maybeOverrideConnectionProfile(profile);
closeLock.readLock().lock(); // ensure we don't open connections while we are closing
try {
ensureOpen();
PlainActionFuture<NodeChannels> connectionFuture = PlainActionFuture.newFuture();
List<TcpChannel> pendingChannels = initiateConnection(node, connectionProfile, connectionFuture);

try {
return connectionFuture.actionGet();
} catch (IllegalStateException e) {
// If the future was interrupted we can close the channels to improve the shutdown of the MockTcpTransport
if (e.getCause() instanceof InterruptedException) {
CloseableChannel.closeChannels(pendingChannels, false);
}
throw e;
}
List<TcpChannel> pendingChannels = initiateConnection(node, finalProfile, listener);
return () -> CloseableChannel.closeChannels(pendingChannels, false);
} finally {
closeLock.readLock().unlock();
}
}

private List<TcpChannel> initiateConnection(DiscoveryNode node, ConnectionProfile connectionProfile,
ActionListener<NodeChannels> listener) {
ActionListener<Transport.Connection> listener) {
int numConnections = connectionProfile.getNumConnections();
assert numConnections > 0 : "A connection profile must be configured with at least one connection";

Expand Down Expand Up @@ -432,7 +422,7 @@ public List<String> getLocalAddresses() {

protected void bindServer(ProfileSettings profileSettings) {
// Bind and start to accept incoming connections.
InetAddress hostAddresses[];
InetAddress[] hostAddresses;
List<String> profileBindHosts = profileSettings.bindHosts;
try {
hostAddresses = networkService.resolveBindHostAddresses(profileBindHosts.toArray(Strings.EMPTY_ARRAY));
Expand Down Expand Up @@ -1581,11 +1571,11 @@ private final class ChannelsConnectedListener implements ActionListener<Void> {
private final DiscoveryNode node;
private final ConnectionProfile connectionProfile;
private final List<TcpChannel> channels;
private final ActionListener<NodeChannels> listener;
private final ActionListener<Transport.Connection> listener;
private final CountDown countDown;

private ChannelsConnectedListener(DiscoveryNode node, ConnectionProfile connectionProfile, List<TcpChannel> channels,
ActionListener<NodeChannels> listener) {
ActionListener<Transport.Connection> listener) {
this.node = node;
this.connectionProfile = connectionProfile;
this.channels = channels;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.transport.BoundTransportAddress;
Expand Down Expand Up @@ -86,10 +87,12 @@ default CircuitBreaker getInFlightRequestBreaker() {
}

/**
* Opens a new connection to the given node and returns it. The returned connection is not managed by
* the transport implementation. This connection must be closed once it's not needed anymore.
* Opens a new connection to the given node. When the connection is fully connected, the listener is
* called. A {@link Releasable} is returned representing the pending connection. If the caller of this
* method decides to move on before the listener is called with the completed connection, they should
* release the pending connection to prevent hanging connections.
*/
Connection openConnection(DiscoveryNode node, ConnectionProfile profile);
Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Transport.Connection> listener);

TransportStats getStats();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client.transport;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
Expand All @@ -30,6 +31,7 @@
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
Expand Down Expand Up @@ -79,8 +81,8 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
protected abstract ClusterState getMockClusterState(DiscoveryNode node);

@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) {
return new CloseableConnection() {
public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> connectionListener) {
connectionListener.onResponse(new CloseableConnection() {

@Override
public DiscoveryNode getNode() {
Expand Down Expand Up @@ -134,7 +136,9 @@ public void sendRequest(long requestId, String action, TransportRequest request,
}
}
}
};
});

return () -> {};
}

protected abstract Response newResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -167,7 +168,9 @@ public <T extends TransportResponse> void sendRequest(Transport.Connection conne
transportService.addNodeConnectedBehavior((connectionManager, discoveryNode) -> false);
transportService.addGetConnectionBehavior((connectionManager, discoveryNode) -> {
// The FailAndRetryTransport does not use the connection profile
return transport.openConnection(discoveryNode, null);
PlainActionFuture<Transport.Connection> future = PlainActionFuture.newFuture();
transport.openConnection(discoveryNode, null, future);
return future.actionGet();
});
transportService.start();
transportService.acceptIncomingRequests();
Expand Down Expand Up @@ -358,11 +361,19 @@ public void testSniffNodesSamplerClosesConnections() throws Exception {
try (MockTransportService clientService = createNewService(clientSettings, Version.CURRENT, threadPool, null)) {
final List<Transport.Connection> establishedConnections = new CopyOnWriteArrayList<>();

clientService.addConnectBehavior(remoteService, (transport, discoveryNode, profile) -> {
Transport.Connection connection = transport.openConnection(discoveryNode, profile);
establishedConnections.add(connection);
return connection;
});
clientService.addConnectBehavior(remoteService, (transport, discoveryNode, profile, listener) ->
transport.openConnection(discoveryNode, profile, new ActionListener<Transport.Connection>() {
@Override
public void onResponse(Transport.Connection connection) {
establishedConnections.add(connection);
listener.onResponse(connection);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}));


clientService.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
Expand Down Expand Up @@ -187,8 +188,6 @@ public HandshakeResponse handshake(Transport.Connection connection, long timeout
private final class MockTransport implements Transport {
private ResponseHandlers responseHandlers = new ResponseHandlers();
private volatile boolean randomConnectionExceptions = false;
private TransportMessageListener listener = new TransportMessageListener() {
};

@Override
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
Expand All @@ -201,7 +200,6 @@ public RequestHandlerRegistry getRequestHandler(String action) {

@Override
public void addMessageListener(TransportMessageListener listener) {
this.listener = listener;
}

@Override
Expand All @@ -225,13 +223,14 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi
}

@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
if (connectionProfile == null) {
public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> listener) {
if (profile == null) {
if (randomConnectionExceptions && randomBoolean()) {
throw new ConnectTransportException(node, "simulated");
listener.onFailure(new ConnectTransportException(node, "simulated"));
return () -> {};
}
}
Connection connection = new Connection() {
listener.onResponse(new Connection() {
@Override
public DiscoveryNode getNode() {
return node;
Expand All @@ -257,8 +256,8 @@ public void close() {
public boolean isClosed() {
return false;
}
};
return connection;
});
return () -> {};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.transport;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -35,8 +36,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ConnectionManagerTests extends ESTestCase {

Expand Down Expand Up @@ -82,7 +85,11 @@ public void onNodeDisconnected(DiscoveryNode node) {

DiscoveryNode node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT);
Transport.Connection connection = new TestConnect(node);
when(transport.openConnection(node, connectionProfile)).thenReturn(connection);
doAnswer(invocationOnMock -> {
ActionListener<Transport.Connection> listener = (ActionListener<Transport.Connection>) invocationOnMock.getArguments()[2];
listener.onResponse(connection);
return null;
}).when(transport).openConnection(eq(node), eq(connectionProfile), any(ActionListener.class));

assertFalse(connectionManager.nodeConnected(node));

Expand Down Expand Up @@ -126,7 +133,11 @@ public void onNodeDisconnected(DiscoveryNode node) {

DiscoveryNode node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT);
Transport.Connection connection = new TestConnect(node);
when(transport.openConnection(node, connectionProfile)).thenReturn(connection);
doAnswer(invocationOnMock -> {
ActionListener<Transport.Connection> listener = (ActionListener<Transport.Connection>) invocationOnMock.getArguments()[2];
listener.onResponse(connection);
return null;
}).when(transport).openConnection(eq(node), eq(connectionProfile), any(ActionListener.class));

assertFalse(connectionManager.nodeConnected(node));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1441,7 +1441,7 @@ public static Transport getProxyTransport(ThreadPool threadPool, Map<String, Map

StubbableTransport stubbableTransport = new StubbableTransport(MockTransportService.newMockTransport(Settings.EMPTY, Version
.CURRENT, threadPool));
stubbableTransport.setDefaultConnectBehavior((t, node, profile) -> {
stubbableTransport.setDefaultConnectBehavior((t, node, profile, listener) -> {
Map<String, DiscoveryNode> proxyMapping = nodeMap.get(node.getAddress().toString());
if (proxyMapping == null) {
throw new IllegalStateException("no proxy mapping for node: " + node);
Expand All @@ -1455,34 +1455,44 @@ public static Transport getProxyTransport(ThreadPool threadPool, Map<String, Map
// route by seed hostname
proxyNode = proxyMapping.get(node.getHostName());
}
Transport.Connection connection = t.openConnection(proxyNode, profile);
return new Transport.Connection() {
return t.openConnection(proxyNode, profile, new ActionListener<Transport.Connection>() {
@Override
public DiscoveryNode getNode() {
return node;
}
public void onResponse(Transport.Connection connection) {
Transport.Connection proxyConnection = new Transport.Connection() {
@Override
public DiscoveryNode getNode() {
return node;
}

@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
connection.sendRequest(requestId, action, request, options);
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
connection.sendRequest(requestId, action, request, options);
}

@Override
public void addCloseListener(ActionListener<Void> listener) {
connection.addCloseListener(listener);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
connection.addCloseListener(listener);
}

@Override
public boolean isClosed() {
return connection.isClosed();
@Override
public boolean isClosed() {
return connection.isClosed();
}

@Override
public void close() {
connection.close();
}
};
listener.onResponse(proxyConnection);
}

@Override
public void close() {
connection.close();
public void onFailure(Exception e) {
listener.onFailure(e);
}
};
});
});
return stubbableTransport;
}
Expand Down
Loading