From 9e37e7c4d1e495da91f68e912636dcf865691e39 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Mon, 11 Apr 2016 16:05:44 +0800 Subject: [PATCH] SPARK-14290/SPARK-13352 avoid significant memory copy in Netty's transferTo --- .../network/protocol/MessageWithHeader.java | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java b/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java index d686a951467cf..44403eede4d59 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java @@ -18,6 +18,7 @@ package org.apache.spark.network.protocol; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import com.google.common.base.Preconditions; @@ -39,6 +40,14 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion { private final long bodyLength; private long totalBytesTransferred; + /** + * When the write buffer size is larger than this limit, I/O will be done in chunks of this size. + * The size should not be too large as it will waste underlying memory copy. e.g. If network + * available buffer is smaller than this limit, the data cannot be sent within one single write + * operation while it still will make memory copy with this size. + */ + private static final int NIO_BUFFER_LIMIT = 256 * 1024; + MessageWithHeader(ByteBuf header, Object body, long bodyLength) { Preconditions.checkArgument(body instanceof ByteBuf || body instanceof FileRegion, "Body must be a ByteBuf or a FileRegion."); @@ -102,8 +111,27 @@ protected void deallocate() { } private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException { - int written = target.write(buf.nioBuffer()); + ByteBuffer buffer = buf.nioBuffer(); + int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ? + target.write(buffer) : writeNioBuffer(target, buffer); buf.skipBytes(written); return written; } + + private int writeNioBuffer( + WritableByteChannel writeCh, + ByteBuffer buf) throws IOException { + int originalLimit = buf.limit(); + int ret = 0; + + try { + int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT); + buf.limit(buf.position() + ioSize); + ret = writeCh.write(buf); + } finally { + buf.limit(originalLimit); + } + + return ret; + } }