@@ -140,8 +140,24 @@ private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException
    // SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance
    // for the case that the passed-in buffer has too many components.
    int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
Contributor (@vanzin):
Is this limit needed now at all? I think your change was suggested as a future enhancement when reviewing SPARK-24578, but I never noticed a PR to implement it until yours...

@attilapiros

Member:
@vanzin This is to avoid a memory copy when writing a large ByteBuffer. You actually merged this: #12083

Contributor (@vanzin):
My understanding is that the code here is avoiding the copy by using nioBuffers() already, so that limit shouldn't be needed, right? Or maybe I'm missing something.

Member:
@vanzin The ByteBuffer here may be just a large ByteBuffer. See my comment here: #12083 (comment)

Contributor (@vanzin):
I re-read that discussion and you're right. I know this has been checked in, but the comment is now stale; the limit is there because of the behavior of the JRE code, not because of the composite buffer. It would be good to update it.
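To make the copy being discussed here concrete, the following is a minimal standalone sketch (the class name is mine, not part of the patch): on a multi-component CompositeByteBuf, nioBuffer(...) must consolidate the components into one contiguous ByteBuffer, which copies the data, whereas nioBuffers(...) can expose the existing component buffers without copying.

```java
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

import java.nio.ByteBuffer;

public class CompositeBufferCopyDemo {
  public static void main(String[] args) {
    CompositeByteBuf composite = Unpooled.compositeBuffer();
    composite.addComponent(true, Unpooled.copyLong(1L));
    composite.addComponent(true, Unpooled.copyLong(2L));

    // nioBuffer() must return one contiguous ByteBuffer, so with two components
    // it consolidates (copies) the readable bytes into a fresh buffer.
    ByteBuffer merged = composite.nioBuffer(0, composite.readableBytes());

    // nioBuffers() returns the component buffers directly; no consolidation copy.
    ByteBuffer[] views = composite.nioBuffers(0, composite.readableBytes());

    System.out.println("components     = " + composite.nioBufferCount()); // 2
    System.out.println("merged bytes   = " + merged.remaining());         // 16
    System.out.println("views returned = " + views.length);               // 2
    composite.release();
  }
}
```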

Member (@dbtsai, Aug 15, 2018):
Out of curiosity, how did we arrive at 256 KB for NIO_BUFFER_LIMIT?

In Hadoop, they use 8 KB.

For most OSes, the write(ByteBuffer[]) API in sun.nio.ch.IOUtil goes one buffer at a time, getting a temporary direct buffer from the BufferCache, up to a limit of IOUtil#IOV_MAX, which is 1024.

Contributor:
That was only briefly discussed in the PR that Ryan linked above... the original code actually used 512k.

I think Hadoop's limit is a little low, but maybe 256k is a bit high. IIRC socket buffers are 32k by default on Linux, so it seems unlikely you'd be able to write 256k in one call (ignoring what IOUtil does internally). But maybe in practice it works ok.

If anyone has the time to test this out, that would be great.

Member:
Maybe @normanmaurer has some insight?

Member:
> IIRC socket buffers are 32k by default on Linux, so it seems unlikely you'd be able to write 256k in one call (ignoring what IOUtil does internally). But maybe in practice it works ok.

After reading the context in #12083 and this discussion, I want to offer one way that writing 256k in one call can work in practice. In our scenario, users tune /proc/sys/net/core/wmem_default based on their workload, and we generally set it well above the default. [screenshot of the tuned setting omitted]

So maybe 256k is OK for NIO_BUFFER_LIMIT here? We just need to add a comment noting which kernel parameters this value interacts with.
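As a quick way to relate this discussion to a given host, here is a tiny standalone probe (illustrative, not part of the change) that prints the default SO_SNDBUF the JVM sees; on Linux that default comes from kernel settings such as net.core.wmem_default / net.ipv4.tcp_wmem, which is what bounds how much a single write will usually accept before returning a partial result.

```java
import java.net.Socket;

// Minimal sketch: report the OS default socket send-buffer size on this host.
public class SendBufferProbe {
  public static void main(String[] args) throws Exception {
    try (Socket socket = new Socket()) {
      System.out.println("default SO_SNDBUF = " + socket.getSendBufferSize() + " bytes");
    }
  }
}
```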

-    ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
-    int written = target.write(buffer);
+    // If the ByteBuf holds more than one ByteBuffer, it is better to call nioBuffers(...)
+    // to eliminate extra memory copies.
+    int written = 0;
+    if (buf.nioBufferCount() == 1) {
+      ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
+      written = target.write(buffer);
+    } else {
+      ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), length);
+      for (ByteBuffer buffer: buffers) {
+        int remaining = buffer.remaining();
+        int w = target.write(buffer);
+        written += w;
Member (@kiszk, Aug 14, 2018):
Can we guarantee that written does not overflow while we accumulate int values?

Member:
We are using int in the current codebase, so this should not be a concern. Also, ByteBuf uses an int to index its bytes; as a result, written cannot overflow, since all the buffers come from the same ByteBuf.

+        if (w < remaining) {
+          // Could not write all, we need to break now.
+          break;
+        }
+      }
+    }
    buf.skipBytes(written);
    return written;
  }
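For readers who want to exercise the new branch outside Spark, the following self-contained sketch (the class, method, and channel setup are mine, not Spark code) mirrors the logic above: a single-component buffer takes the nioBuffer(...) path, a composite buffer takes the nioBuffers(...) path, and a partial write stops the loop so the caller can retry later.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

public class GatheringWriteSketch {
  // Same shape as the patched copyByteBuf: write at most `limit` bytes per call.
  static int writeOnce(ByteBuf buf, WritableByteChannel target, int limit) throws IOException {
    int length = Math.min(buf.readableBytes(), limit);
    int written = 0;
    if (buf.nioBufferCount() == 1) {
      written = target.write(buf.nioBuffer(buf.readerIndex(), length));
    } else {
      for (ByteBuffer buffer : buf.nioBuffers(buf.readerIndex(), length)) {
        int remaining = buffer.remaining();
        int w = target.write(buffer);
        written += w;
        if (w < remaining) {
          break; // the channel could not accept more right now; stop and let the caller retry
        }
      }
    }
    buf.skipBytes(written);
    return written;
  }

  public static void main(String[] args) throws IOException {
    CompositeByteBuf buf = Unpooled.compositeBuffer();
    buf.addComponent(true, Unpooled.copyLong(42));
    buf.addComponent(true, Unpooled.copyLong(84));

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    WritableByteChannel channel = Channels.newChannel(out);
    while (buf.isReadable()) {
      writeOnce(buf, channel, 256 * 1024); // 256 KB cap, as with NIO_BUFFER_LIMIT
    }
    System.out.println("bytes written = " + out.size()); // 16
    buf.release();
  }
}
```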
@@ -22,6 +22,7 @@
import java.nio.channels.WritableByteChannel;

import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.spark.network.util.AbstractFileRegion;
import org.junit.Test;
@@ -48,7 +49,36 @@ public void testShortWrite() throws Exception {

  @Test
  public void testByteBufBody() throws Exception {
+    testByteBufBody(Unpooled.copyLong(42));
+  }
+
+  @Test
+  public void testCompositeByteBufBodySingleBuffer() throws Exception {
+    ByteBuf header = Unpooled.copyLong(42);
+    CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
+    compositeByteBuf.addComponent(true, header);
+    assertEquals(1, compositeByteBuf.nioBufferCount());
+    testByteBufBody(compositeByteBuf);
+  }
+
+  @Test
+  public void testCompositeByteBufBodyMultipleBuffers() throws Exception {
+    ByteBuf header = Unpooled.copyLong(42);
+    CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
+    compositeByteBuf.addComponent(true, header.retainedSlice(0, 4));
+    compositeByteBuf.addComponent(true, header.slice(4, 4));
+    assertEquals(2, compositeByteBuf.nioBufferCount());
+    testByteBufBody(compositeByteBuf);
+  }
+
+  /**
+   * Test writing a {@link MessageWithHeader} using the given {@link ByteBuf} as header.
+   *
+   * @param header the header to use.
+   * @throws Exception thrown on error.
+   */
+  private void testByteBufBody(ByteBuf header) throws Exception {
+    long expectedHeaderValue = header.getLong(header.readerIndex());
    ByteBuf bodyPassedToNettyManagedBuffer = Unpooled.copyLong(84);
    assertEquals(1, header.refCnt());
    assertEquals(1, bodyPassedToNettyManagedBuffer.refCnt());
@@ -61,7 +91,7 @@ public void testByteBufBody() throws Exception {
    MessageWithHeader msg = new MessageWithHeader(managedBuf, header, body, managedBuf.size());
    ByteBuf result = doWrite(msg, 1);
    assertEquals(msg.count(), result.readableBytes());
-    assertEquals(42, result.readLong());
+    assertEquals(expectedHeaderValue, result.readLong());
    assertEquals(84, result.readLong());

    assertTrue(msg.release());
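One detail of testCompositeByteBufBodyMultipleBuffers worth spelling out is the reference counting: addComponent(true, ...) hands ownership of each slice to the composite, both slices share the parent header's reference count, and only the first slice is retained. A standalone sketch (illustrative class name, not part of the suite) under those assumptions:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

public class SliceRefCountSketch {
  public static void main(String[] args) {
    ByteBuf header = Unpooled.copyLong(42);                    // refCnt == 1
    CompositeByteBuf composite = Unpooled.compositeBuffer();
    composite.addComponent(true, header.retainedSlice(0, 4));  // retains header: refCnt == 2
    composite.addComponent(true, header.slice(4, 4));          // shares header without retaining

    System.out.println("components    = " + composite.nioBufferCount()); // 2
    System.out.println("header refCnt = " + header.refCnt());            // 2

    // Releasing the composite releases both slices, and each release is delegated
    // to the shared parent, so the header ends up fully released as well.
    composite.release();
    System.out.println("header refCnt after release = " + header.refCnt()); // 0
  }
}
```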