From 8908585c3029de005e9816d711b0d7ee86398a12 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Tue, 29 Mar 2016 23:04:36 +0800 Subject: [PATCH 1/3] spark-14242 avoid using compositeBuffer for frame decoder --- .../spark/network/util/TransportFrameDecoder.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java index bd1830e6abc8..b2234b81ca0d 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java @@ -139,14 +139,18 @@ private ByteBuf decodeNext() throws Exception { return nextBufferForFrame(remaining); } - // Otherwise, create a composite buffer. - CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(); + // Otherwise, create a new buffer to hold the data of all frame splits. + ByteBuf frame = buffers.getFirst().alloc().buffer(remaining, remaining); while (remaining > 0) { ByteBuf next = nextBufferForFrame(remaining); remaining -= next.readableBytes(); - frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes()); + frame.setBytes(frame.writerIndex(), next, next.readerIndex(), next.readableBytes()); + frame.writerIndex(frame.writerIndex() + next.readableBytes()); + // buffer is no longer referenced, so release it. + next.release(); } assert remaining == 0; + assert frame.readableBytes() == (int) frameSize; return frame; } From 1eacf554427dcb8b7a9594bdf5428940969d034a Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Wed, 30 Mar 2016 12:27:02 +0800 Subject: [PATCH 2/3] use compositeByteBuf.addComponents to for netty frame buffer --- .../spark/network/util/TransportFrameDecoder.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java index b2234b81ca0d..a457ad9220b0 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java @@ -139,18 +139,17 @@ private ByteBuf decodeNext() throws Exception { return nextBufferForFrame(remaining); } - // Otherwise, create a new buffer to hold the data of all frame splits. - ByteBuf frame = buffers.getFirst().alloc().buffer(remaining, remaining); + // Otherwise, create a composite buffer. + CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(); + LinkedList frameBuffers = new LinkedList<>(); while (remaining > 0) { ByteBuf next = nextBufferForFrame(remaining); + frameBuffers.add(next); remaining -= next.readableBytes(); - frame.setBytes(frame.writerIndex(), next, next.readerIndex(), next.readableBytes()); - frame.writerIndex(frame.writerIndex() + next.readableBytes()); - // buffer is no longer referenced, so release it. - next.release(); } + frame.addComponents(frameBuffers).writerIndex(frame.writerIndex() + (int) frameSize); + frameBuffers.clear(); assert remaining == 0; - assert frame.readableBytes() == (int) frameSize; return frame; } From 80f75737d6dd25fa859b06e876f16f21d5b6b247 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Thu, 31 Mar 2016 10:12:05 +0800 Subject: [PATCH 3/3] only set maxNumComponents for compositeByteBuffer to solve the issue --- .../apache/spark/network/util/TransportFrameDecoder.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java index a457ad9220b0..fcec7dfd0c21 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java @@ -140,15 +140,12 @@ private ByteBuf decodeNext() throws Exception { } // Otherwise, create a composite buffer. - CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(); - LinkedList frameBuffers = new LinkedList<>(); + CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE); while (remaining > 0) { ByteBuf next = nextBufferForFrame(remaining); - frameBuffers.add(next); remaining -= next.readableBytes(); + frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes()); } - frame.addComponents(frameBuffers).writerIndex(frame.writerIndex() + (int) frameSize); - frameBuffers.clear(); assert remaining == 0; return frame; }