From 943bd8112b4979a559e11af2673381d0704a4aa5 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 29 Jun 2017 11:26:40 -0500 Subject: [PATCH 1/3] Add nio thread names totransport thread checks We have various assertions that check we never block on transport threads. This commit adds the thread names for the nio transport to these assertions. With this change I had to fix two places where we were calling blocking methods from the transport threads. --- .../org/elasticsearch/transport/Transports.java | 7 ++++++- .../elasticsearch/transport/nio/NioTransport.java | 15 +++++++++++---- .../transport/nio/channel/AbstractNioChannel.java | 5 ----- .../transport/nio/channel/ConnectFuture.java | 4 +++- 4 files changed, 20 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/Transports.java b/core/src/main/java/org/elasticsearch/transport/Transports.java index c7a0fe4d4f57f..d8433274168ac 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transports.java +++ b/core/src/main/java/org/elasticsearch/transport/Transports.java @@ -29,6 +29,9 @@ public enum Transports { /** threads whose name is prefixed by this string will be considered network threads, even though they aren't */ public static final String TEST_MOCK_TRANSPORT_THREAD_PREFIX = "__mock_network_thread"; + public static final String NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX = "nio_transport_worker"; + public static final String NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "nio_transport_acceptor"; + /** * Utility method to detect whether a thread is a network thread. Typically * used in assertions to make sure that we do not call blocking code from @@ -40,7 +43,9 @@ public static final boolean isTransportThread(Thread t) { HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX, TcpTransport.TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX, - TEST_MOCK_TRANSPORT_THREAD_PREFIX)) { + TEST_MOCK_TRANSPORT_THREAD_PREFIX, + NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX, + NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX)) { if (threadName.contains(s)) { return true; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 05c818476a18c..8b0d435a08ef8 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -36,6 +36,7 @@ import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportSettings; +import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.nio.channel.ChannelFactory; import org.elasticsearch.transport.nio.channel.NioChannel; import org.elasticsearch.transport.nio.channel.NioServerSocketChannel; @@ -57,9 +58,8 @@ public class NioTransport extends TcpTransport { - // TODO: Need to add to places where we check if transport thread - public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker"; - public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "transport_acceptor"; + public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; + public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX; public static final Setting NIO_WORKER_COUNT = new Setting<>("transport.nio.worker_count", @@ -108,7 +108,14 @@ protected void closeChannels(List channels) throws IOException { for (final NioChannel channel : channels) { if (channel != null && channel.isOpen()) { try { - channel.closeAsync().awaitClose(); + // If we are currently on the selector thread that handles this channel, we should prefer + // the closeFromSelector method. This method always closes the channel immediately. + ESSelector selector = channel.getSelector(); + if (selector != null && selector.isOnCurrentThread()) { + channel.closeFromSelector(); + } else { + channel.closeAsync().awaitClose(); + } } catch (Exception e) { if (closingExceptions == null) { closingExceptions = new IOException("failed to close channels"); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java index be8dbe3f4681f..9792f9e64cc03 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java @@ -102,11 +102,6 @@ public String getProfile() { */ @Override public CloseFuture closeAsync() { - if (selector != null && selector.isOnCurrentThread()) { - closeFromSelector(); - return closeFuture; - } - for (; ; ) { int state = this.state.get(); if (state == UNREGISTERED && this.state.compareAndSet(UNREGISTERED, CLOSING)) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java index 4bc1ca6043ca8..1675c7326ee04 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ConnectFuture.java @@ -80,12 +80,14 @@ private NioSocketChannel getChannel() { if (isDone()) { try { // Get should always return without blocking as we already checked 'isDone' - return super.get(); + return super.get(0, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } catch (ExecutionException e) { return null; + } catch (TimeoutException e) { + throw new AssertionError("This should never happen as we only call get() after isDone() is true."); } } else { return null; From 55e6d45a567cc1ce5d372b29dec6dae87f466f6e Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 29 Jun 2017 14:11:41 -0500 Subject: [PATCH 2/3] Add es as a prefix --- .../main/java/org/elasticsearch/transport/Transports.java | 8 ++++---- .../org/elasticsearch/transport/nio/NioTransport.java | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/Transports.java b/core/src/main/java/org/elasticsearch/transport/Transports.java index d8433274168ac..2b6d993098dcf 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transports.java +++ b/core/src/main/java/org/elasticsearch/transport/Transports.java @@ -29,8 +29,8 @@ public enum Transports { /** threads whose name is prefixed by this string will be considered network threads, even though they aren't */ public static final String TEST_MOCK_TRANSPORT_THREAD_PREFIX = "__mock_network_thread"; - public static final String NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX = "nio_transport_worker"; - public static final String NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "nio_transport_acceptor"; + public static final String ES_NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX = "es_nio_transport_worker"; + public static final String ES_NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "es_nio_transport_acceptor"; /** * Utility method to detect whether a thread is a network thread. Typically @@ -44,8 +44,8 @@ public static final boolean isTransportThread(Thread t) { TcpTransport.TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX, TEST_MOCK_TRANSPORT_THREAD_PREFIX, - NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX, - NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX)) { + ES_NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX, + ES_NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX)) { if (threadName.contains(s)) { return true; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 8b0d435a08ef8..09059ca589706 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -58,8 +58,8 @@ public class NioTransport extends TcpTransport { - public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; - public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX; + public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.ES_NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; + public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.ES_NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX; public static final Setting NIO_WORKER_COUNT = new Setting<>("transport.nio.worker_count", From 0dbdcfe3e9e9a90e8449f41a588bd73166894fce Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 29 Jun 2017 14:12:48 -0500 Subject: [PATCH 3/3] Rename constant --- .../main/java/org/elasticsearch/transport/Transports.java | 8 ++++---- .../org/elasticsearch/transport/nio/NioTransport.java | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/Transports.java b/core/src/main/java/org/elasticsearch/transport/Transports.java index 2b6d993098dcf..d07846835c23f 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transports.java +++ b/core/src/main/java/org/elasticsearch/transport/Transports.java @@ -29,8 +29,8 @@ public enum Transports { /** threads whose name is prefixed by this string will be considered network threads, even though they aren't */ public static final String TEST_MOCK_TRANSPORT_THREAD_PREFIX = "__mock_network_thread"; - public static final String ES_NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX = "es_nio_transport_worker"; - public static final String ES_NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "es_nio_transport_acceptor"; + public static final String NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX = "es_nio_transport_worker"; + public static final String NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "es_nio_transport_acceptor"; /** * Utility method to detect whether a thread is a network thread. Typically @@ -44,8 +44,8 @@ public static final boolean isTransportThread(Thread t) { TcpTransport.TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX, TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX, TEST_MOCK_TRANSPORT_THREAD_PREFIX, - ES_NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX, - ES_NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX)) { + NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX, + NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX)) { if (threadName.contains(s)) { return true; } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java index 09059ca589706..8b0d435a08ef8 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java @@ -58,8 +58,8 @@ public class NioTransport extends TcpTransport { - public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.ES_NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; - public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.ES_NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX; + public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX; + public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX; public static final Setting NIO_WORKER_COUNT = new Setting<>("transport.nio.worker_count",