diff --git a/core/src/main/java/org/elasticsearch/transport/Transports.java b/core/src/main/java/org/elasticsearch/transport/Transports.java index c7a0fe4d4f57f..d07846835c23f 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transports.java +++ b/core/src/main/java/org/elasticsearch/transport/Transports.java @@ -29,6 +29,9 @@ public enum Transports { /** threads whose name is prefixed by this string will be considered network threads, even though they aren't */ public static final String TEST_MOCK_TRANSPORT_THREAD_PREFIX = "__mock_network_thread"; + public static final String NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX = "es_nio_transport_worker"; + public static final String NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "es_nio_transport_acceptor"; + /** * Utility method to detect whether a thread is a network thread. Typically * used in assertions to make sure that we do not call blocking code from @@ -40,7 +43,9 @@ public static final boolean isTransportThread(Thread t) { HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX, TcpTransport.TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX, - TEST_MOCK_TRANSPORT_THREAD_PREFIX)) { + TEST_MOCK_TRANSPORT_THREAD_PREFIX, + NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX, + NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX)) { if (threadName.contains(s)) { return true; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 05c818476a18c..8b0d435a08ef8 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -36,6 +36,7 @@ import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportSettings; +import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.nio.channel.ChannelFactory; import org.elasticsearch.transport.nio.channel.NioChannel; import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; @@ -57,9 +58,8 @@ public class NioTransport extends TcpTransport { - // TODO: Need to add to places where we check if transport thread - public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker"; - public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "transport_acceptor"; + public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; + public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX; public static final Setting NIO_WORKER_COUNT = new Setting<>("transport.nio.worker_count", @@ -108,7 +108,14 @@ protected void closeChannels(List channels) throws IOException { for (final NioChannel channel : channels) { if (channel != null && channel.isOpen()) { try { - channel.closeAsync().awaitClose(); + // If we are currently on the selector thread that handles this channel, we should prefer + // the closeFromSelector method. This method always closes the channel immediately. + ESSelector selector = channel.getSelector(); + if (selector != null && selector.isOnCurrentThread()) { + channel.closeFromSelector(); + } else { + channel.closeAsync().awaitClose(); + } } catch (Exception e) { if (closingExceptions == null) { closingExceptions = new IOException("failed to close channels"); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java index be8dbe3f4681f..9792f9e64cc03 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java @@ -102,11 +102,6 @@ public String getProfile() { */ @Override public CloseFuture closeAsync() { - if (selector != null && selector.isOnCurrentThread()) { - closeFromSelector(); - return closeFuture; - } - for (; ; ) { int state = this.state.get(); if (state == UNREGISTERED && this.state.compareAndSet(UNREGISTERED, CLOSING)) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java index 4bc1ca6043ca8..1675c7326ee04 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java @@ -80,12 +80,14 @@ private NioSocketChannel getChannel() { if (isDone()) { try { // Get should always return without blocking as we already checked 'isDone' - return super.get(); + return super.get(0, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } catch (ExecutionException e) { return null; + } catch (TimeoutException e) { + throw new AssertionError("This should never happen as we only call get() after isDone() is true."); } } else { return null;