1818package org .apache .spark .network .protocol ;
1919
2020import java .io .IOException ;
21+ import java .nio .ByteBuffer ;
2122import java .nio .channels .WritableByteChannel ;
2223import javax .annotation .Nullable ;
2324
@@ -43,6 +44,14 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
4344 private final long bodyLength ;
4445 private long totalBytesTransferred ;
4546
  /**
   * When the write buffer size is larger than this limit, I/O will be done in chunks of this
   * size. The size should not be too large, as that would waste the underlying memory copy:
   * e.g. if the network's available buffer is smaller than this limit, the data cannot be sent
   * in a single write operation, while a memory copy of this full size is still performed.
   */
53+ private static final int NIO_BUFFER_LIMIT = 512 * 1024 ;
54+
4655 /**
4756 * Construct a new MessageWithHeader.
4857 *
@@ -128,8 +137,27 @@ protected void deallocate() {
128137 }
129138
130139 private int copyByteBuf (ByteBuf buf , WritableByteChannel target ) throws IOException {
131- int written = target .write (buf .nioBuffer ());
140+ ByteBuffer buffer = buf .nioBuffer ();
141+ int written = (buffer .remaining () <= NIO_BUFFER_LIMIT ) ?
142+ target .write (buffer ) : writeNioBuffer (target , buffer );
132143 buf .skipBytes (written );
133144 return written ;
134145 }
146+
147+ private int writeNioBuffer (
148+ WritableByteChannel writeCh ,
149+ ByteBuffer buf ) throws IOException {
150+ int originalLimit = buf .limit ();
151+ int ret = 0 ;
152+
153+ try {
154+ int ioSize = Math .min (buf .remaining (), NIO_BUFFER_LIMIT );
155+ buf .limit (buf .position () + ioSize );
156+ ret = writeCh .write (buf );
157+ } finally {
158+ buf .limit (originalLimit );
159+ }
160+
161+ return ret ;
162+ }
135163}
0 commit comments