From ed8ef79d2b7fa8e29971a31635fa6017fef8ff9c Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 15 Aug 2018 16:51:47 -0600 Subject: [PATCH 1/8] WIP --- .../elasticsearch/transport/TcpTransport.java | 50 +++++++++----- .../elasticsearch/transport/Transport.java | 11 ++- .../TransportConnectionListener.java | 36 ---------- .../transport/TransportMessageListener.java | 67 +++++++++++++++++++ .../transport/TransportService.java | 4 +- .../transport/FailAndRetryMockTransport.java | 16 ++++- .../test/transport/CapturingTransport.java | 25 ++++++- 7 files changed, 147 insertions(+), 62 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 0b82417cfaa04..8f713ec0ac176 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -185,6 +185,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements protected final Set profileSettings; private final DelegatingTransportConnectionListener transportListener = new DelegatingTransportConnectionListener(); + private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener(); private final ConcurrentMap profileBoundAddresses = newConcurrentMap(); private final Map> serverChannels = newConcurrentMap(); @@ -248,6 +249,14 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo protected void doStart() { } + public void addMessageListener(TransportMessageListener listener) { + messageListener.listeners.add(listener); + } + + public boolean removeMessageListener(TransportMessageListener listener) { + return messageListener.listeners.remove(listener); + } + @Override public void addConnectionListener(TransportConnectionListener listener) { transportListener.listeners.add(listener); @@ -907,7 +916,7 @@ private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel cha final TransportRequestOptions finalOptions = options; // this might be called in a different thread SendListener onRequestSent = new SendListener(channel, stream, - () -> transportListener.onRequestSent(node, requestId, action, request, finalOptions), message.length()); + () -> messageListener.onRequestSent(node, requestId, action, request, finalOptions), message.length()); internalSendMessage(channel, message, onRequestSent); addedReleaseListener = true; } finally { @@ -961,7 +970,7 @@ public void sendErrorResponse( final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length()); CompositeBytesReference message = new CompositeBytesReference(header, bytes); SendListener onResponseSent = new SendListener(channel, null, - () -> transportListener.onResponseSent(requestId, action, error), message.length()); + () -> messageListener.onResponseSent(requestId, action, error), message.length()); internalSendMessage(channel, message, onResponseSent); } } @@ -1010,7 +1019,7 @@ private void sendResponse( final TransportResponseOptions finalOptions = options; // this might be called in a different thread SendListener listener = new SendListener(channel, stream, - () -> transportListener.onResponseSent(requestId, action, response, finalOptions), message.length()); + () -> messageListener.onResponseSent(requestId, action, response, finalOptions), message.length()); internalSendMessage(channel, message, listener); addedReleaseListener = true; } finally { @@ -1266,7 +1275,7 @@ public final void messageReceived(BytesReference reference, TcpChannel channel) if (isHandshake) { handler = pendingHandshakes.remove(requestId); } else { - TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, transportListener); + TransportResponseHandler theHandler = responseHandlers.onResponseReceived(requestId, messageListener); if (theHandler == null && TransportStatus.isError(status)) { handler = pendingHandshakes.remove(requestId); } else { @@ -1373,7 +1382,7 @@ protected String handleRequest(TcpChannel channel, String profileName, final Str features = Collections.emptySet(); } final String action = stream.readString(); - transportListener.onRequestReceived(requestId, action); + messageListener.onRequestReceived(requestId, action); TransportChannel transportChannel = null; try { if (TransportStatus.isHandshake(status)) { @@ -1682,26 +1691,27 @@ public ProfileSettings(Settings settings, String profileName) { } } - private static final class DelegatingTransportConnectionListener implements TransportConnectionListener { - private final List listeners = new CopyOnWriteArrayList<>(); + private static final class DelegatingTransportMessageListener implements TransportMessageListener { + + private final List listeners = new CopyOnWriteArrayList<>(); @Override public void onRequestReceived(long requestId, String action) { - for (TransportConnectionListener listener : listeners) { + for (TransportMessageListener listener : listeners) { listener.onRequestReceived(requestId, action); } } @Override public void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) { - for (TransportConnectionListener listener : listeners) { + for (TransportMessageListener listener : listeners) { listener.onResponseSent(requestId, action, response, finalOptions); } } @Override public void onResponseSent(long requestId, String action, Exception error) { - for (TransportConnectionListener listener : listeners) { + for (TransportMessageListener listener : listeners) { listener.onResponseSent(requestId, action, error); } } @@ -1709,11 +1719,22 @@ public void onResponseSent(long requestId, String action, Exception error) { @Override public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions finalOptions) { - for (TransportConnectionListener listener : listeners) { + for (TransportMessageListener listener : listeners) { listener.onRequestSent(node, requestId, action, request, finalOptions); } } + @Override + public void onResponseReceived(long requestId, ResponseContext holder) { + for (TransportMessageListener listener : listeners) { + listener.onResponseReceived(requestId, holder); + } + } + } + + private static final class DelegatingTransportConnectionListener implements TransportConnectionListener { + private final List listeners = new CopyOnWriteArrayList<>(); + @Override public void onNodeDisconnected(DiscoveryNode key) { for (TransportConnectionListener listener : listeners) { @@ -1741,13 +1762,6 @@ public void onConnectionClosed(Connection nodeChannels) { listener.onConnectionClosed(nodeChannels); } } - - @Override - public void onResponseReceived(long requestId, ResponseContext holder) { - for (TransportConnectionListener listener : listeners) { - listener.onResponseReceived(requestId, holder); - } - } } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index 9538119f43b8a..a1267a1f8f7de 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -56,6 +56,15 @@ public interface Transport extends LifecycleComponent { */ RequestHandlerRegistry getRequestHandler(String action); + default void addMessageListener(TransportMessageListener listener) { + // TODO: Not default + } + + default boolean removeMessageListener(TransportMessageListener listener) { + // TODO: Not default + return false; + } + /** * Adds a new event listener * @param listener the listener to add @@ -254,7 +263,7 @@ public List prune(Predicate predicate) { * sent request (before any processing or deserialization was done). Returns the appropriate response handler or null if not * found. */ - public TransportResponseHandler onResponseReceived(final long requestId, TransportConnectionListener listener) { + public TransportResponseHandler onResponseReceived(final long requestId, TransportMessageListener listener) { ResponseContext context = handlers.remove(requestId); listener.onResponseReceived(requestId, context); if (context == null) { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java b/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java index 0ee2ed5828d44..bd4c3405158db 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java @@ -28,42 +28,6 @@ */ public interface TransportConnectionListener { - /** - * Called once a request is received - * @param requestId the internal request ID - * @param action the request action - * - */ - default void onRequestReceived(long requestId, String action) {} - - /** - * Called for every action response sent after the response has been passed to the underlying network implementation. - * @param requestId the request ID (unique per client) - * @param action the request action - * @param response the response send - * @param finalOptions the response options - */ - default void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {} - - /*** - * Called for every failed action response after the response has been passed to the underlying network implementation. - * @param requestId the request ID (unique per client) - * @param action the request action - * @param error the error sent back to the caller - */ - default void onResponseSent(long requestId, String action, Exception error) {} - - /** - * Called for every request sent to a server after the request has been passed to the underlying network implementation - * @param node the node the request was sent to - * @param requestId the internal request id - * @param action the action name - * @param request the actual request - * @param finalOptions the request options - */ - default void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, - TransportRequestOptions finalOptions) {} - /** * Called once a connection was opened * @param connection the connection diff --git a/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java b/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java new file mode 100644 index 0000000000000..a872c761b36d0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/TransportMessageListener.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.elasticsearch.cluster.node.DiscoveryNode; + +public interface TransportMessageListener { + + /** + * Called once a request is received + * @param requestId the internal request ID + * @param action the request action + * + */ + default void onRequestReceived(long requestId, String action) {} + + /** + * Called for every action response sent after the response has been passed to the underlying network implementation. + * @param requestId the request ID (unique per client) + * @param action the request action + * @param response the response send + * @param finalOptions the response options + */ + default void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {} + + /*** + * Called for every failed action response after the response has been passed to the underlying network implementation. + * @param requestId the request ID (unique per client) + * @param action the request action + * @param error the error sent back to the caller + */ + default void onResponseSent(long requestId, String action, Exception error) {} + + /** + * Called for every request sent to a server after the request has been passed to the underlying network implementation + * @param node the node the request was sent to + * @param requestId the internal request id + * @param action the action name + * @param request the actual request + * @param finalOptions the request options + */ + default void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, + TransportRequestOptions finalOptions) {} + + /** + * Called for every response received + * @param requestId the request id for this reponse + * @param context the response context or null if the context was already processed ie. due to a timeout. + */ + default void onResponseReceived(long requestId, Transport.ResponseContext context) {} +} diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 8eca6504b70be..c9885422fb8dd 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -77,7 +77,7 @@ import static org.elasticsearch.common.settings.Setting.listSetting; import static org.elasticsearch.common.settings.Setting.timeSetting; -public class TransportService extends AbstractLifecycleComponent implements TransportConnectionListener { +public class TransportService extends AbstractLifecycleComponent implements TransportMessageListener, TransportConnectionListener { public static final Setting CONNECTIONS_PER_NODE_RECOVERY = intSetting("transport.connections_per_node.recovery", 2, 1, Setting.Property.NodeScope); @@ -248,7 +248,7 @@ void setTracerLogExclude(List tracerLogExclude) { @Override protected void doStart() { - transport.addConnectionListener(this); + transport.addMessageListener(this); transport.start(); if (transport.boundAddress() != null && logger.isInfoEnabled()) { logger.info("{}", transport.boundAddress()); diff --git a/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index 513f07b733cda..2909373f45b03 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -40,6 +40,7 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; @@ -62,7 +63,7 @@ abstract class FailAndRetryMockTransport imp private volatile Map requestHandlers = Collections.emptyMap(); private final Object requestHandlerMutex = new Object(); private final ResponseHandlers responseHandlers = new ResponseHandlers(); - private TransportConnectionListener listener; + private TransportMessageListener listener; private boolean connectMode = true; @@ -223,11 +224,22 @@ public RequestHandlerRegistry getRequestHandler(String action) { return requestHandlers.get(action); } + @Override - public void addConnectionListener(TransportConnectionListener listener) { + public void addMessageListener(TransportMessageListener listener) { this.listener = listener; } + @Override + public boolean removeMessageListener(TransportMessageListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public void addConnectionListener(TransportConnectionListener listener) { + // TODO: Remove + } + @Override public boolean removeConnectionListener(TransportConnectionListener listener) { throw new UnsupportedOperationException(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index a7d3e85d3009d..634f22e57915b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -44,6 +44,7 @@ import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportInterceptor; +import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; @@ -72,7 +73,7 @@ public class CapturingTransport implements Transport { private volatile Map requestHandlers = Collections.emptyMap(); final Object requestHandlerMutex = new Object(); private final ResponseHandlers responseHandlers = new ResponseHandlers(); - private TransportConnectionListener listener; + private TransportMessageListener listener; public static class CapturedRequest { public final DiscoveryNode node; @@ -341,7 +342,7 @@ public RequestHandlerRegistry getRequestHandler(String action) { } @Override - public void addConnectionListener(TransportConnectionListener listener) { + public void addMessageListener(TransportMessageListener listener) { if (this.listener != null) { throw new IllegalStateException("listener already set"); } @@ -349,11 +350,29 @@ public void addConnectionListener(TransportConnectionListener listener) { } @Override - public boolean removeConnectionListener(TransportConnectionListener listener) { + public boolean removeMessageListener(TransportMessageListener listener) { if (listener == this.listener) { this.listener = null; return true; } return false; } + + @Override + public void addConnectionListener(TransportConnectionListener listener) { +// if (this.listener != null) { +// throw new IllegalStateException("listener already set"); +// } +// this.listener = listener; + } + + @Override + public boolean removeConnectionListener(TransportConnectionListener listener) { +// if (listener == this.listener) { +// this.listener = null; +// return true; +// } +// return false; + return false; + } } From fa8060b35824ebb4aa944e60d86ccef2be1798d6 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 16 Aug 2018 16:40:42 -0600 Subject: [PATCH 2/8] Remove connection listener from transport --- .../transport/ConnectionManager.java | 24 ++++++++++ .../elasticsearch/transport/TcpTransport.java | 48 ------------------- .../elasticsearch/transport/Transport.java | 13 ----- .../TransportConnectionListener.java | 7 --- .../transport/TransportService.java | 2 - .../transport/FailAndRetryMockTransport.java | 10 ---- .../cluster/NodeConnectionsServiceTests.java | 10 ---- .../test/transport/CapturingTransport.java | 18 ------- .../test/transport/StubbableTransport.java | 11 ----- .../AbstractSimpleTransportTestCase.java | 2 +- 10 files changed, 25 insertions(+), 120 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index 1ff8b701a83e1..c659e283d9647 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -227,6 +227,16 @@ public void close() { } } + private Transport.Connection internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile) { + Transport.Connection connection = transport.openConnection(node, ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile)); + try { + connectionListener.onConnectionOpened(connection); + } finally { + connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection))); + } + return connection; + } + private void ensureOpen() { if (lifecycle.started() == false) { throw new IllegalStateException("connection manager is closed"); @@ -289,6 +299,20 @@ public void onNodeConnected(DiscoveryNode node) { listener.onNodeConnected(node); } } + + @Override + public void onConnectionOpened(Transport.Connection connection) { + for (TransportConnectionListener listener : listeners) { + listener.onConnectionOpened(connection); + } + } + + @Override + public void onConnectionClosed(Transport.Connection connection) { + for (TransportConnectionListener listener : listeners) { + listener.onConnectionClosed(connection); + } + } } static ConnectionProfile buildDefaultConnectionProfile(Settings settings) { diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 8f713ec0ac176..81ae687b9071b 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -184,7 +184,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements protected final NetworkService networkService; protected final Set profileSettings; - private final DelegatingTransportConnectionListener transportListener = new DelegatingTransportConnectionListener(); private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener(); private final ConcurrentMap profileBoundAddresses = newConcurrentMap(); @@ -257,16 +256,6 @@ public boolean removeMessageListener(TransportMessageListener listener) { return messageListener.listeners.remove(listener); } - @Override - public void addConnectionListener(TransportConnectionListener listener) { - transportListener.listeners.add(listener); - } - - @Override - public boolean removeConnectionListener(TransportConnectionListener listener) { - return transportListener.listeners.remove(listener); - } - @Override public CircuitBreaker getInFlightRequestBreaker() { // We always obtain a fresh breaker to reflect changes to the breaker configuration. @@ -490,11 +479,6 @@ public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connect // underlying channels. nodeChannels = new NodeChannels(node, channels, connectionProfile, version); final NodeChannels finalNodeChannels = nodeChannels; - try { - transportListener.onConnectionOpened(nodeChannels); - } finally { - nodeChannels.addCloseListener(ActionListener.wrap(() -> transportListener.onConnectionClosed(finalNodeChannels))); - } Consumer onClose = c -> { assert c.isOpen() == false : "channel is still open when onClose is called"; @@ -1732,38 +1716,6 @@ public void onResponseReceived(long requestId, ResponseContext holder) { } } - private static final class DelegatingTransportConnectionListener implements TransportConnectionListener { - private final List listeners = new CopyOnWriteArrayList<>(); - - @Override - public void onNodeDisconnected(DiscoveryNode key) { - for (TransportConnectionListener listener : listeners) { - listener.onNodeDisconnected(key); - } - } - - @Override - public void onConnectionOpened(Connection nodeChannels) { - for (TransportConnectionListener listener : listeners) { - listener.onConnectionOpened(nodeChannels); - } - } - - @Override - public void onNodeConnected(DiscoveryNode node) { - for (TransportConnectionListener listener : listeners) { - listener.onNodeConnected(node); - } - } - - @Override - public void onConnectionClosed(Connection nodeChannels) { - for (TransportConnectionListener listener : listeners) { - listener.onConnectionClosed(nodeChannels); - } - } - } - @Override public final ResponseHandlers getResponseHandlers() { return responseHandlers; diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index a1267a1f8f7de..90eff2f4df509 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -65,19 +65,6 @@ default boolean removeMessageListener(TransportMessageListener listener) { return false; } - /** - * Adds a new event listener - * @param listener the listener to add - */ - void addConnectionListener(TransportConnectionListener listener); - - /** - * Removes an event listener - * @param listener the listener to remove - * @return true iff the listener was removed otherwise false - */ - boolean removeConnectionListener(TransportConnectionListener listener); - /** * The address the transport is bound on. */ diff --git a/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java b/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java index bd4c3405158db..c41a328637c22 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java @@ -40,13 +40,6 @@ default void onConnectionOpened(Transport.Connection connection) {} */ default void onConnectionClosed(Transport.Connection connection) {} - /** - * Called for every response received - * @param requestId the request id for this reponse - * @param context the response context or null if the context was already processed ie. due to a timeout. - */ - default void onResponseReceived(long requestId, Transport.ResponseContext context) {} - /** * Called once a node connection is opened and registered. */ diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index c9885422fb8dd..9007406181227 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -506,12 +506,10 @@ public void disconnectFromNode(DiscoveryNode node) { } public void addConnectionListener(TransportConnectionListener listener) { - transport.addConnectionListener(listener); connectionManager.addListener(listener); } public void removeConnectionListener(TransportConnectionListener listener) { - transport.removeConnectionListener(listener); connectionManager.removeListener(listener); } diff --git a/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java b/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java index 2909373f45b03..e9ff4048bfe15 100644 --- a/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java +++ b/server/src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java @@ -38,7 +38,6 @@ import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; @@ -235,13 +234,4 @@ public boolean removeMessageListener(TransportMessageListener listener) { throw new UnsupportedOperationException(); } - @Override - public void addConnectionListener(TransportConnectionListener listener) { - // TODO: Remove - } - - @Override - public boolean removeConnectionListener(TransportConnectionListener listener) { - throw new UnsupportedOperationException(); - } } diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 92c1016d3e615..c7471a5f4aa71 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -200,16 +200,6 @@ public RequestHandlerRegistry getRequestHandler(String action) { return null; } - @Override - public void addConnectionListener(TransportConnectionListener listener) { - this.listener = listener; - } - - @Override - public boolean removeConnectionListener(TransportConnectionListener listener) { - throw new UnsupportedOperationException(); - } - @Override public BoundTransportAddress boundAddress() { return null; diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index 634f22e57915b..60133a16a10af 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -41,7 +41,6 @@ import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.SendRequestTransportException; import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportInterceptor; import org.elasticsearch.transport.TransportMessageListener; @@ -358,21 +357,4 @@ public boolean removeMessageListener(TransportMessageListener listener) { return false; } - @Override - public void addConnectionListener(TransportConnectionListener listener) { -// if (this.listener != null) { -// throw new IllegalStateException("listener already set"); -// } -// this.listener = listener; - } - - @Override - public boolean removeConnectionListener(TransportConnectionListener listener) { -// if (listener == this.listener) { -// this.listener = null; -// return true; -// } -// return false; - return false; - } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java index 5a0dd3b7f6d53..5962997fc89f5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java @@ -29,7 +29,6 @@ import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -85,16 +84,6 @@ Transport getDelegate() { return delegate; } - @Override - public void addConnectionListener(TransportConnectionListener listener) { - delegate.addConnectionListener(listener); - } - - @Override - public boolean removeConnectionListener(TransportConnectionListener listener) { - return delegate.removeConnectionListener(listener); - } - @Override public void registerRequestHandler(RequestHandlerRegistry reg) { delegate.registerRequestHandler(reg); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 29997b16ba071..3db864a3db324 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -2642,7 +2642,7 @@ public void testProfilesIncludesDefault() { public void testChannelCloseWhileConnecting() { try (MockTransportService service = build(Settings.builder().put("name", "close").build(), version0, null, true)) { - service.transport.addConnectionListener(new TransportConnectionListener() { + service.addConnectionListener(new TransportConnectionListener() { @Override public void onConnectionOpened(final Transport.Connection connection) { try { From f1a46c5319e0e9f17d39e317a9f515b9e2b7f111 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 17 Aug 2018 10:23:03 -0600 Subject: [PATCH 3/8] Add methods --- .../org/elasticsearch/transport/Transport.java | 9 ++------- .../cluster/NodeConnectionsServiceTests.java | 17 ++++++++++++----- .../test/transport/StubbableTransport.java | 11 +++++++++++ 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index 90eff2f4df509..90adf2ab9e7d4 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -56,14 +56,9 @@ public interface Transport extends LifecycleComponent { */ RequestHandlerRegistry getRequestHandler(String action); - default void addMessageListener(TransportMessageListener listener) { - // TODO: Not default - } + void addMessageListener(TransportMessageListener listener); - default boolean removeMessageListener(TransportMessageListener listener) { - // TODO: Not default - return false; - } + boolean removeMessageListener(TransportMessageListener listener); /** * The address the transport is bound on. diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index c7471a5f4aa71..473f5152e8fd7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -37,9 +37,9 @@ import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportInterceptor; +import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -107,7 +107,6 @@ public void testConnectAndDisconnect() { assertConnectedExactlyToNodes(event.state()); } - public void testReconnect() { List nodes = generateNodes(); NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService); @@ -188,7 +187,7 @@ public HandshakeResponse handshake(Transport.Connection connection, long timeout private final class MockTransport implements Transport { private ResponseHandlers responseHandlers = new ResponseHandlers(); private volatile boolean randomConnectionExceptions = false; - private TransportConnectionListener listener = new TransportConnectionListener() { + private TransportMessageListener listener = new TransportMessageListener() { }; @Override @@ -200,6 +199,16 @@ public RequestHandlerRegistry getRequestHandler(String action) { return null; } + @Override + public void addMessageListener(TransportMessageListener listener) { + this.listener = listener; + } + + @Override + public boolean removeMessageListener(TransportMessageListener listener) { + throw new UnsupportedOperationException(); + } + @Override public BoundTransportAddress boundAddress() { return null; @@ -221,7 +230,6 @@ public Connection openConnection(DiscoveryNode node, ConnectionProfile connectio if (randomConnectionExceptions && randomBoolean()) { throw new ConnectTransportException(node, "simulated"); } - listener.onNodeConnected(node); } Connection connection = new Connection() { @Override @@ -250,7 +258,6 @@ public boolean isClosed() { return false; } }; - listener.onConnectionOpened(connection); return connection; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java index 5962997fc89f5..59b2a5f6dd6c4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java @@ -30,6 +30,7 @@ import org.elasticsearch.transport.RequestHandlerRegistry; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportStats; @@ -84,6 +85,16 @@ Transport getDelegate() { return delegate; } + @Override + public void addMessageListener(TransportMessageListener listener) { + delegate.addMessageListener(listener); + } + + @Override + public boolean removeMessageListener(TransportMessageListener listener) { + return delegate.removeMessageListener(listener); + } + @Override public void registerRequestHandler(RequestHandlerRegistry reg) { delegate.registerRequestHandler(reg); From d2ac1480a4f688d5a61a41f3d40c793e6efcc4f2 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 17 Aug 2018 10:30:24 -0600 Subject: [PATCH 4/8] Fix checkstyle --- .../org/elasticsearch/transport/ConnectionManager.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index c659e283d9647..2ac5a352f2f7b 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -91,7 +91,8 @@ public void removeListener(TransportConnectionListener listener) { } public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) { - return transport.openConnection(node, ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile)); + ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile); + return internalOpenConnection(node, resolvedProfile); } /** @@ -115,7 +116,7 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil } boolean success = false; try { - connection = transport.openConnection(node, resolvedProfile); + connection = internalOpenConnection(node, resolvedProfile); connectionValidator.accept(connection, resolvedProfile); // we acquire a connection lock, so no way there is an existing connection connectedNodes.put(node, connection); @@ -228,7 +229,7 @@ public void close() { } private Transport.Connection internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile) { - Transport.Connection connection = transport.openConnection(node, ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile)); + Transport.Connection connection = transport.openConnection(node, connectionProfile); try { connectionListener.onConnectionOpened(connection); } finally { From f9d9cc98bb14b9440e7ba71edcf2b3f9612e2478 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 17 Aug 2018 11:24:45 -0600 Subject: [PATCH 5/8] Fix tests --- .../netty4/SimpleNetty4TransportTests.java | 8 -------- .../transport/nio/SimpleNioTransportTests.java | 7 ------- .../elasticsearch/transport/ConnectionManager.java | 3 +++ .../org/elasticsearch/transport/TcpTransport.java | 8 -------- .../elasticsearch/transport/TransportService.java | 1 + .../test/transport/StubbableTransport.java | 6 +++++- .../transport/AbstractSimpleTransportTestCase.java | 14 +++++++++++++- .../transport/MockTcpTransportTests.java | 10 ---------- .../transport/nio/SimpleMockNioTransportTests.java | 7 ------- .../nio/SimpleSecurityNioTransportTests.java | 14 +++----------- 10 files changed, 25 insertions(+), 53 deletions(-) diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index 8d628ace2ee38..9d6f016086c26 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -87,13 +86,6 @@ protected MockTransportService build(Settings settings, Version version, Cluster return transportService; } - @Override - protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { - final Netty4Transport t = (Netty4Transport) transport; - final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; - CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); - } - public void testConnectException() throws UnknownHostException { try { serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876), diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java index 9322bfd71222a..baae00f81a3be 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -93,12 +92,6 @@ protected MockTransportService build(Settings settings, Version version, Cluster return transportService; } - @Override - protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { - TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; - CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); - } - public void testConnectException() throws UnknownHostException { try { serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876), diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index 2ac5a352f2f7b..84c337399d5b4 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -235,6 +235,9 @@ private Transport.Connection internalOpenConnection(DiscoveryNode node, Connecti } finally { connection.addCloseListener(ActionListener.wrap(() -> connectionListener.onConnectionClosed(connection))); } + if (connection.isClosed()) { + throw new ConnectTransportException(node, "a channel closed while connecting"); + } return connection; } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 81ae687b9071b..6d4ab80a89282 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -342,10 +342,6 @@ public TcpChannel channel(TransportRequestOptions.Type type) { return connectionTypeHandle.getChannel(channels); } - boolean allChannelsOpen() { - return channels.stream().allMatch(TcpChannel::isOpen); - } - @Override public boolean sendPing() { for (TcpChannel channel : channels) { @@ -486,10 +482,6 @@ public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connect }; nodeChannels.channels.forEach(ch -> ch.addCloseListener(ActionListener.wrap(() -> onClose.accept(ch)))); - - if (nodeChannels.allChannelsOpen() == false) { - throw new ConnectTransportException(node, "a channel closed while connecting"); - } success = true; return nodeChannels; } catch (ConnectTransportException e) { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 9007406181227..fb14ae96dbf20 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -249,6 +249,7 @@ void setTracerLogExclude(List tracerLogExclude) { @Override protected void doStart() { transport.addMessageListener(this); + connectionManager.addListener(this); transport.start(); if (transport.boundAddress() != null && logger.isInfoEnabled()) { logger.info("{}", transport.boundAddress()); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java index 59b2a5f6dd6c4..2e78f8a9a4f04 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java @@ -179,7 +179,7 @@ public Map profileBoundAddresses() { return delegate.profileBoundAddresses(); } - private class WrappedConnection implements Transport.Connection { + public class WrappedConnection implements Transport.Connection { private final Transport.Connection connection; @@ -234,6 +234,10 @@ public Object getCacheKey() { public void close() { connection.close(); } + + public Transport.Connection getConnection() { + return connection; + } } @FunctionalInterface diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 3db864a3db324..44d77ef6ba789 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.ClusterSettings; @@ -52,6 +53,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.test.transport.StubbableTransport; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; @@ -2642,6 +2644,7 @@ public void testProfilesIncludesDefault() { public void testChannelCloseWhileConnecting() { try (MockTransportService service = build(Settings.builder().put("name", "close").build(), version0, null, true)) { + AtomicBoolean connectionClosedListenerCalled = new AtomicBoolean(false); service.addConnectionListener(new TransportConnectionListener() { @Override public void onConnectionOpened(final Transport.Connection connection) { @@ -2651,6 +2654,11 @@ public void onConnectionOpened(final Transport.Connection connection) { throw new AssertionError(e); } } + + @Override + public void onConnectionClosed(Transport.Connection connection) { + connectionClosedListenerCalled.set(true); + } }); final ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); builder.addConnections(1, @@ -2662,10 +2670,14 @@ public void onConnectionOpened(final Transport.Connection connection) { final ConnectTransportException e = expectThrows(ConnectTransportException.class, () -> service.openConnection(nodeA, builder.build())); assertThat(e, hasToString(containsString(("a channel closed while connecting")))); + assertTrue(connectionClosedListenerCalled.get()); } } - protected abstract void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException; + private void closeConnectionChannel(Transport transport, Transport.Connection connection) { + final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) ((StubbableTransport.WrappedConnection) connection).getConnection(); + CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); + } @SuppressForbidden(reason = "need local ephemeral port") private InetSocketAddress getLocalEphemeral() throws UnknownHostException { diff --git a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java index 6d1e5116474ff..42658b1d9a60f 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java @@ -21,7 +21,6 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -60,13 +59,4 @@ protected Version executeHandshake(DiscoveryNode node, TcpChannel mockChannel, T public int channelsPerNodeConnection() { return 1; } - - @Override - protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { - final MockTcpTransport t = (MockTcpTransport) transport; - final TcpTransport.NodeChannels channels = - (TcpTransport.NodeChannels) connection; - CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); - } - } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java index 3a78f366bd87f..8b3e7dce367d8 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -99,12 +98,6 @@ protected int channelsPerNodeConnection() { return 3; } - @Override - protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { - TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; - CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); - } - public void testConnectException() throws UnknownHostException { try { serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876), diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java index 70ab085fcf72b..7397ebc8c7dc4 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java @@ -10,7 +10,6 @@ import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; -import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.MockSecureSettings; @@ -35,6 +34,9 @@ import org.elasticsearch.xpack.core.ssl.SSLConfiguration; import org.elasticsearch.xpack.core.ssl.SSLService; +import javax.net.SocketFactory; +import javax.net.ssl.HandshakeCompletedListener; +import javax.net.ssl.SSLSocket; import java.io.IOException; import java.net.InetAddress; import java.net.SocketTimeoutException; @@ -44,10 +46,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; -import javax.net.SocketFactory; -import javax.net.ssl.HandshakeCompletedListener; -import javax.net.ssl.SSLSocket; - import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.containsString; @@ -118,12 +116,6 @@ protected MockTransportService build(Settings settings, Version version, Cluster return transportService; } - @Override - protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException { - TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection; - CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); - } - public void testConnectException() throws UnknownHostException { try { serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876), From c57543b9165415abbcdd8bf970f8f427148ae25e Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 17 Aug 2018 11:25:27 -0600 Subject: [PATCH 6/8] Fix issiues --- .../transport/AbstractSimpleTransportTestCase.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 44d77ef6ba789..fb49a3739bd65 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -2648,11 +2648,7 @@ public void testChannelCloseWhileConnecting() { service.addConnectionListener(new TransportConnectionListener() { @Override public void onConnectionOpened(final Transport.Connection connection) { - try { - closeConnectionChannel(service.getOriginalTransport(), connection); - } catch (final IOException e) { - throw new AssertionError(e); - } + closeConnectionChannel(service.getOriginalTransport(), connection); } @Override From f319124e09fc8e017205822aae0cde8c0ad83edf Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 17 Aug 2018 11:43:11 -0600 Subject: [PATCH 7/8] Fix checkstyle --- .../transport/AbstractSimpleTransportTestCase.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index fb49a3739bd65..c7b588b9dad25 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -2648,7 +2648,7 @@ public void testChannelCloseWhileConnecting() { service.addConnectionListener(new TransportConnectionListener() { @Override public void onConnectionOpened(final Transport.Connection connection) { - closeConnectionChannel(service.getOriginalTransport(), connection); + closeConnectionChannel(connection); } @Override @@ -2670,8 +2670,9 @@ public void onConnectionClosed(Transport.Connection connection) { } } - private void closeConnectionChannel(Transport transport, Transport.Connection connection) { - final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) ((StubbableTransport.WrappedConnection) connection).getConnection(); + private void closeConnectionChannel(Transport.Connection connection) { + StubbableTransport.WrappedConnection wrappedConnection = (StubbableTransport.WrappedConnection) connection; + TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) wrappedConnection.getConnection(); CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true); } From 6b629bf80cba484fde0bfb4b2d48d3abe3c23855 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 17 Aug 2018 12:08:03 -0600 Subject: [PATCH 8/8] Fix issue --- .../transport/AbstractSimpleTransportTestCase.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index c7b588b9dad25..4e59aaecf8de2 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -2649,6 +2649,11 @@ public void testChannelCloseWhileConnecting() { @Override public void onConnectionOpened(final Transport.Connection connection) { closeConnectionChannel(connection); + try { + assertBusy(connection::isClosed); + } catch (Exception e) { + throw new AssertionError(e); + } } @Override