Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -87,13 +86,6 @@ protected MockTransportService build(Settings settings, Version version, Cluster
return transportService;
}

@Override
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
final Netty4Transport t = (Netty4Transport) transport;
final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection;
CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
}

public void testConnectException() throws UnknownHostException {
try {
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -93,12 +92,6 @@ protected MockTransportService build(Settings settings, Version version, Cluster
return transportService;
}

@Override
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection;
CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
}

public void testConnectException() throws UnknownHostException {
try {
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public void removeListener(TransportConnectionListener listener) {
}

public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
return transport.openConnection(node, ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile));
ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile);
return internalOpenConnection(node, resolvedProfile);
}

/**
Expand All @@ -115,7 +116,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil
}
boolean success = false;
try {
connection = transport.openConnection(node, resolvedProfile);
connection = internalOpenConnection(node, resolvedProfile);
connectionValidator.accept(connection, resolvedProfile);
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, connection);
Expand Down Expand Up @@ -227,6 +228,19 @@ public void close() {
}
}

private Transport.Connection internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
Transport.Connection connection = transport.openConnection(node, connectionProfile);
try {
connectionListener.onConnectionOpened(connection);
} finally {
connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection)));
}
if (connection.isClosed()) {
throw new ConnectTransportException(node, "a channel closed while connecting");
}
return connection;
}

private void ensureOpen() {
if (lifecycle.started() == false) {
throw new IllegalStateException("connection manager is closed");
Expand Down Expand Up @@ -289,6 +303,20 @@ public void onNodeConnected(DiscoveryNode node) {
listener.onNodeConnected(node);
}
}

@Override
public void onConnectionOpened(Transport.Connection connection) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionOpened(connection);
}
}

@Override
public void onConnectionClosed(Transport.Connection connection) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionClosed(connection);
}
}
}

static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
Expand Down
78 changes: 18 additions & 60 deletions server/src/main/java/org/elasticsearch/transport/TcpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected final NetworkService networkService;
protected final Set<ProfileSettings> profileSettings;

private final DelegatingTransportConnectionListener transportListener = new DelegatingTransportConnectionListener();
private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();

private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
private final Map<String, List<TcpServerChannel>> serverChannels = newConcurrentMap();
Expand Down Expand Up @@ -248,14 +248,12 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
protected void doStart() {
}

@Override
public void addConnectionListener(TransportConnectionListener listener) {
transportListener.listeners.add(listener);
public void addMessageListener(TransportMessageListener listener) {
messageListener.listeners.add(listener);
}

@Override
public boolean removeConnectionListener(TransportConnectionListener listener) {
return transportListener.listeners.remove(listener);
public boolean removeMessageListener(TransportMessageListener listener) {
return messageListener.listeners.remove(listener);
}

@Override
Expand Down Expand Up @@ -344,10 +342,6 @@ public TcpChannel channel(TransportRequestOptions.Type type) {
return connectionTypeHandle.getChannel(channels);
}

boolean allChannelsOpen() {
return channels.stream().allMatch(TcpChannel::isOpen);
}

@Override
public boolean sendPing() {
for (TcpChannel channel : channels) {
Expand Down Expand Up @@ -481,22 +475,13 @@ public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connect
// underlying channels.
nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
final NodeChannels finalNodeChannels = nodeChannels;
try {
transportListener.onConnectionOpened(nodeChannels);
} finally {
nodeChannels.addCloseListener(ActionListener.wrap(() -> transportListener.onConnectionClosed(finalNodeChannels)));
}

Consumer<TcpChannel> onClose = c -> {
assert c.isOpen() == false : "channel is still open when onClose is called";
finalNodeChannels.close();
};

nodeChannels.channels.forEach(ch -> ch.addCloseListener(ActionListener.wrap(() -> onClose.accept(ch))));

if (nodeChannels.allChannelsOpen() == false) {
throw new ConnectTransportException(node, "a channel closed while connecting");
}
success = true;
return nodeChannels;
} catch (ConnectTransportException e) {
Expand Down Expand Up @@ -907,7 +892,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel cha
final TransportRequestOptions finalOptions = options;
// this might be called in a different thread
SendListener onRequestSent = new SendListener(channel, stream,
() -> transportListener.onRequestSent(node, requestId, action, request, finalOptions), message.length());
() -> messageListener.onRequestSent(node, requestId, action, request, finalOptions), message.length());
internalSendMessage(channel, message, onRequestSent);
addedReleaseListener = true;
} finally {
Expand Down Expand Up @@ -961,7 +946,7 @@ public void sendErrorResponse(
final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length());
CompositeBytesReference message = new CompositeBytesReference(header, bytes);
SendListener onResponseSent = new SendListener(channel, null,
() -> transportListener.onResponseSent(requestId, action, error), message.length());
() -> messageListener.onResponseSent(requestId, action, error), message.length());
internalSendMessage(channel, message, onResponseSent);
}
}
Expand Down Expand Up @@ -1010,7 +995,7 @@ private void sendResponse(
final TransportResponseOptions finalOptions = options;
// this might be called in a different thread
SendListener listener = new SendListener(channel, stream,
() -> transportListener.onResponseSent(requestId, action, response, finalOptions), message.length());
() -> messageListener.onResponseSent(requestId, action, response, finalOptions), message.length());
internalSendMessage(channel, message, listener);
addedReleaseListener = true;
} finally {
Expand Down Expand Up @@ -1266,7 +1251,7 @@ public final void messageReceived(BytesReference reference, TcpChannel channel)
if (isHandshake) {
handler = pendingHandshakes.remove(requestId);
} else {
TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, transportListener);
TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, messageListener);
if (theHandler == null && TransportStatus.isError(status)) {
handler = pendingHandshakes.remove(requestId);
} else {
Expand Down Expand Up @@ -1373,7 +1358,7 @@ protected String handleRequest(TcpChannel channel, String profileName, final Str
features = Collections.emptySet();
}
final String action = stream.readString();
transportListener.onRequestReceived(requestId, action);
messageListener.onRequestReceived(requestId, action);
TransportChannel transportChannel = null;
try {
if (TransportStatus.isHandshake(status)) {
Expand Down Expand Up @@ -1682,69 +1667,42 @@ public ProfileSettings(Settings settings, String profileName) {
}
}

private static final class DelegatingTransportConnectionListener implements TransportConnectionListener {
private final List<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();
private static final class DelegatingTransportMessageListener implements TransportMessageListener {

private final List<TransportMessageListener> listeners = new CopyOnWriteArrayList<>();

@Override
public void onRequestReceived(long requestId, String action) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onRequestReceived(requestId, action);
}
}

@Override
public void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, response, finalOptions);
}
}

@Override
public void onResponseSent(long requestId, String action, Exception error) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, error);
}
}

@Override
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions finalOptions) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onRequestSent(node, requestId, action, request, finalOptions);
}
}

@Override
public void onNodeDisconnected(DiscoveryNode key) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeDisconnected(key);
}
}

@Override
public void onConnectionOpened(Connection nodeChannels) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionOpened(nodeChannels);
}
}

@Override
public void onNodeConnected(DiscoveryNode node) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeConnected(node);
}
}

@Override
public void onConnectionClosed(Connection nodeChannels) {
for (TransportConnectionListener listener : listeners) {
listener.onConnectionClosed(nodeChannels);
}
}

@Override
public void onResponseReceived(long requestId, ResponseContext holder) {
for (TransportConnectionListener listener : listeners) {
for (TransportMessageListener listener : listeners) {
listener.onResponseReceived(requestId, holder);
}
}
Expand Down
15 changes: 3 additions & 12 deletions server/src/main/java/org/elasticsearch/transport/Transport.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,9 @@ public interface Transport extends LifecycleComponent {
*/
RequestHandlerRegistry getRequestHandler(String action);

