2121import java .nio .channels .WritableByteChannel ;
2222
2323import com .google .common .base .Preconditions ;
24- import com .google .common .primitives .Ints ;
2524import io .netty .buffer .ByteBuf ;
2625import io .netty .channel .FileRegion ;
2726import io .netty .util .AbstractReferenceCounted ;
2827import io .netty .util .ReferenceCountUtil ;
2928
3029/**
31- * A wrapper message that holds two separate pieces (a header and a body) to avoid
32- * copying the body's content.
30+ * A wrapper message that holds two separate pieces (a header and a body).
31+ *
32+ * The header must be a ByteBuf, while the body can be a ByteBuf or a FileRegion.
3333 */
3434class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
3535
@@ -63,32 +63,36 @@ public long transfered() {
6363 return totalBytesTransferred ;
6464 }
6565
66+ /**
67+ * This code is more complicated than you would think because we might require multiple
68+ * transferTo invocations in order to transfer a single MessageWithHeader to avoid busy waiting.
69+ *
70+ * The contract is that the caller will ensure position is properly set to the total number
71+ * of bytes transferred so far (i.e. value returned by transfered()).
72+ */
6673 @ Override
67- public long transferTo (WritableByteChannel target , long position ) throws IOException {
74+ public long transferTo (final WritableByteChannel target , final long position ) throws IOException {
6875 Preconditions .checkArgument (position == totalBytesTransferred , "Invalid position." );
69- long written = 0 ;
70-
71- if (position < headerLength ) {
72- written += copyByteBuf (header , target );
76+ // Bytes written for header in this call.
77+ long writtenHeader = 0 ;
78+ if (header .readableBytes () > 0 ) {
79+ writtenHeader = copyByteBuf (header , target );
80+ totalBytesTransferred += writtenHeader ;
7381 if (header .readableBytes () > 0 ) {
74- totalBytesTransferred += written ;
75- return written ;
82+ return writtenHeader ;
7683 }
7784 }
7885
86+ // Bytes written for body in this call.
87+ long writtenBody = 0 ;
7988 if (body instanceof FileRegion ) {
80- // Adjust the position. If the write is happening as part of the same call where the header
81- // (or some part of it) is written, `position` will be less than the header size, so we want
82- // to start from position 0 in the FileRegion object. Otherwise, we start from the position
83- // requested by the caller.
84- long bodyPos = position > headerLength ? position - headerLength : 0 ;
85- written += ((FileRegion )body ).transferTo (target , bodyPos );
89+ writtenBody = ((FileRegion ) body ).transferTo (target , totalBytesTransferred - headerLength );
8690 } else if (body instanceof ByteBuf ) {
87- written + = copyByteBuf ((ByteBuf ) body , target );
91+ writtenBody = copyByteBuf ((ByteBuf ) body , target );
8892 }
93+ totalBytesTransferred += writtenBody ;
8994
90- totalBytesTransferred += written ;
91- return written ;
95+ return writtenHeader + writtenBody ;
9296 }
9397
9498 @ Override
@@ -102,5 +106,4 @@ private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOExcept
102106 buf .skipBytes (written );
103107 return written ;
104108 }
105-
106109}
0 commit comments