Chunk + Throttle Netty Writes #39286
Conversation
Pinging @elastic/es-distributed

Jenkins run elasticsearch-ci/2

Jenkins run elasticsearch-ci/2
henningandersen left a comment:
Thanks for your work on this @original-brownbear. I have left comments; please bear in mind that I may be wrong on all of them since this is new territory for me.
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
I find this a bit suspicious since channelInactive is from ChannelInboundHandler?
ChannelInboundHandler has state callbacks for the channel in general; it's not specific to reading, if that's what makes you suspicious :)
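For readers less familiar with Netty, a minimal illustration (not from the PR, names are made up) of that point: an inbound handler receives the channel's lifecycle callbacks such as channelActive and channelInactive independently of any read events, which is why a write-throttling handler can hook channelInactive to clean up pending writes.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Channel lifecycle callbacks are delivered to inbound handlers regardless of reads.
final class LifecycleLoggingHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("channel became active: " + ctx.channel());
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        // Fired once the channel is closed; a write-throttling handler could use
        // this hook to fail any writes it is still holding on to.
        System.out.println("channel went inactive: " + ctx.channel());
        ctx.fireChannelInactive();
    }
}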
        return;
    }
    while (channel.isWritable()) {
        if (currentWrite == null) {
Are we sure that there is only one thread in here at a time? If not, we need to guard the reads/writes to currentWrite.
We shouldn't ever be calling flush from outside the event loop, and all other invocations of this method do happen in event-loop callbacks, so we should be good here. If you look at other Netty handler implementations (like the compression implementations), you'll see that they make the same assumption about flush.
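A small sketch of the invariant being relied on here (illustrative names, not the PR's code): if a flush ever had to be triggered from outside the event loop, it would first have to hop onto the channel's event loop before touching handler state such as currentWrite.

import io.netty.channel.Channel;

final class FlushDispatcher {

    private FlushDispatcher() {}

    // Run the flush logic on the channel's event loop so handler state is only
    // ever touched by that single thread.
    static void flushOnEventLoop(Channel channel, Runnable doFlush) {
        if (channel.eventLoop().inEventLoop()) {
            doFlush.run();                        // already on the event loop
        } else {
            channel.eventLoop().execute(doFlush); // re-dispatch to the event-loop thread
        }
    }
}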
    if (channel.isActive() == false) {
        if (currentWrite != null) {
            currentWrite.promise.tryFailure(new ClosedChannelException());
        }
Maybe set currentWrite to null to only trigger once?
That's ok, currentWrite.promise.tryFailure is idempotent, and I think it's easier to let Netty guard us here than to do it ourselves and maybe make a mistake :)
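A tiny demo of the idempotence being relied on (using Netty's EmbeddedChannel purely for illustration): only the first tryFailure call completes the promise; later calls return false and change nothing.

import java.nio.channels.ClosedChannelException;

import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;

public class TryFailureIdempotenceDemo {
    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel();
        ChannelPromise promise = channel.newPromise();
        // First call marks the promise as failed and returns true.
        System.out.println(promise.tryFailure(new ClosedChannelException())); // true
        // Subsequent calls are no-ops and simply return false.
        System.out.println(promise.tryFailure(new ClosedChannelException())); // false
        channel.finishAndReleaseAll();
    }
}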
    ctx.flush();
    if (channel.isActive() == false) {
        WriteOperation queuedWrite;
        while ((queuedWrite = queuedWrites.poll()) != null) {
I think we should also notify currentWrite if it is not null; that would be the same as what is done at the beginning of this method (extract to a method?).
currentWrite will have its callback fail on the event loop because we just enqueued a flush for it. If we touch it here as well it should be a noop.
            continue;
        }
        final int bufferSize = Math.min(write.buf.readableBytes(), 1 << 18);
        writeBuffer = ctx.alloc().directBuffer(bufferSize, bufferSize);
Could we skip the copy if the write is smaller than 1<<18?
Maybe; it's probably a no-op since Netty will itself do the same copying internally if we don't do it here. I mainly did it here to make profiling a little easier (only one source of allocations for writes).
Also, I think we could do even better here performance-wise, if we want/need to, by using a different allocator that just caches a single direct ByteBuf, so I figured extracting the allocation to a single place makes it a little easier to experiment with that kind of thing. I'll add some more comments on that to the PR description in a bit.
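As a hedged sketch of the copy step being discussed (illustrative names, not the PR's code): at most 1 << 18 bytes of the pending message are copied into a bounded direct buffer per write, so a single write never pins a huge allocation.

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;

final class ChunkedCopy {

    // Illustrative chunk size matching the 1 << 18 (256 KiB) bound discussed above.
    private static final int MAX_CHUNK_BYTES = 1 << 18;

    private ChunkedCopy() {}

    // Copy at most MAX_CHUNK_BYTES of the pending message into a freshly
    // allocated direct buffer; the allocation stays in one place so it is easy
    // to swap in a different allocator later.
    static ByteBuf nextChunk(ChannelHandlerContext ctx, ByteBuf pending) {
        int chunkSize = Math.min(pending.readableBytes(), MAX_CHUNK_BYTES);
        ByteBuf chunk = ctx.alloc().directBuffer(chunkSize, chunkSize);
        chunk.writeBytes(pending, chunkSize); // advances pending's readerIndex
        return chunk;
    }
}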
Tim-Brooks left a comment:
Some comments.
    private final Netty4Transport transport;

    private final ConcurrentLinkedQueue<WriteOperation> queuedWrites = new ConcurrentLinkedQueue<>();
As you mention in another comment, Netty channel handlers are only called from the event loop thread, so I do not see the need for a concurrent implementation here.
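A sketch of what this suggestion would look like, assuming the single-threaded invariant holds (illustrative names, not the PR's code): a plain queue plus an assertion that documents and enforces the event-loop-only access.

import java.util.ArrayDeque;
import java.util.Queue;

import io.netty.channel.ChannelHandlerContext;

final class SingleThreadedQueueSketch {

    // No concurrent collection needed: Netty invokes a handler only on its
    // channel's event-loop thread.
    private final Queue<Object> queuedWrites = new ArrayDeque<>();

    void enqueue(ChannelHandlerContext ctx, Object write) {
        assert ctx.executor().inEventLoop() : "must be called on the event loop";
        queuedWrites.add(write);
    }
}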
    private WriteOperation currentWrite;

    Netty4MessageChannelHandler(Netty4Transport transport) {
I'm not sure that this should be in this handler (Netty4MessageChannelHandler). You are copying heap bytes -> direct bytes. That works fine for plaintext, but this handler comes before the SslHandler, which means that in the security use case this becomes heap bytes -> direct bytes -> heap bytes (temporary byte array in SSLEngine) -> encrypted heap buffer -> direct bytes (in filterOutboundMessages). So this will introduce more copies.
I think that either:
- this should be an outbound handler that comes after the SslHandler, or
- this should not copy the bytes to a direct buffer and instead just limit the heap bytes that are passed to the next handler until the writability callback is invoked.
Right :) I went with option 2 now. That also makes the whole issue of potentially trying to write released bytes a lot safer/cleaner. Once/if the buffer is below our write chunk size we can simply pass it down outright and use retained slices in the writes preceding that situation.
(+ we save some objects for writes that are smaller than the buffer size :))
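A hedged sketch of that "option 2" (illustrative names, not the PR's code): never copy into a direct buffer here; just bound how many heap bytes are handed to the next handler per write and let later handlers (e.g. the SslHandler) do their own copying.

import io.netty.buffer.ByteBuf;

final class RetainedSliceChunking {

    static final int WRITE_CHUNK_SIZE = 1 << 18; // matches the chunk size discussed above

    private RetainedSliceChunking() {}

    static ByteBuf nextChunk(ByteBuf pending) {
        if (pending.readableBytes() <= WRITE_CHUNK_SIZE) {
            // Small enough: the remaining buffer can be passed down outright,
            // saving the extra slice object for small writes.
            return pending;
        }
        // readRetainedSlice hands out a retained view of the next chunk and
        // advances the reader index of the pending buffer.
        return pending.readRetainedSlice(WRITE_CHUNK_SIZE);
    }
}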
    if (channel.isActive() == false) {
        WriteOperation queuedWrite;
        while ((queuedWrite = queuedWrites.poll()) != null) {
            queuedWrite.promise.tryFailure(new ClosedChannelException());
It is confusing that these are cleared in the doFlush call. Can we not just clear and release everything in the channelInactive callback? Or the close callback if you change this to an outbound handler and put it after the ssl handler?
This solves the problem of potentially trying to flush released bytes: as far as I can see we can't just rely on the channelInactive hook, because it triggers in a later iteration of the event loop. From io.netty.channel.AbstractChannel.AbstractUnsafe#close:
// Fail all the queued messages
try {
    outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
    outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
} finally {
    if (wasActive && !isActive()) {
        invokeLater(new OneTimeTask() {
            @Override
            public void run() {
                pipeline.fireChannelInactive();
            }
        });
    }
}

    if (currentWrite == null) {
        break;
    }
    final WriteOperation write = currentWrite;
I think you will need to guard against the promise having already failed. The prior channel write could have failed and failed the promise associated with the write operation. I imagine that this will kill the channel and cause everything to shut down, but it still seems safest to prevent potentially flushing bytes that have already been released.
That's why I put in the cleanup loop that you mentioned above. This way we definitely fail all the writes and drain the queue on failures before going for another write in the loop. And since the only thing that invokes the callbacks/futures that would release the bytes is the ctx.flush we do at the end of the loop body, I think we're safe here?
@tbrooks8 thanks for pointing out the threading situation :) (my bad for getting confused by the SslHandler here :( ), I simplified the logic accordingly.
@original-brownbear, instead of creating new instances of ClosedChannelException you could reuse a single pre-instantiated one, as described in http://normanmaurer.me/blog/2013/11/09/The-hidden-performance-costs-of-instantiating-Throwables/. You can see from the search results below that the Netty project often uses this technique. In a nutshell, this will reduce object creation and eliminate the performance cost of instantiating Throwables.
@travishaagen since the connections here are intended to be long-lived and the messages generally have a number of other objects attached for their callbacks, I don't think we have to worry about the cost of setting up these exceptions.
@tbrooks8 ping :) can you take another look, please? Thanks!
Yes, I will take a look tomorrow.
Tim-Brooks left a comment:
LGTM
@tbrooks8 thanks!
Chunk large writes and throttle on a non-writable channel to reduce direct memory usage by Netty.

Fixes issues with allocating huge amounts of direct memory in Netty under heavy write load.

The initial issue reported here was that writing a large cluster state to multiple nodes incurred a large amount of direct memory allocations, but there have also been sporadic reports in the past of ES generally consuming a lot of direct memory during heavy bulk request processing.

The general reason for these problems lies in the fact that we were invoking Netty's write method faster than the network/other side could read, and with large individual messages (a single byte[] initially). So situations like writing 100MB to 50 connections simultaneously could result in allocating up to 5G if writes and/or GCing of buffers are slow. Netty copies whatever is passed to its write method to direct memory and queues it to be written without ever blocking the caller to write, which can lead to runaway allocations if writes are initiated faster than they can physically be written long term.

The fix here addresses both issues:
- Chunk large messages instead of passing them to write in one piece.
- Throttle writing when the channel is not writable (in the OP_WRITE sense) and stop calling write, to avoid big allocations and only allocate buffers for what can be immediately written.

We could potentially do even better here memory-wise (if necessary) by pooling the direct ByteBuf used for writing ourselves instead of using the Netty allocator, but we'd need data from benchmarking to support and measure the impact of that kind of effort.
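Putting the pieces together, here is a hedged, simplified sketch of how chunking and writability-based throttling can fit into one handler (illustrative names and structure, not the actual Netty4MessageChannelHandler): writes are queued, at most CHUNK_SIZE bytes are handed to the next handler while the channel stays writable, and writing resumes from channelWritabilityChanged once Netty's outbound buffer drains below its low watermark.

import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

final class ThrottlingWriteHandlerSketch extends ChannelDuplexHandler {

    private static final int CHUNK_SIZE = 1 << 18; // 256 KiB, illustrative

    // Only ever touched on the channel's event loop, so no concurrent collection needed.
    private final Queue<PendingWrite> queuedWrites = new ArrayDeque<>();

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        assert ctx.executor().inEventLoop();
        queuedWrites.add(new PendingWrite((ByteBuf) msg, promise));
    }

    @Override
    public void flush(ChannelHandlerContext ctx) {
        doFlush(ctx);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        if (ctx.channel().isWritable()) {
            doFlush(ctx); // resume once the outbound buffer has drained
        }
        ctx.fireChannelWritabilityChanged();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        failQueuedWrites();
        super.channelInactive(ctx);
    }

    private void doFlush(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        if (channel.isActive() == false) {
            failQueuedWrites();
            return;
        }
        while (channel.isWritable()) {
            PendingWrite current = queuedWrites.peek();
            if (current == null) {
                break;
            }
            // Hand down at most CHUNK_SIZE bytes; readRetainedSlice advances the reader index.
            int length = Math.min(current.buf.readableBytes(), CHUNK_SIZE);
            ByteBuf chunk = current.buf.readRetainedSlice(length);
            if (current.buf.isReadable()) {
                ctx.write(chunk); // intermediate chunk, the original promise stays pending
            } else {
                queuedWrites.poll();
                current.buf.release();
                ctx.write(chunk, current.promise); // last chunk completes the original promise
            }
        }
        ctx.flush();
    }

    private void failQueuedWrites() {
        PendingWrite queued;
        while ((queued = queuedWrites.poll()) != null) {
            queued.buf.release();
            queued.promise.tryFailure(new ClosedChannelException());
        }
    }

    private static final class PendingWrite {
        final ByteBuf buf;
        final ChannelPromise promise;

        PendingWrite(ByteBuf buf, ChannelPromise promise) {
            this.buf = buf;
            this.promise = promise;
        }
    }
}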