Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.LinkedList;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
Expand Down Expand Up @@ -48,14 +49,30 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
private static final int LENGTH_SIZE = 8;
private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
private static final int UNKNOWN_FRAME_SIZE = -1;
private static final long CONSOLIDATE_THRESHOLD = 20 * 1024 * 1024;

private final LinkedList<ByteBuf> buffers = new LinkedList<>();
private final ByteBuf frameLenBuf = Unpooled.buffer(LENGTH_SIZE, LENGTH_SIZE);
private final long consolidateThreshold;

private CompositeByteBuf frameBuf = null;
private long consolidatedFrameBufSize = 0;
private int consolidatedNumComponents = 0;

private long totalSize = 0;
private long nextFrameSize = UNKNOWN_FRAME_SIZE;
private int frameRemainingBytes = UNKNOWN_FRAME_SIZE;
private volatile Interceptor interceptor;

public TransportFrameDecoder() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I though you were going to make this configurable. Where are you reading the value from the configuration?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I think maybe we can just make it a fixed value, user will unlikely to change this threshold in most cases, and it requires little memory as shown in the newest tests reports.

this(CONSOLIDATE_THRESHOLD);
}

@VisibleForTesting
TransportFrameDecoder(long consolidateThreshold) {
this.consolidateThreshold = consolidateThreshold;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
ByteBuf in = (ByteBuf) data;
Expand Down Expand Up @@ -123,30 +140,56 @@ private long decodeFrameSize() {

private ByteBuf decodeNext() {
long frameSize = decodeFrameSize();
if (frameSize == UNKNOWN_FRAME_SIZE || totalSize < frameSize) {
if (frameSize == UNKNOWN_FRAME_SIZE) {
return null;
}

// Reset size for next frame.
nextFrameSize = UNKNOWN_FRAME_SIZE;

Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize);
Preconditions.checkArgument(frameSize > 0, "Frame length should be positive: %s", frameSize);
if (frameBuf == null) {
Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE,
"Too large frame: %s", frameSize);
Preconditions.checkArgument(frameSize > 0,
"Frame length should be positive: %s", frameSize);
frameRemainingBytes = (int) frameSize;

// If the first buffer holds the entire frame, return it.
int remaining = (int) frameSize;
if (buffers.getFirst().readableBytes() >= remaining) {
return nextBufferForFrame(remaining);
// If buffers is empty, then return immediately for more input data.
if (buffers.isEmpty()) {
return null;
}
// Otherwise, if the first buffer holds the entire frame, we attempt to
// build frame with it and return.
if (buffers.getFirst().readableBytes() >= frameRemainingBytes) {
// Reset buf and size for next frame.
frameBuf = null;
nextFrameSize = UNKNOWN_FRAME_SIZE;
return nextBufferForFrame(frameRemainingBytes);
}
// Other cases, create a composite buffer to manage all the buffers.
frameBuf = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
}

// Otherwise, create a composite buffer.
CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
while (remaining > 0) {
ByteBuf next = nextBufferForFrame(remaining);
remaining -= next.readableBytes();
frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes());
while (frameRemainingBytes > 0 && !buffers.isEmpty()) {
ByteBuf next = nextBufferForFrame(frameRemainingBytes);
frameRemainingBytes -= next.readableBytes();
frameBuf.addComponent(true, next);
}
assert remaining == 0;
// If the delta size of frameBuf exceeds the threshold, then we do consolidation
// to reduce memory consumption.
if (frameBuf.capacity() - consolidatedFrameBufSize > consolidateThreshold) {
int newNumComponents = frameBuf.numComponents() - consolidatedNumComponents;
frameBuf.consolidate(consolidatedNumComponents, newNumComponents);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic here seems correct, but how is this different than just calling frameBuf.consolidate() without having to keep track of the component count in this class?

Copy link
Author

@liupc liupc Feb 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No parameter consolidate() will do unnecessary consolidate for already consolidated components (aka there are always one component after consolidation), it's slow and memory wasting, However, consolidate(cIndex, numComponents) will only consolidate specified new components.

For instance, let's say we add 10 components, and do first consolidation, then we got one consolidated component. If we use consolidate(cIndex, numComponents) here, then next time we do consolidation after another 10 components added, we do not need to consolidate the components already consolidated(no extra memory allocation and copy).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so in the end you don't end up with a single component, but with many components of size CONSOLIDATE_THRESHOLD each (minus the last one). I thought I saw the tests checking for a single component after consolidation, but may have misread.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's it.

consolidatedFrameBufSize = frameBuf.capacity();
consolidatedNumComponents = frameBuf.numComponents();
}
if (frameRemainingBytes > 0) {
return null;
}

// Reset buf and size for next frame.
ByteBuf frame = frameBuf;
frameBuf = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To follow up Sean's question, aren't you leaking frameBuf here now? You're returning a duplicate and not releasing this instance to decrement its ref count.

(Another way to say that returning the buffer itself is probably the right thing.)

Copy link
Author

@liupc liupc Feb 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, frameBuf.duplicate create a derived buffer which shares the memory region of the parent buffer. A derived buffer does not have its own reference count but shares the reference count of the parent buffer.

Here we can return a local variable refer to the frameBuf object, and null out the frameBuf for next frame decoding.

consolidatedFrameBufSize = 0;
consolidatedNumComponents = 0;
nextFrameSize = UNKNOWN_FRAME_SIZE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have to reset consolidatedFrameBufSize and consolidatedNumComponents back to 0 for the next frame buffer.

Otherwise after a very huge frame all the smaller but still quite huge frames are not consolidated at all.
And when consolidation starts as a frame which bigger then the maximum up to this then only the components are consolidated which are after the previous maximum.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@attilapiros Good catch! Than you so much! I will fix it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@attilapiros done!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you fixed this, but it should have been caught by unit tests. So there's probably a check missing in your tests (expected number of components?).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think not the check for the expected number of components missing but testing with multiple messages. Right now within the loop body, where a new TransportFrameDecoder is created too, there is only one 1GB message sent.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I can add some code to test multiple messages, and we just need to do the same check for consolidated buf capacity. I think this is more result oriented.

return frame;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@
import io.netty.channel.ChannelHandlerContext;
import org.junit.AfterClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

public class TransportFrameDecoderSuite {

private static final Logger logger = LoggerFactory.getLogger(TransportFrameDecoderSuite.class);
private static Random RND = new Random();

@AfterClass
Expand All @@ -47,6 +51,69 @@ public void testFrameDecoding() throws Exception {
verifyAndCloseDecoder(decoder, ctx, data);
}

@Test
public void testConsolidationPerf() throws Exception {
long[] testingConsolidateThresholds = new long[] {
ByteUnit.MiB.toBytes(1),
ByteUnit.MiB.toBytes(5),
ByteUnit.MiB.toBytes(10),
ByteUnit.MiB.toBytes(20),
ByteUnit.MiB.toBytes(30),
ByteUnit.MiB.toBytes(50),
ByteUnit.MiB.toBytes(80),
ByteUnit.MiB.toBytes(100),
ByteUnit.MiB.toBytes(300),
ByteUnit.MiB.toBytes(500),
Long.MAX_VALUE };
for (long threshold : testingConsolidateThresholds) {
TransportFrameDecoder decoder = new TransportFrameDecoder(threshold);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
List<ByteBuf> retained = new ArrayList<>();
when(ctx.fireChannelRead(any())).thenAnswer(in -> {
ByteBuf buf = (ByteBuf) in.getArguments()[0];
retained.add(buf);
return null;
});

// Testing multiple messages
int numMessages = 3;
long targetBytes = ByteUnit.MiB.toBytes(300);
int pieceBytes = (int) ByteUnit.KiB.toBytes(32);
for (int i = 0; i < numMessages; i++) {
try {
long writtenBytes = 0;
long totalTime = 0;
ByteBuf buf = Unpooled.buffer(8);
buf.writeLong(8 + targetBytes);
decoder.channelRead(ctx, buf);
while (writtenBytes < targetBytes) {
buf = Unpooled.buffer(pieceBytes * 2);
ByteBuf writtenBuf = Unpooled.buffer(pieceBytes).writerIndex(pieceBytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wanted to point out you're counting this allocation time in your performance measurement, which isn't optimal.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, thank you @vanzin

buf.writeBytes(writtenBuf);
writtenBuf.release();
long start = System.currentTimeMillis();
decoder.channelRead(ctx, buf);
long elapsedTime = System.currentTimeMillis() - start;
totalTime += elapsedTime;
writtenBytes += pieceBytes;
}
logger.info("Writing 300MiB frame buf with consolidation of threshold " + threshold
+ " took " + totalTime + " milis");
} finally {
for (ByteBuf buf : retained) {
release(buf);
}
}
}
long totalBytesGot = 0;
for (ByteBuf buf : retained) {
totalBytesGot += buf.capacity();
}
assertEquals(numMessages, retained.size());
assertEquals(targetBytes * numMessages, totalBytesGot);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean this test now requires 3GB of memory just to store the data it's checking?

That seems wasteful. Either change the test to do checks after each separate message is written, or lower the size of the messages.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

}
}

@Test
public void testInterception() throws Exception {
int interceptedReads = 3;
Expand Down