Skip to content

Commit 0cdadf5

Browse files
Chunk + Throttle Netty Writes (#39286)
* Chunk large writes and throttle on a non-writable channel to reduce direct memory usage by Netty
1 parent 2349fb9 commit 0cdadf5

File tree

1 file changed

+121
-3
lines changed

1 file changed

+121
-3
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java

Lines changed: 121 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,17 @@
2222
import io.netty.buffer.ByteBuf;
2323
import io.netty.channel.Channel;
2424
import io.netty.channel.ChannelDuplexHandler;
25+
import io.netty.channel.ChannelFuture;
2526
import io.netty.channel.ChannelHandlerContext;
27+
import io.netty.channel.ChannelPromise;
2628
import io.netty.util.Attribute;
2729
import org.elasticsearch.ElasticsearchException;
2830
import org.elasticsearch.ExceptionsHelper;
2931
import org.elasticsearch.transport.Transports;
3032

33+
import java.nio.channels.ClosedChannelException;
34+
import java.util.ArrayDeque;
35+
import java.util.Queue;
3136

3237
/**
3338
* A handler (must be the last one!) that does size based frame decoding and forwards the actual message
@@ -37,13 +42,17 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
3742

3843
private final Netty4Transport transport;
3944

45+
private final Queue<WriteOperation> queuedWrites = new ArrayDeque<>();
46+
47+
private WriteOperation currentWrite;
48+
4049
Netty4MessageChannelHandler(Netty4Transport transport) {
4150
this.transport = transport;
4251
}
4352

4453
@Override
45-
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
46-
Transports.assertTransportThread();
54+
public void channelRead(ChannelHandlerContext ctx, Object msg) {
55+
assert Transports.assertTransportThread();
4756
assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass();
4857

4958
final ByteBuf buffer = (ByteBuf) msg;
@@ -57,7 +66,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
5766
}
5867

5968
@Override
60-
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
69+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
6170
ExceptionsHelper.maybeDieOnAnotherThread(cause);
6271
final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
6372
final Throwable newCause = unwrapped != null ? unwrapped : cause;
@@ -68,4 +77,113 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
6877
transport.onException(tcpChannel, (Exception) newCause);
6978
}
7079
}
80+
81+
@Override
82+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
83+
assert msg instanceof ByteBuf;
84+
final boolean queued = queuedWrites.offer(new WriteOperation((ByteBuf) msg, promise));
85+
assert queued;
86+
}
87+
88+
@Override
89+
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
90+
if (ctx.channel().isWritable()) {
91+
doFlush(ctx);
92+
}
93+
ctx.fireChannelWritabilityChanged();
94+
}
95+
96+
@Override
97+
public void flush(ChannelHandlerContext ctx) {
98+
Channel channel = ctx.channel();
99+
if (channel.isWritable() || channel.isActive() == false) {
100+
doFlush(ctx);
101+
}
102+
}
103+
104+
@Override
105+
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
106+
doFlush(ctx);
107+
super.channelInactive(ctx);
108+
}
109+
110+
private void doFlush(ChannelHandlerContext ctx) {
111+
assert ctx.executor().inEventLoop();
112+
final Channel channel = ctx.channel();
113+
if (channel.isActive() == false) {
114+
if (currentWrite != null) {
115+
currentWrite.promise.tryFailure(new ClosedChannelException());
116+
}
117+
failQueuedWrites();
118+
return;
119+
}
120+
while (channel.isWritable()) {
121+
if (currentWrite == null) {
122+
currentWrite = queuedWrites.poll();
123+
}
124+
if (currentWrite == null) {
125+
break;
126+
}
127+
final WriteOperation write = currentWrite;
128+
if (write.buf.readableBytes() == 0) {
129+
write.promise.trySuccess();
130+
currentWrite = null;
131+
continue;
132+
}
133+
final int readableBytes = write.buf.readableBytes();
134+
final int bufferSize = Math.min(readableBytes, 1 << 18);
135+
final int readerIndex = write.buf.readerIndex();
136+
final boolean sliced = readableBytes != bufferSize;
137+
final ByteBuf writeBuffer;
138+
if (sliced) {
139+
writeBuffer = write.buf.retainedSlice(readerIndex, bufferSize);
140+
write.buf.readerIndex(readerIndex + bufferSize);
141+
} else {
142+
writeBuffer = write.buf;
143+
}
144+
final ChannelFuture writeFuture = ctx.write(writeBuffer);
145+
if (sliced == false || write.buf.readableBytes() == 0) {
146+
currentWrite = null;
147+
writeFuture.addListener(future -> {
148+
assert ctx.executor().inEventLoop();
149+
if (future.isSuccess()) {
150+
write.promise.trySuccess();
151+
} else {
152+
write.promise.tryFailure(future.cause());
153+
}
154+
});
155+
} else {
156+
writeFuture.addListener(future -> {
157+
assert ctx.executor().inEventLoop();
158+
if (future.isSuccess() == false) {
159+
write.promise.tryFailure(future.cause());
160+
}
161+
});
162+
}
163+
ctx.flush();
164+
if (channel.isActive() == false) {
165+
failQueuedWrites();
166+
return;
167+
}
168+
}
169+
}
170+
171+
private void failQueuedWrites() {
172+
WriteOperation queuedWrite;
173+
while ((queuedWrite = queuedWrites.poll()) != null) {
174+
queuedWrite.promise.tryFailure(new ClosedChannelException());
175+
}
176+
}
177+
178+
private static final class WriteOperation {
179+
180+
private final ByteBuf buf;
181+
182+
private final ChannelPromise promise;
183+
184+
WriteOperation(ByteBuf buf, ChannelPromise promise) {
185+
this.buf = buf;
186+
this.promise = promise;
187+
}
188+
}
71189
}

0 commit comments

Comments
 (0)