/**
* Adds a new event listener
* @param listener the listener to add
*/
void addConnectionListener(TransportConnectionListener listener);
void addMessageListener(TransportMessageListener listener);

/**
* Removes an event listener
* @param listener the listener to remove
* @return <code>true</code> iff the listener was removed otherwise <code>false</code>
*/
boolean removeConnectionListener(TransportConnectionListener listener);
boolean removeMessageListener(TransportMessageListener listener);

/**
* The address the transport is bound on.
Expand Down Expand Up @@ -254,7 +245,7 @@ public List<ResponseContext> prune(Predicate<ResponseContext> predicate) {
* sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not
* found.
*/
public TransportResponseHandler onResponseReceived(final long requestId, TransportConnectionListener listener) {
public TransportResponseHandler onResponseReceived(final long requestId, TransportMessageListener listener) {
ResponseContext context = handlers.remove(requestId);
listener.onResponseReceived(requestId, context);
if (context == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,42 +28,6 @@
*/
public interface TransportConnectionListener {

/**
* Called once a request is received
* @param requestId the internal request ID
* @param action the request action
*
*/
default void onRequestReceived(long requestId, String action) {}

/**
* Called for every action response sent after the response has been passed to the underlying network implementation.
* @param requestId the request ID (unique per client)
* @param action the request action
* @param response the response send
* @param finalOptions the response options
*/
default void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {}

/***
* Called for every failed action response after the response has been passed to the underlying network implementation.
* @param requestId the request ID (unique per client)
* @param action the request action
* @param error the error sent back to the caller
*/
default void onResponseSent(long requestId, String action, Exception error) {}

/**
* Called for every request sent to a server after the request has been passed to the underlying network implementation
* @param node the node the request was sent to
* @param requestId the internal request id
* @param action the action name
* @param request the actual request
* @param finalOptions the request options
*/
default void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
TransportRequestOptions finalOptions) {}

/**
* Called once a connection was opened
* @param connection the connection
Expand All @@ -76,13 +40,6 @@ default void onConnectionOpened(Transport.Connection connection) {}
*/
default void onConnectionClosed(Transport.Connection connection) {}

/**
* Called for every response received
* @param requestId the request id for this reponse
* @param context the response context or null if the context was already processed ie. due to a timeout.
*/
default void onResponseReceived(long requestId, Transport.ResponseContext context) {}

/**
* Called once a node connection is opened and registered.
*/
Expand Down
Loading