diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java index fa9989f7270c4..3d71735a2a8e6 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java @@ -22,13 +22,17 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPromise; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.transport.TcpChannel; +import org.elasticsearch.transport.TransportException; import java.net.InetSocketAddress; +import java.nio.channels.ClosedSelectorException; import java.util.concurrent.CompletableFuture; public class NettyTcpChannel implements TcpChannel { @@ -80,8 +84,8 @@ public InetSocketAddress getLocalAddress() { @Override public void sendMessage(BytesReference reference, ActionListener listener) { - final ChannelFuture future = channel.writeAndFlush(Netty4Utils.toByteBuf(reference)); - future.addListener(f -> { + ChannelPromise writePromise = channel.newPromise(); + writePromise.addListener(f -> { if (f.isSuccess()) { listener.onResponse(null); } else { @@ -91,6 +95,11 @@ public void sendMessage(BytesReference reference, ActionListener listener) listener.onFailure((Exception) cause); } }); + channel.writeAndFlush(Netty4Utils.toByteBuf(reference), writePromise); + + if (channel.eventLoop().isShutdown()) { + listener.onFailure(new TransportException("Cannot send message, event loop is shutting down.")); + } } public Channel getLowLevelChannel() {