Merged
Changes from all commits (19 commits)
4dbe81b  Chunked Netty Writes (original-brownbear, Feb 22, 2019)
569492d  cleaner handling empty write (original-brownbear, Feb 22, 2019)
b9fd81d  Merge remote-tracking branch 'elastic/master' into chunked-netty (original-brownbear, Feb 22, 2019)
375011e  Merge remote-tracking branch 'elastic/master' into chunked-netty (original-brownbear, Feb 22, 2019)
fbe339c  Merge remote-tracking branch 'elastic/master' into chunked-netty (original-brownbear, Feb 25, 2019)
760bf28  more efficient buffer allocation (original-brownbear, Feb 25, 2019)
21ef9d1  faster flushing (original-brownbear, Feb 25, 2019)
1e70435  Merge remote-tracking branch 'elastic/master' into chunked-netty (original-brownbear, Feb 25, 2019)
9e9f201  Merge remote-tracking branch 'elastic/master' into chunked-netty (original-brownbear, Feb 25, 2019)
24ec12d  fix overflow (original-brownbear, Feb 25, 2019)
0e8a0ad  Merge remote-tracking branch 'elastic/master' into chunked-netty (original-brownbear, Feb 28, 2019)
2cf079f  Merge remote-tracking branch 'elastic/master' into chunked-netty (original-brownbear, Feb 28, 2019)
7119379  cleanup redundant retain - release cycle (original-brownbear, Feb 28, 2019)
844d9b9  Merge remote-tracking branch 'elastic/master' into chunked-netty (original-brownbear, Feb 28, 2019)
cd9ab1c  guard against async handler changes (original-brownbear, Feb 28, 2019)
6854d74  no exceptions here ... (original-brownbear, Feb 28, 2019)
9234382  Merge remote-tracking branch 'elastic/master' into chunked-netty (original-brownbear, Mar 1, 2019)
30cbf46  CR: assume all operations in IO thread (original-brownbear, Mar 1, 2019)
de7a7be  Merge remote-tracking branch 'elastic/master' into chunked-netty (original-brownbear, Mar 1, 2019)

@@ -22,12 +22,17 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.Attribute;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.transport.Transports;

import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * A handler (must be the last one!) that does size-based frame decoding and forwards the actual message
@@ -37,13 +42,17 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {

    private final Netty4Transport transport;

    private final Queue<WriteOperation> queuedWrites = new ArrayDeque<>();

    private WriteOperation currentWrite;

    Netty4MessageChannelHandler(Netty4Transport transport) {
Contributor:

I'm not sure that this should be in this handler (Netty4MessageChannelHandler). You are copying heap bytes -> direct bytes, which works fine for plaintext, but this handler comes before the SslHandler. In the security use case that means heap bytes -> direct bytes -> heap bytes (temporary byte array in SSLEngine) -> encrypted heap buffer -> direct bytes (in filterOutboundMessages), so this will introduce more copies.

I think that either:

  1. This should be an outbound handler that comes after the SslHandler, or
  2. This should not copy the bytes to a direct buffer and instead just limit the heap bytes that are passed to the next handler until the writability callback is invoked.

Contributor Author:

Right :) I went with option 2 now. That also makes the whole issue of potentially trying to write released bytes a lot safer/cleaner. Once (or if) the buffer is below our write chunk size we can simply pass it down outright, and use retained slices for the writes preceding that situation.
(Plus we save some objects for writes that are smaller than the buffer size :))
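
A minimal sketch of the retained-slice approach described in this reply (class and method names here are hypothetical; the real logic lives in doFlush further down):

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;

// Hypothetical sketch of "option 2": never copy into a direct buffer, just cap
// how many heap bytes reach the next handler per write. A retainedSlice shares
// the underlying memory and bumps the refcount, so the parent buffer stays
// valid until every chunk's write completes and releases its slice.
final class ChunkedWriteSketch {

    private static final int CHUNK_SIZE = 1 << 18; // 256 KiB, the limit used in this PR

    static void writeOneChunk(ChannelHandlerContext ctx, ByteBuf buf) {
        final int length = Math.min(buf.readableBytes(), CHUNK_SIZE);
        if (length == buf.readableBytes()) {
            ctx.write(buf); // final chunk: pass the buffer itself and save an object
        } else {
            ctx.write(buf.retainedSlice(buf.readerIndex(), length));
            buf.readerIndex(buf.readerIndex() + length); // mark the chunk as consumed
        }
    }
}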

        this.transport = transport;
    }

    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        Transports.assertTransportThread();
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+        assert Transports.assertTransportThread();
        assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass();

        final ByteBuf buffer = (ByteBuf) msg;
@@ -57,7 +66,7 @@
    }

    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ExceptionsHelper.maybeDieOnAnotherThread(cause);
        final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
        final Throwable newCause = unwrapped != null ? unwrapped : cause;
@@ -68,4 +77,113 @@
            transport.onException(tcpChannel, (Exception) newCause);
        }
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        assert msg instanceof ByteBuf;
        final boolean queued = queuedWrites.offer(new WriteOperation((ByteBuf) msg, promise));
        assert queued;
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        if (ctx.channel().isWritable()) {
            doFlush(ctx);
        }
        ctx.fireChannelWritabilityChanged();
    }

    @Override
    public void flush(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        if (channel.isWritable() || channel.isActive() == false) {
            doFlush(ctx);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Contributor:

I find this a bit suspicious, since channelInactive comes from ChannelInboundHandler?

Contributor Author:

ChannelInboundHandler has state callbacks for the channel in general; it's not specific to reading, if that's what makes you suspicious :)
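
For context, a minimal sketch (hypothetical handler name) of the channel-state callbacks that ChannelInboundHandler carries alongside the read ones:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Hypothetical sketch: ChannelInboundHandler covers channel state transitions,
// not just reads, so a duplex handler overriding channelInactive to deal with
// pending writes is consistent with the API.
final class LifecycleCallbacksSketch extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // fired once when the channel becomes active (e.g. the connection is established)
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // fired once when the channel goes inactive (closed); a natural place
        // to flush or fail anything still queued, as done in this PR
        super.channelInactive(ctx);
    }
}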

        doFlush(ctx);
        super.channelInactive(ctx);
    }

    private void doFlush(ChannelHandlerContext ctx) {
        assert ctx.executor().inEventLoop();
        final Channel channel = ctx.channel();
        if (channel.isActive() == false) {
            if (currentWrite != null) {
                currentWrite.promise.tryFailure(new ClosedChannelException());
            }
Contributor:

Maybe set currentWrite to null so this only triggers once?

Contributor Author:

That's ok: currentWrite.promise.tryFailure is idempotent, and I think it's easier to let Netty guard us here than to do it ourselves and maybe make a mistake :)
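
A minimal sketch (hypothetical class name) of the idempotency being relied on here: completing a promise is one-shot, so a second tryFailure is a harmless no-op:

import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;

// Hypothetical sketch: a ChannelPromise can only complete once; tryFailure
// reports whether this call was the one that completed it.
public final class TryFailureSketch {

    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel();
        ChannelPromise promise = channel.newPromise();
        System.out.println(promise.tryFailure(new RuntimeException("first")));  // true: promise now failed
        System.out.println(promise.tryFailure(new RuntimeException("second"))); // false: already completed, no-op
        channel.finish();
    }
}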

            failQueuedWrites();
            return;
        }
        while (channel.isWritable()) {
            if (currentWrite == null) {
Contributor:

Are we sure that there is only one thread in here at a time? If not, we need to guard the reads/writes to currentWrite.

Contributor Author:

We shouldn't ever be calling flush from outside the event loop, and all other invocations of this method happen in event-loop callbacks, so we should be good here. If you look at other Netty handler implementations (the compression implementations, for example) you'll see that they make the same assumption about flush.
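
A minimal sketch (hypothetical names) of the standard Netty pattern for keeping handler state single-threaded when a caller might be off the event loop:

import io.netty.channel.Channel;

// Hypothetical sketch: hop onto the channel's event loop before touching
// anything the handler owns, so state like currentWrite is only ever read or
// written by the channel's single I/O thread.
final class EventLoopDispatchSketch {

    static void flushOnEventLoop(Channel channel) {
        if (channel.eventLoop().inEventLoop()) {
            channel.flush(); // already on the I/O thread: flush directly
        } else {
            channel.eventLoop().execute(channel::flush); // re-dispatch onto the I/O thread
        }
    }
}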

                currentWrite = queuedWrites.poll();
            }
            if (currentWrite == null) {
                break;
            }
            final WriteOperation write = currentWrite;
Contributor:

I think you will need to guard against the promise having already failed. The prior channel write could have failed and failed the promise associated with the write operation. I imagine that this will kill the channel and cause everything to shut down, but it still seems safest to prevent potentially flushing bytes that have already been released.

Contributor Author:

That's why I put in the cleanup loop that you mentioned above. This way we definitely fail all the writes and drain the queue on failures before going for another write in the loop. And since the only thing that runs the callbacks/futures that would release the bytes is the ctx.flush we do at the end of the loop body, I think we're safe here?
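
A minimal sketch (hypothetical class name) of the property this reply leans on: a write's future, and any listener that releases bytes, only completes once the channel is flushed:

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;

// Hypothetical sketch: write(...) only queues a message in the outbound
// buffer; its future (and thus any listener that would release bytes)
// completes when the channel is flushed.
public final class WriteThenFlushSketch {

    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel();
        ChannelFuture future = channel.write(Unpooled.copiedBuffer("chunk", CharsetUtil.UTF_8));
        System.out.println(future.isDone()); // false: queued, not yet flushed
        channel.flush();
        System.out.println(future.isDone()); // true: listeners have now run
        channel.releaseOutbound(); // drop the flushed message
        channel.finish();
    }
}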

            if (write.buf.readableBytes() == 0) {
                write.promise.trySuccess();
                currentWrite = null;
                continue;
            }
            final int readableBytes = write.buf.readableBytes();
            final int bufferSize = Math.min(readableBytes, 1 << 18);
            final int readerIndex = write.buf.readerIndex();
            final boolean sliced = readableBytes != bufferSize;
            final ByteBuf writeBuffer;
            if (sliced) {
                writeBuffer = write.buf.retainedSlice(readerIndex, bufferSize);
                write.buf.readerIndex(readerIndex + bufferSize);
            } else {
                writeBuffer = write.buf;
            }
            final ChannelFuture writeFuture = ctx.write(writeBuffer);
            if (sliced == false || write.buf.readableBytes() == 0) {
                currentWrite = null;
                writeFuture.addListener(future -> {
                    assert ctx.executor().inEventLoop();
                    if (future.isSuccess()) {
                        write.promise.trySuccess();
                    } else {
                        write.promise.tryFailure(future.cause());
                    }
                });
            } else {
                writeFuture.addListener(future -> {
                    assert ctx.executor().inEventLoop();
                    if (future.isSuccess() == false) {
                        write.promise.tryFailure(future.cause());
                    }
                });
            }
            ctx.flush();
            if (channel.isActive() == false) {
                failQueuedWrites();
                return;
            }
        }
    }

    private void failQueuedWrites() {
        WriteOperation queuedWrite;
        while ((queuedWrite = queuedWrites.poll()) != null) {
            queuedWrite.promise.tryFailure(new ClosedChannelException());
        }
    }

    private static final class WriteOperation {

        private final ByteBuf buf;

        private final ChannelPromise promise;

        WriteOperation(ByteBuf buf, ChannelPromise promise) {
            this.buf = buf;
            this.promise = promise;
        }
    }
}