From 63ca85a5548858b4fe46a4ade062776cb6747cba Mon Sep 17 00:00:00 2001
From: "Zhang, Liye"
Date: Thu, 31 Mar 2016 17:44:41 +0800
Subject: [PATCH 1/2] spark-14290 avoid significant memory copy in netty's transferTo

---
 .../network/protocol/MessageWithHeader.java | 30 ++++++++++++++++++-
 1 file changed, 29 insertions(+), 1 deletion(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
index 66227f96a1a2..f16d3ac3ad43 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
@@ -18,6 +18,7 @@
 package org.apache.spark.network.protocol;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
 import javax.annotation.Nullable;
@@ -43,6 +44,14 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
   private final long bodyLength;
   private long totalBytesTransferred;
 
+  /**
+   * When the write buffer size is larger than this limit, I/O will be done in chunks of this size.
+   * The size should not be too large as it wastes memory in the underlying copy, e.g. if network
+   * available buffer is smaller than this limit, the data cannot be sent within one single write
+   * operation while it still makes a memory copy of this size.
+   */
+  private static final int NIO_BUFFER_LIMIT = 512 * 1024;
+
   /**
    * Construct a new MessageWithHeader.
    *
@@ -128,8 +137,27 @@ protected void deallocate() {
   }
 
   private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
-    int written = target.write(buf.nioBuffer());
+    ByteBuffer buffer = buf.nioBuffer();
+    int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+      target.write(buffer) : writeNioBuffer(target, buffer);
     buf.skipBytes(written);
     return written;
   }
+
+  private int writeNioBuffer(
+      WritableByteChannel writeCh,
+      ByteBuffer buf) throws IOException {
+    int originalLimit = buf.limit();
+    int ret = 0;
+
+    try {
+      int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+      buf.limit(buf.position() + ioSize);
+      ret = writeCh.write(buf);
+    } finally {
+      buf.limit(originalLimit);
+    }
+
+    return ret;
+  }
 }

From a793696fc0ba3bc14ccb403c6ddc11021de8b34b Mon Sep 17 00:00:00 2001
From: "Zhang, Liye"
Date: Wed, 6 Apr 2016 10:55:53 +0800
Subject: [PATCH 2/2] change the NIO buffer size limit from 512K to 256K

---
 .../org/apache/spark/network/protocol/MessageWithHeader.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
index f16d3ac3ad43..4f8781b42a0e 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
@@ -50,7 +50,7 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
    * available buffer is smaller than this limit, the data cannot be sent within one single write
    * operation while it still makes a memory copy of this size.
    */
-  private static final int NIO_BUFFER_LIMIT = 512 * 1024;
+  private static final int NIO_BUFFER_LIMIT = 256 * 1024;
 
   /**
    * Construct a new MessageWithHeader.
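
For readers who want to try the chunking idea outside of Spark, the standalone sketch below exercises the same limit-then-write pattern that copyByteBuf/writeNioBuffer use in the patch above. The class name ChunkedWriteSketch, the CHUNK_LIMIT constant, and the in-memory ByteArrayOutputStream sink are illustrative assumptions for the demo, not part of the patch or of Spark's API; the only point is to show how temporarily shrinking a ByteBuffer's limit caps the size of each write() call, and therefore the size of any temporary copy the JDK makes for a heap buffer.

// Standalone illustration only; not part of the patch. Assumed names:
// ChunkedWriteSketch, CHUNK_LIMIT, and the ByteArrayOutputStream sink.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

public class ChunkedWriteSketch {
  // Same role as NIO_BUFFER_LIMIT in the patch: cap the bytes exposed per write().
  private static final int CHUNK_LIMIT = 256 * 1024;

  // Write at most CHUNK_LIMIT bytes by temporarily shrinking the buffer's limit,
  // then restore the original limit so the caller's view of the buffer is intact.
  static int writeChunk(WritableByteChannel ch, ByteBuffer buf) throws IOException {
    int originalLimit = buf.limit();
    try {
      int ioSize = Math.min(buf.remaining(), CHUNK_LIMIT);
      buf.limit(buf.position() + ioSize);
      return ch.write(buf);  // advances position by the number of bytes written
    } finally {
      buf.limit(originalLimit);
    }
  }

  public static void main(String[] args) throws IOException {
    ByteBuffer data = ByteBuffer.wrap(new byte[1024 * 1024]);  // 1 MiB heap buffer
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    WritableByteChannel ch = Channels.newChannel(sink);
    while (data.hasRemaining()) {
      writeChunk(ch, data);  // drain the buffer one bounded chunk at a time
    }
    System.out.println("bytes written: " + sink.size());
  }
}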