Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public enum Transports {
/** threads whose name is prefixed by this string will be considered network threads, even though they aren't */
public static final String TEST_MOCK_TRANSPORT_THREAD_PREFIX = "__mock_network_thread";

public static final String NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX = "es_nio_transport_worker";
public static final String NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "es_nio_transport_acceptor";

/**
* Utility method to detect whether a thread is a network thread. Typically
* used in assertions to make sure that we do not call blocking code from
Expand All @@ -40,7 +43,9 @@ public static final boolean isTransportThread(Thread t) {
HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX,
TcpTransport.TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX,
TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX,
TEST_MOCK_TRANSPORT_THREAD_PREFIX)) {
TEST_MOCK_TRANSPORT_THREAD_PREFIX,
NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX,
NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX)) {
if (threadName.contains(s)) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.nio.channel.ChannelFactory;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
Expand All @@ -57,9 +58,8 @@

public class NioTransport extends TcpTransport<NioChannel> {

// TODO: Need to add to places where we check if transport thread
public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker";
public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = "transport_acceptor";
public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_WORKER_THREAD_NAME_PREFIX;
public static final String TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX = Transports.NIO_TRANSPORT_ACCEPTOR_THREAD_NAME_PREFIX;

public static final Setting<Integer> NIO_WORKER_COUNT =
new Setting<>("transport.nio.worker_count",
Expand Down Expand Up @@ -108,7 +108,14 @@ protected void closeChannels(List<NioChannel> channels) throws IOException {
for (final NioChannel channel : channels) {
if (channel != null && channel.isOpen()) {
try {
channel.closeAsync().awaitClose();
// If we are currently on the selector thread that handles this channel, we should prefer
// the closeFromSelector method. This method always closes the channel immediately.
ESSelector selector = channel.getSelector();
if (selector != null && selector.isOnCurrentThread()) {
channel.closeFromSelector();
} else {
channel.closeAsync().awaitClose();
}
} catch (Exception e) {
if (closingExceptions == null) {
closingExceptions = new IOException("failed to close channels");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,6 @@ public String getProfile() {
*/
@Override
public CloseFuture closeAsync() {
if (selector != null && selector.isOnCurrentThread()) {
closeFromSelector();
return closeFuture;
}

for (; ; ) {
int state = this.state.get();
if (state == UNREGISTERED && this.state.compareAndSet(UNREGISTERED, CLOSING)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,14 @@ private NioSocketChannel getChannel() {
if (isDone()) {
try {
// Get should always return without blocking as we already checked 'isDone'
return super.get();
return super.get(0, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
return null;
} catch (TimeoutException e) {
throw new AssertionError("This should never happen as we only call get() after isDone() is true.");
}
} else {
return null;
Expand Down