Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,30 +137,15 @@ protected void deallocate() {
}

private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
ByteBuffer buffer = buf.nioBuffer();
int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
target.write(buffer) : writeNioBuffer(target, buffer);
// SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance
// for the case that the passed-in buffer has too many components.
int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can go one step further here, and call buf.nioBuffers(int, int) (plural)
https://github.com/netty/netty/blob/4.1/buffer/src/main/java/io/netty/buffer/ByteBuf.java#L2355

that will avoid the copying required to create the merged buffer (though its a bit complicated as you have to check for incomplete writes from any single target.write() call).

Also OK to leave this for now as this is a pretty important fix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will make a follow up PR to address this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do this in this PR since this is a small change and we don't have a new release recently?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this pr is fixing a pretty serious issue, we know Wenbo is going to roll this out immediately, and I suspect even more users will. this fix is also "obviously correct" -- the followup here is not super complicated, but also will be more prone to bugs. So I'm inclined to just get this in.

anyway if @WenboZhao can do the other part today, then sure, but I think we should get this in quickly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @squito and @zsxwing. I would prefer to do it in a different PR with more careful benchmark and testing. As @squito, that change is more prone to bugs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough.

I just spent several minutes to write the following codes:

  private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
    // SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance
    // for the case that the passed-in buffer has too many components.
    int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
    ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), length);
    int totalWritten = 0;
    for (ByteBuffer buffer : buffers) {
      int remaining = buffer.remaining();
      int written = target.write(buffer);
      totalWritten += written;
      if (written < remaining) {
        break;
      }
    }
    buf.skipBytes(totalWritten);
    return totalWritten;
  }

Feel free to use them in your follow up PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WenboZhao did you ever follow up on this, or at least file another jira for it? sorry if I missed it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for anybody watching this, eventually SPARK-25115 was opened (currently has a PR)

int written = target.write(buffer);
buf.skipBytes(written);
return written;
}

private int writeNioBuffer(
WritableByteChannel writeCh,
ByteBuffer buf) throws IOException {
int originalLimit = buf.limit();
int ret = 0;

try {
int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
buf.limit(buf.position() + ioSize);
ret = writeCh.write(buf);
} finally {
buf.limit(originalLimit);
}

return ret;
}

@Override
public MessageWithHeader touch(Object o) {
super.touch(o);
Expand Down