From fae181433ca1eda6be0ad450223d73c6eb5f3f35 Mon Sep 17 00:00:00 2001 From: WangJinhai02 Date: Thu, 26 Apr 2018 22:43:44 +0800 Subject: [PATCH 01/10] restore bytes limit value --- .../main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 7367af7888bd8..abf4578e8473c 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -63,10 +63,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { */ def writeFully(channel: WritableByteChannel): Unit = { for (bytes <- getChunks()) { + val limit = bytes.limit() while (bytes.remaining() > 0) { val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize) bytes.limit(bytes.position() + ioSize) channel.write(bytes) + bytes.limit(limit) } } } From 623f26dec34ff11d9ef7311f6dcef9ba255446b8 Mon Sep 17 00:00:00 2001 From: WangJinhai02 Date: Fri, 27 Apr 2018 12:33:30 +0800 Subject: [PATCH 02/10] add ChunkedByteBufferSuite unit test --- .../scala/org/apache/spark/io/ChunkedByteBufferSuite.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index 3b798e36b0499..b6c5feaf479a5 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite { assert(chunkedByteBuffer.getChunks().head.position() === 0) } + test("writeFully() does not affect original buffer's position") { + val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80*1024*1024))) + chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)) + assert(chunkedByteBuffer.getChunks().head.position() === 0) + } + test("toArray()") { val empty = ByteBuffer.wrap(Array.empty[Byte]) val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte)) From c5851318d6c8f434e4656063d48362a5a5e6af19 Mon Sep 17 00:00:00 2001 From: WangJinhai02 Date: Fri, 27 Apr 2018 13:23:59 +0800 Subject: [PATCH 03/10] ChunkedByteBufferSuite improve unit test --- .../test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index b6c5feaf479a5..100680f67c7bc 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -56,7 +56,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite { assert(chunkedByteBuffer.getChunks().head.position() === 0) } - test("writeFully() does not affect original buffer's position") { + test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") { val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80*1024*1024))) chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)) assert(chunkedByteBuffer.getChunks().head.position() === 0) From a2a82f1a8431faf23e2be7c418d779bbf0fede76 Mon Sep 17 00:00:00 2001 From: WangJinhai02 Date: Fri, 27 Apr 2018 14:30:39 +0800 Subject: [PATCH 04/10] change assert size --- .../scala/org/apache/spark/io/ChunkedByteBufferSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index 100680f67c7bc..a75577bff7bb3 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -57,9 +57,9 @@ class ChunkedByteBufferSuite extends SparkFunSuite { } test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") { - val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80*1024*1024))) + val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80 * 1024 * 1024))) chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)) - assert(chunkedByteBuffer.getChunks().head.position() === 0) + assert(chunkedByteBuffer.size === (80L * 1024L * 1024L)) } test("toArray()") { From 217ec9d03e83d4431b799285204b5a0abd09ea1d Mon Sep 17 00:00:00 2001 From: WangJinhai02 Date: Fri, 27 Apr 2018 14:37:37 +0800 Subject: [PATCH 05/10] add SPARK-24107: --- .../test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index a75577bff7bb3..af9e96d3e3bd8 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -56,7 +56,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite { assert(chunkedByteBuffer.getChunks().head.position() === 0) } - test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") { + test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") { val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80 * 1024 * 1024))) chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)) assert(chunkedByteBuffer.size === (80L * 1024L * 1024L)) From 2bc19a3594353b18c6932240eb74776ab72346c6 Mon Sep 17 00:00:00 2001 From: WangJinhai02 Date: Fri, 27 Apr 2018 15:30:44 +0800 Subject: [PATCH 06/10] change ByteArrayWritableChannel's size --- .../scala/org/apache/spark/io/ChunkedByteBufferSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index af9e96d3e3bd8..dcf666f53934f 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -58,8 +58,9 @@ class ChunkedByteBufferSuite extends SparkFunSuite { test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") { val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80 * 1024 * 1024))) - chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)) - assert(chunkedByteBuffer.size === (80L * 1024L * 1024L)) + val byteArrayWritableChannel = new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt) + chunkedByteBuffer.writeFully(byteArrayWritableChannel) + assert(byteArrayWritableChannel.length() === chunkedByteBuffer.size) } test("toArray()") { From c9a6816cbe9ef22c59df648b6dc4f651765ea1a8 Mon Sep 17 00:00:00 2001 From: WangJinhai02 Date: Fri, 27 Apr 2018 20:05:32 +0800 Subject: [PATCH 07/10] add configure bufferWriteChunkSize --- .../org/apache/spark/util/io/ChunkedByteBuffer.scala | 6 +++--- .../org/apache/spark/io/ChunkedByteBufferSuite.scala | 8 +++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index abf4578e8473c..d500634814dc7 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -63,12 +63,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { */ def writeFully(channel: WritableByteChannel): Unit = { for (bytes <- getChunks()) { - val limit = bytes.limit() - while (bytes.remaining() > 0) { + val curChunkLimit = bytes.limit() + while (bytes.hasRemaining) { val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize) bytes.limit(bytes.position() + ioSize) channel.write(bytes) - bytes.limit(limit) + bytes.limit(curChunkLimit) } } } diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index dcf666f53934f..1720cf482352c 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -20,8 +20,8 @@ package org.apache.spark.io import java.nio.ByteBuffer import com.google.common.io.ByteStreams - -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config import org.apache.spark.network.util.ByteArrayWritableChannel import org.apache.spark.util.io.ChunkedByteBuffer @@ -57,7 +57,9 @@ class ChunkedByteBufferSuite extends SparkFunSuite { } test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") { - val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80 * 1024 * 1024))) + val bufferWriteChunkSize = Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE)) + .getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get).toInt + val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(bufferWriteChunkSize + 8))) val byteArrayWritableChannel = new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt) chunkedByteBuffer.writeFully(byteArrayWritableChannel) assert(byteArrayWritableChannel.length() === chunkedByteBuffer.size) From fa99a1985453c6bf00edc5b956d06548e33ed450 Mon Sep 17 00:00:00 2001 From: WangJinhai02 Date: Fri, 27 Apr 2018 22:30:31 +0800 Subject: [PATCH 08/10] sc.conf.set spark.buffer.write.chunkSize --- .../spark/io/ChunkedByteBufferSuite.scala | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index 1720cf482352c..b8f0b469c7963 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -20,12 +20,12 @@ package org.apache.spark.io import java.nio.ByteBuffer import com.google.common.io.ByteStreams -import org.apache.spark.{SparkEnv, SparkFunSuite} +import org.apache.spark.{SparkFunSuite, SharedSparkContext} import org.apache.spark.internal.config import org.apache.spark.network.util.ByteArrayWritableChannel import org.apache.spark.util.io.ChunkedByteBuffer -class ChunkedByteBufferSuite extends SparkFunSuite { +class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext { test("no chunks") { val emptyChunkedByteBuffer = new ChunkedByteBuffer(Array.empty[ByteBuffer]) @@ -57,12 +57,15 @@ class ChunkedByteBufferSuite extends SparkFunSuite { } test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") { - val bufferWriteChunkSize = Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE)) - .getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get).toInt - val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(bufferWriteChunkSize + 8))) - val byteArrayWritableChannel = new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt) - chunkedByteBuffer.writeFully(byteArrayWritableChannel) - assert(byteArrayWritableChannel.length() === chunkedByteBuffer.size) + try { + sc.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 32L * 1024L * 1024L) + val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(40 * 1024 * 1024))) + val byteArrayWritableChannel = new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt) + chunkedByteBuffer.writeFully(byteArrayWritableChannel) + assert(byteArrayWritableChannel.length() === chunkedByteBuffer.size) + } finally { + sc.conf.remove(config.BUFFER_WRITE_CHUNK_SIZE) + } } test("toArray()") { From fb527c87a1f4ddb05eb601038736aeb4ec3f7223 Mon Sep 17 00:00:00 2001 From: WangJinhai02 Date: Fri, 27 Apr 2018 23:48:21 +0800 Subject: [PATCH 09/10] add try finally --- .../org/apache/spark/util/io/ChunkedByteBuffer.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index d500634814dc7..3ae8dfcc1cb66 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -65,10 +65,13 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { for (bytes <- getChunks()) { val curChunkLimit = bytes.limit() while (bytes.hasRemaining) { - val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize) - bytes.limit(bytes.position() + ioSize) - channel.write(bytes) - bytes.limit(curChunkLimit) + try { + val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize) + bytes.limit(bytes.position() + ioSize) + channel.write(bytes) + } finally { + bytes.limit(curChunkLimit) + } } } } From e78ef396a571b26870bcc9326524fb8881e293dd Mon Sep 17 00:00:00 2001 From: WangJinhai02 Date: Wed, 2 May 2018 15:45:37 +0800 Subject: [PATCH 10/10] fixed the style --- .../scala/org/apache/spark/io/ChunkedByteBufferSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index b8f0b469c7963..2107559572d78 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -20,7 +20,8 @@ package org.apache.spark.io import java.nio.ByteBuffer import com.google.common.io.ByteStreams -import org.apache.spark.{SparkFunSuite, SharedSparkContext} + +import org.apache.spark.{SharedSparkContext, SparkFunSuite} import org.apache.spark.internal.config import org.apache.spark.network.util.ByteArrayWritableChannel import org.apache.spark.util.io.ChunkedByteBuffer