From e0189c223316afb40708652f8d94e733605fba95 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 20 Nov 2017 13:52:16 -0600 Subject: [PATCH 1/2] Fix issue where pages aren't released This is related to #27422. Right now when we do a write in the netty transport, we attach a listener to the future. When you submit a write on the netty event loop and the event loop is shutdown, the onFailure method is called. Unfortunately, netty then tries to notify the listener which cannot be done without dispatching to the event loop. In this case, the dispatch fails and netty logs and error and does not tell us. This commit checks that netty is still running before sending a message. This will not 100% fix this issue, but it should reduce it. --- .../transport/netty4/NettyTcpChannel.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java index fa9989f7270c4..1f7bf76fae4fa 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java @@ -22,13 +22,17 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPromise; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.transport.TcpChannel; +import org.elasticsearch.transport.TransportException; import java.net.InetSocketAddress; +import java.nio.channels.ClosedSelectorException; import java.util.concurrent.CompletableFuture; public class NettyTcpChannel implements TcpChannel { @@ -80,8 +84,11 @@ public InetSocketAddress getLocalAddress() { @Override public void sendMessage(BytesReference reference, ActionListener listener) { - final ChannelFuture future = channel.writeAndFlush(Netty4Utils.toByteBuf(reference)); - future.addListener(f -> { + if (channel.eventLoop().isShuttingDown()) { + listener.onFailure(new TransportException("Cannot send message, event loop is shutting down.")); + } else { + ChannelPromise writePromise = channel.newPromise(); + writePromise.addListener(f -> { if (f.isSuccess()) { listener.onResponse(null); } else { @@ -90,7 +97,9 @@ public void sendMessage(BytesReference reference, ActionListener listener) assert cause instanceof Exception; listener.onFailure((Exception) cause); } - }); + }); + channel.writeAndFlush(Netty4Utils.toByteBuf(reference), writePromise); + } } public Channel getLowLevelChannel() { From 0ff8cf7b74977bcaa629e1962dff2f6317038308 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 20 Nov 2017 14:09:03 -0600 Subject: [PATCH 2/2] Clean up --- .../transport/netty4/NettyTcpChannel.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java index 1f7bf76fae4fa..3d71735a2a8e6 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java @@ -84,11 +84,8 @@ public InetSocketAddress getLocalAddress() { @Override public void sendMessage(BytesReference reference, ActionListener listener) { - if (channel.eventLoop().isShuttingDown()) { - listener.onFailure(new TransportException("Cannot send message, event loop is shutting down.")); - } else { - ChannelPromise writePromise = channel.newPromise(); - writePromise.addListener(f -> { + ChannelPromise writePromise = channel.newPromise(); + writePromise.addListener(f -> { if (f.isSuccess()) { listener.onResponse(null); } else { @@ -97,8 +94,11 @@ public void sendMessage(BytesReference reference, ActionListener listener) assert cause instanceof Exception; listener.onFailure((Exception) cause); } - }); - channel.writeAndFlush(Netty4Utils.toByteBuf(reference), writePromise); + }); + channel.writeAndFlush(Netty4Utils.toByteBuf(reference), writePromise); + + if (channel.eventLoop().isShutdown()) { + listener.onFailure(new TransportException("Cannot send message, event loop is shutting down.")); } }