Skip to content

Commit 4e04f95

Browse files
authored
Fix issue where pages aren't released (#27459)
This is related to #27422. Right now when we send a write to the netty transport, we attach a listener to the future. When you submit a write on the netty event loop and the event loop is shutdown, the onFailure method is called. Unfortunately, netty then tries to notify the listener which cannot be done without dispatching to the event loop. In this case, the dispatch fails and netty logs and error and does not tell us. This commit checks that netty is still not shutdown after sending a message. If netty is shutdown, we complete the listener.
1 parent 196dbf3 commit 4e04f95

File tree

1 file changed

+11
-2
lines changed

1 file changed

+11
-2
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,17 @@
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.ChannelFuture;
2424
import io.netty.channel.ChannelOption;
25+
import io.netty.channel.ChannelPromise;
2526
import org.apache.logging.log4j.message.ParameterizedMessage;
2627
import org.apache.logging.log4j.util.Supplier;
28+
import org.elasticsearch.ElasticsearchException;
2729
import org.elasticsearch.action.ActionListener;
2830
import org.elasticsearch.common.bytes.BytesReference;
2931
import org.elasticsearch.transport.TcpChannel;
32+
import org.elasticsearch.transport.TransportException;
3033

3134
import java.net.InetSocketAddress;
35+
import java.nio.channels.ClosedSelectorException;
3236
import java.util.concurrent.CompletableFuture;
3337

3438
public class NettyTcpChannel implements TcpChannel {
@@ -80,8 +84,8 @@ public InetSocketAddress getLocalAddress() {
8084

8185
@Override
8286
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
83-
final ChannelFuture future = channel.writeAndFlush(Netty4Utils.toByteBuf(reference));
84-
future.addListener(f -> {
87+
ChannelPromise writePromise = channel.newPromise();
88+
writePromise.addListener(f -> {
8589
if (f.isSuccess()) {
8690
listener.onResponse(null);
8791
} else {
@@ -91,6 +95,11 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
9195
listener.onFailure((Exception) cause);
9296
}
9397
});
98+
channel.writeAndFlush(Netty4Utils.toByteBuf(reference), writePromise);
99+
100+
if (channel.eventLoop().isShutdown()) {
101+
listener.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
102+
}
94103
}
95104

96105
public Channel getLowLevelChannel() {

0 commit comments

Comments
 (0)