From 7cbadc5e367a045dd70af4c85e4c17fd0ac3cba7 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Tue, 25 Jul 2017 17:44:46 +0800
Subject: [PATCH 01/19] [SPARK][CORE] Slice write by channel

---
 .../org/apache/spark/storage/DiskStore.scala      |  2 +-
 .../apache/spark/util/io/ChunkedByteBuffer.scala  | 15 +++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c6656341fcd15..2133276401e33 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -93,7 +93,7 @@ private[spark] class DiskStore(
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
     put(blockId) { channel =>
-      bytes.writeFully(channel)
+      bytes.writeWithSlice(channel)
     }
   }

diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 2f905c8af0f63..b6d082bf41102 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -40,6 +40,8 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   require(chunks != null, "chunks must not be null")
   require(chunks.forall(_.position() == 0), "chunks' positions must be 0")

+  private val NIO_BUFFER_LIMIT = 64 * 1024 * 1024 // Chunk size in bytes
+
   private[this] var disposed: Boolean = false

   /**
@@ -62,6 +64,19 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     }
   }

+  /**
+   * Write this buffer to a channel with slice.
+   */
+  def writeWithSlice(channel: WritableByteChannel): Unit = {
+    for (bytes <- getChunks()) {
+      val capacity = bytes.limit()
+      while (bytes.position() < capacity) {
+        bytes.limit(Math.min(capacity, bytes.position + NIO_BUFFER_LIMIT.toInt))
+        channel.write(bytes)
+      }
+    }
+  }
+
   /**
    * Wrap this buffer to view it as a Netty ByteBuf.
    */

From fc91f96085b4706e09e1e5adccf017b22ffe062b Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Wed, 23 Aug 2017 13:53:20 +0800
Subject: [PATCH 02/19] Fix code style

---
 .../scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index b6d082bf41102..def7e8b6174b9 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -65,8 +65,8 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   }

   /**
-    * Write this buffer to a channel with slice.
-    */
+   * Write this buffer to a channel with slice.
+   */
   def writeWithSlice(channel: WritableByteChannel): Unit = {
     for (bytes <- getChunks()) {
       val capacity = bytes.limit()
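
A likely motivation for PATCH 01 (the commit message does not spell it out): for heap ByteBuffers, the JDK's channel write path copies the remaining bytes into a temporary direct buffer sized to the whole write, and that direct buffer may then be cached per thread, so a single multi-hundred-megabyte write can pin an equally large piece of off-heap memory. Capping each write() at NIO_BUFFER_LIMIT bounds that temporary allocation. Below is a self-contained sketch of the pattern with hypothetical names (WriteLimit, writeInSlices); unlike the patch at this stage, it also restores the limit after every slice, a detail that matters later in this series:

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.{Channels, WritableByteChannel}

object SliceWriteSketch {
  // Upper bound on the bytes handed to a single channel.write() call;
  // 64 MiB matches NIO_BUFFER_LIMIT in PATCH 01.
  val WriteLimit: Int = 64 * 1024 * 1024

  // Write `buf` to `channel` in slices of at most WriteLimit bytes.
  // Assumes a blocking channel, i.e. write() drains the slice it is given.
  def writeInSlices(buf: ByteBuffer, channel: WritableByteChannel): Unit = {
    val originalLimit = buf.limit()
    while (buf.position() < originalLimit) {
      // Pull the limit in so write() never sees more than WriteLimit bytes...
      buf.limit(math.min(originalLimit, buf.position() + WriteLimit))
      channel.write(buf)
      // ...then restore it so the next iteration sees the unwritten tail.
      buf.limit(originalLimit)
    }
  }

  def main(args: Array[String]): Unit = {
    val out = new ByteArrayOutputStream()
    writeInSlices(ByteBuffer.wrap(Array.tabulate(1024)(_.toByte)), Channels.newChannel(out))
    assert(out.size() == 1024)
  }
}
```

Direct (off-heap) chunks are written without that intermediate copy, so the cap mainly protects writes of heap-backed chunks.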
+ */ def writeWithSlice(channel: WritableByteChannel): Unit = { for (bytes <- getChunks()) { val capacity = bytes.limit() From bab91db933947b57159b21e5f6506570b6b721cb Mon Sep 17 00:00:00 2001 From: zhoukang Date: Thu, 24 Aug 2017 09:59:37 +0800 Subject: [PATCH 03/19] Refine to avoid underflow --- .../scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index def7e8b6174b9..d4eeb5b9887d0 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -71,7 +71,8 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { for (bytes <- getChunks()) { val capacity = bytes.limit() while (bytes.position() < capacity) { - bytes.limit(Math.min(capacity, bytes.position + NIO_BUFFER_LIMIT.toInt)) + val ioSize = Math.min(bytes.remaining(), NIO_BUFFER_LIMIT.toInt) + bytes.limit(bytes.position + ioSize) channel.write(bytes) } } From 72aef679b498bb042ecb9ffa8df62ed41e1f519d Mon Sep 17 00:00:00 2001 From: zhoukang Date: Thu, 24 Aug 2017 11:12:15 +0800 Subject: [PATCH 04/19] Fix no ending loop --- .../spark/util/io/ChunkedByteBuffer.scala | 2 +- .../spark/io/ChunkedByteBufferSuite.scala | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index d4eeb5b9887d0..8122e18a8fb07 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -70,7 +70,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { def writeWithSlice(channel: WritableByteChannel): Unit = { for (bytes <- getChunks()) { val capacity = bytes.limit() - while (bytes.position() < capacity) { + while (bytes.position() < capacity && bytes.remaining() > 0) { val ioSize = Math.min(bytes.remaining(), NIO_BUFFER_LIMIT.toInt) bytes.limit(bytes.position + ioSize) channel.write(bytes) diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index 3b798e36b0499..dcaf7674050e1 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -56,6 +56,23 @@ class ChunkedByteBufferSuite extends SparkFunSuite { assert(chunkedByteBuffer.getChunks().head.position() === 0) } + test("benchmark") { + val buffer100 = ByteBuffer.allocate(1024 * 1024 * 100) + val buffer30 = ByteBuffer.allocate(1024 * 1024 * 30) + val chunkedByteBuffer = new ChunkedByteBuffer(Array.fill(5)(buffer100)) + var starTime = System.currentTimeMillis() + for (i <- 1 to 10) { + chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)) + } + // scalastyle:off + System.out.println(System.currentTimeMillis() - starTime) + starTime = System.currentTimeMillis() + for (i <- 1 to 10) { + chunkedByteBuffer.writeWithSlice(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)) + } + System.out.println(System.currentTimeMillis() - starTime) + } + test("toArray()") { val empty = ByteBuffer.wrap(Array.empty[Byte]) val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte)) From 
From 478977293aadb9383740eabbaee23a43cc64b062 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 24 Aug 2017 11:46:36 +0800
Subject: [PATCH 05/19] Remove useless unit test

---
 .../spark/io/ChunkedByteBufferSuite.scala | 19 +------------------
 1 file changed, 1 insertion(+), 18 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index dcaf7674050e1..74c2e0e04e164 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -20,9 +20,9 @@ package org.apache.spark.io
 import java.nio.ByteBuffer

 import com.google.common.io.ByteStreams
-
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ChunkedByteBuffer

 class ChunkedByteBufferSuite extends SparkFunSuite {
@@ -56,23 +56,6 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
     assert(chunkedByteBuffer.getChunks().head.position() === 0)
   }

-  test("benchmark") {
-    val buffer100 = ByteBuffer.allocate(1024 * 1024 * 100)
-    val buffer30 = ByteBuffer.allocate(1024 * 1024 * 30)
-    val chunkedByteBuffer = new ChunkedByteBuffer(Array.fill(5)(buffer100))
-    var starTime = System.currentTimeMillis()
-    for (i <- 1 to 10) {
-      chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
-    }
-    // scalastyle:off
-    System.out.println(System.currentTimeMillis() - starTime)
-    starTime = System.currentTimeMillis()
-    for (i <- 1 to 10) {
-      chunkedByteBuffer.writeWithSlice(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
-    }
-    System.out.println(System.currentTimeMillis() - starTime)
-  }
-
   test("toArray()") {
     val empty = ByteBuffer.wrap(Array.empty[Byte])
     val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))

From aeabe1d1aacf5abf58d631bc291dd409728b5569 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 24 Aug 2017 11:57:09 +0800
Subject: [PATCH 06/19] Delete unused import

---
 .../test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index 74c2e0e04e164..3b798e36b0499 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -20,9 +20,9 @@ package org.apache.spark.io
 import java.nio.ByteBuffer

 import com.google.common.io.ByteStreams
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.network.util.ByteArrayWritableChannel
-import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ChunkedByteBuffer

 class ChunkedByteBufferSuite extends SparkFunSuite {

From ca77b51549f8be32f14565ac5ea94a8b83386afd Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 24 Aug 2017 16:57:23 +0800
Subject: [PATCH 07/19] Add benchmark test with org.apache.spark.util.Benchmark

---
 .../spark/io/ChunkedByteBufferSuite.scala | 29 ++++++++++++++++++-
 1 file changed, 28 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index 3b798e36b0499..e748df44382d1 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -20,9 +20,9 @@ package org.apache.spark.io
 import java.nio.ByteBuffer

 import com.google.common.io.ByteStreams
-
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.util.Benchmark
 import org.apache.spark.util.io.ChunkedByteBuffer

 class ChunkedByteBufferSuite extends SparkFunSuite {
@@ -56,6 +56,33 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
     assert(chunkedByteBuffer.getChunks().head.position() === 0)
   }

+  test("benchmark testing for writeWithSlice()and writeFully()") {
+    val benchmark = new Benchmark("Benchmark writeWithSlice() and writeFully()", 1024 * 1024 * 15)
+    val buffer100 = ByteBuffer.allocate(1024 * 1024 * 100)
+    val buffer30 = ByteBuffer.allocate(1024 * 1024 * 30)
+    val chunkedByteBuffer30m = new ChunkedByteBuffer(Array.fill(10)(buffer30))
+    val chunkedByteBuffer100m = new ChunkedByteBuffer(Array.fill(10)(buffer100))
+
+    benchmark.addCase("Test writeFully() chunks each with 30m for 10 loop", 10) { _ =>
+      chunkedByteBuffer30m.writeFully(
+        new ByteArrayWritableChannel(chunkedByteBuffer30m.size.toInt))
+    }
+    benchmark.addCase("Test writeWithSlice() chunks each with 30m for 10 loop", 10) { _ =>
+      chunkedByteBuffer30m.writeWithSlice(
+        new ByteArrayWritableChannel(chunkedByteBuffer30m.size.toInt))
+    }
+
+    benchmark.addCase("Test writeFully() chunks each with 100m for 50 loop", 50) { _ =>
+      chunkedByteBuffer30m.writeFully(
+        new ByteArrayWritableChannel(chunkedByteBuffer30m.size.toInt))
+    }
+    benchmark.addCase("Test writeWithSlice() chunks each with 100m for 50 loop", 50) { _ =>
+      chunkedByteBuffer30m.writeWithSlice(
+        new ByteArrayWritableChannel(chunkedByteBuffer30m.size.toInt))
+    }
+    benchmark.run()
+  }
+
   test("toArray()") {
     val empty = ByteBuffer.wrap(Array.empty[Byte])
     val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))
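
A note on the benchmark reintroduced above: each timed case allocates its ByteArrayWritableChannel sink inside the measured body, so sink allocation is timed along with the writes, and the two "100m" cases actually exercise chunkedByteBuffer30m. A framework-free harness that isolates just the write loop could look like this sketch (hypothetical NullChannel/timeMs helpers, not the Benchmark utility):

```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

object WriteTimingSketch {
  // A sink that discards everything, so the timing reflects the write loop
  // rather than the cost of copying into a backing array.
  private final class NullChannel extends WritableByteChannel {
    override def write(src: ByteBuffer): Int = {
      val n = src.remaining()
      src.position(src.limit()) // behave like a channel that consumed it all
      n
    }
    override def isOpen: Boolean = true
    override def close(): Unit = ()
  }

  private def timeMs(body: => Unit): Double = {
    val t0 = System.nanoTime()
    body
    (System.nanoTime() - t0) / 1e6
  }

  def main(args: Array[String]): Unit = {
    val sink = new NullChannel
    val chunk = ByteBuffer.allocate(32 * 1024 * 1024)
    val sliceSize = 4 * 1024 * 1024

    val oneShot = timeMs { sink.write(chunk.duplicate()) }
    val sliced = timeMs {
      val b = chunk.duplicate()
      val originalLimit = b.limit()
      while (b.hasRemaining) {
        b.limit(math.min(originalLimit, b.position() + sliceSize))
        sink.write(b)
        b.limit(originalLimit)
      }
    }
    println(f"one-shot: $oneShot%.3f ms, sliced: $sliced%.3f ms")
  }
}
```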
From e84a6d7862b60fdabf7a463f08e911baf74d7e62 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 24 Aug 2017 16:59:58 +0800
Subject: [PATCH 08/19] Refine code style

---
 .../test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index e748df44382d1..ecd342e8de476 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.io
 import java.nio.ByteBuffer

 import com.google.common.io.ByteStreams
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.network.util.ByteArrayWritableChannel
 import org.apache.spark.util.Benchmark

From d7081428f145f94283304184ad14d2144077b04d Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 24 Aug 2017 21:56:04 +0800
Subject: [PATCH 09/19] Update as commented: replace writeFully with slice logic

---
 .../org/apache/spark/storage/DiskStore.scala |  2 +-
 .../spark/util/io/ChunkedByteBuffer.scala    | 11 --------
 .../spark/io/ChunkedByteBufferSuite.scala    | 27 -------------------
 3 files changed, 1 insertion(+), 39 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 2133276401e33..c6656341fcd15 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -93,7 +93,7 @@ private[spark] class DiskStore(
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
     put(blockId) { channel =>
-      bytes.writeWithSlice(channel)
+      bytes.writeFully(channel)
     }
   }

diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 8122e18a8fb07..d50a42db5d095 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -57,17 +57,6 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
    * Write this buffer to a channel.
    */
   def writeFully(channel: WritableByteChannel): Unit = {
-    for (bytes <- getChunks()) {
-      while (bytes.remaining > 0) {
-        channel.write(bytes)
-      }
-    }
-  }
-
-  /**
-   * Write this buffer to a channel with slice.
-   */
-  def writeWithSlice(channel: WritableByteChannel): Unit = {
     for (bytes <- getChunks()) {
       val capacity = bytes.limit()
       while (bytes.position() < capacity && bytes.remaining() > 0) {

diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index ecd342e8de476..273b48bfa5419 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -57,33 +57,6 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
     assert(chunkedByteBuffer.getChunks().head.position() === 0)
   }

-  test("benchmark testing for writeWithSlice()and writeFully()") {
-    val benchmark = new Benchmark("Benchmark writeWithSlice() and writeFully()", 1024 * 1024 * 15)
-    val buffer100 = ByteBuffer.allocate(1024 * 1024 * 100)
-    val buffer30 = ByteBuffer.allocate(1024 * 1024 * 30)
-    val chunkedByteBuffer30m = new ChunkedByteBuffer(Array.fill(10)(buffer30))
-    val chunkedByteBuffer100m = new ChunkedByteBuffer(Array.fill(10)(buffer100))
-
-    benchmark.addCase("Test writeFully() chunks each with 30m for 10 loop", 10) { _ =>
-      chunkedByteBuffer30m.writeFully(
-        new ByteArrayWritableChannel(chunkedByteBuffer30m.size.toInt))
-    }
-    benchmark.addCase("Test writeWithSlice() chunks each with 30m for 10 loop", 10) { _ =>
-      chunkedByteBuffer30m.writeWithSlice(
-        new ByteArrayWritableChannel(chunkedByteBuffer30m.size.toInt))
-    }
-
-    benchmark.addCase("Test writeFully() chunks each with 100m for 50 loop", 50) { _ =>
-      chunkedByteBuffer30m.writeFully(
-        new ByteArrayWritableChannel(chunkedByteBuffer30m.size.toInt))
-    }
-    benchmark.addCase("Test writeWithSlice() chunks each with 100m for 50 loop", 50) { _ =>
-      chunkedByteBuffer30m.writeWithSlice(
-        new ByteArrayWritableChannel(chunkedByteBuffer30m.size.toInt))
-    }
-    benchmark.run()
-  }
-
   test("toArray()") {
     val empty = ByteBuffer.wrap(Array.empty[Byte])
     val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))
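
A detail that makes the in-place limit manipulation above safe: getChunks() returns duplicate()d buffers, so writeFully mutates cursor state only on throwaway views while the stored chunks keep position 0, as the class's require(...) asserts. Duplicates share the backing bytes but not the cursors -- a small demonstration:

```scala
import java.nio.ByteBuffer

object DuplicateDemo {
  def main(args: Array[String]): Unit = {
    val original = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))
    val dup = original.duplicate()

    dup.position(5) // moving the duplicate's cursors...
    dup.limit(6)
    assert(original.position() == 0) // ...leaves the original untouched
    assert(original.limit() == 8)

    dup.put(5, 42.toByte)                // but the backing bytes are shared,
    assert(original.get(5) == 42.toByte) // so writes show through both views
  }
}
```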
From 33a27960d8635fde62351ddbf73304e55d54da15 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 24 Aug 2017 21:58:09 +0800
Subject: [PATCH 10/19] Remove unused import

---
 .../test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index 273b48bfa5419..3b798e36b0499 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -23,7 +23,6 @@ import com.google.common.io.ByteStreams

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.network.util.ByteArrayWritableChannel
-import org.apache.spark.util.Benchmark
 import org.apache.spark.util.io.ChunkedByteBuffer

 class ChunkedByteBufferSuite extends SparkFunSuite {

From ab384d4e94ed37e8943f368ff557ee0a80ddd435 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 24 Aug 2017 22:08:14 +0800
Subject: [PATCH 11/19] No need to call toInt

---
 .../main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index d50a42db5d095..45411bc147721 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -60,7 +60,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     for (bytes <- getChunks()) {
       val capacity = bytes.limit()
       while (bytes.position() < capacity && bytes.remaining() > 0) {
-        val ioSize = Math.min(bytes.remaining(), NIO_BUFFER_LIMIT.toInt)
+        val ioSize = Math.min(bytes.remaining(), NIO_BUFFER_LIMIT)
         bytes.limit(bytes.position + ioSize)
         channel.write(bytes)
       }

From b6693513441faf350dcdc3636723359561346a17 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 24 Aug 2017 22:19:23 +0800
Subject: [PATCH 12/19] Only need to use bytes.remaining()

---
 .../main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 45411bc147721..8fb4040fc5b64 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -59,7 +59,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   def writeFully(channel: WritableByteChannel): Unit = {
     for (bytes <- getChunks()) {
       val capacity = bytes.limit()
-      while (bytes.position() < capacity && bytes.remaining() > 0) {
+      while (bytes.remaining() > 0) {
         val ioSize = Math.min(bytes.remaining(), NIO_BUFFER_LIMIT)
         bytes.limit(bytes.position + ioSize)
         channel.write(bytes)
From 717f8863e1f3555febddb27116738268246dd57e Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 24 Aug 2017 22:36:11 +0800
Subject: [PATCH 13/19] Use config for NIO_BUFFER_LIMIT

---
 .../scala/org/apache/spark/internal/config/package.scala   | 7 +++++++
 .../scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 4 +++-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 8dee0d970c4c6..0514c86c902b0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -336,4 +336,11 @@ package object config {
       "spark.")
     .booleanConf
     .createWithDefault(false)
+
+  private[spark] val STORAGE_NIO_BUFFER_LIMIT =
+    ConfigBuilder("spark.storage.nioBufferLimit")
+      .internal()
+      .doc("The block size limit when use ChunkedByteBuffer to writeFully bytes.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(64 * 1024 * 1024)
 }

diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 8fb4040fc5b64..197f43129ad99 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -24,6 +24,7 @@ import java.nio.channels.WritableByteChannel
 import com.google.common.primitives.UnsignedBytes
 import io.netty.buffer.{ByteBuf, Unpooled}

+import org.apache.spark.internal.config
 import org.apache.spark.network.util.ByteArrayWritableChannel
 import org.apache.spark.storage.StorageUtils

@@ -40,7 +41,8 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   require(chunks != null, "chunks must not be null")
   require(chunks.forall(_.position() == 0), "chunks' positions must be 0")

-  private val NIO_BUFFER_LIMIT = 64 * 1024 * 1024 // Chunk size in bytes
+  // Chunk size in bytes
+  private val NIO_BUFFER_LIMIT = SparkEnv.get.conf.get(config.STORAGE_NIO_BUFFER_LIMIT)

   private[this] var disposed: Boolean = false

From fc184aa535b2bf9cd771739da6ac1bbbec0504c4 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 24 Aug 2017 22:46:31 +0800
Subject: [PATCH 14/19] Use BUFFER_WRITE_CHUNK_SIZE

---
 .../org/apache/spark/internal/config/package.scala | 14 +++++++-------
 .../apache/spark/util/io/ChunkedByteBuffer.scala   |  2 +-
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 0514c86c902b0..fff54b27ae687 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -321,6 +321,13 @@ package object config {
     .intConf
     .createWithDefault(3)

+  private[spark] val BUFFER_WRITE_CHUNK_SIZE =
+    ConfigBuilder("spark.buffer.write.chunkSize")
+      .internal()
+      .doc("The block size limit when use ChunkedByteBuffer to writeFully bytes.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(64 * 1024 * 1024)
+
   private[spark] val REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM =
     ConfigBuilder("spark.reducer.maxReqSizeShuffleToMem")
       .internal()
@@ -336,11 +343,4 @@ package object config {
       "spark.")
     .booleanConf
     .createWithDefault(false)
-
-  private[spark] val STORAGE_NIO_BUFFER_LIMIT =
-    ConfigBuilder("spark.storage.nioBufferLimit")
-      .internal()
-      .doc("The block size limit when use ChunkedByteBuffer to writeFully bytes.")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(64 * 1024 * 1024)
 }

diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 197f43129ad99..249f5db077e4a 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -42,7 +42,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   require(chunks.forall(_.position() == 0), "chunks' positions must be 0")

   // Chunk size in bytes
-  private val NIO_BUFFER_LIMIT = SparkEnv.get.conf.get(config.STORAGE_NIO_BUFFER_LIMIT)
+  private val NIO_BUFFER_LIMIT = SparkEnv.get.conf.get(config.BUFFER_WRITE_CHUNK_SIZE)

   private[this] var disposed: Boolean = false
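
PATCH 13 and PATCH 14 replace the hard-coded constant with an internal config entry, renamed to spark.buffer.write.chunkSize since the write path is not storage-specific. The essence of the builder chain is a key, a typed default, and validation applied on read; a toy model of that shape (deliberately simplified and hypothetical, not Spark's actual ConfigBuilder API):

```scala
// A stand-in for the ConfigBuilder pattern: the entry couples a key,
// a default value, and a validity check applied whenever it is read.
final case class ByteConfEntry(key: String, default: Long, valid: Long => Boolean) {
  def get(conf: Map[String, String]): Long = {
    val value = conf.get(key).map(_.toLong).getOrElse(default)
    require(valid(value), s"invalid value for $key: $value")
    value
  }
}

object ConfSketch {
  val BufferWriteChunkSize: ByteConfEntry =
    ByteConfEntry("spark.buffer.write.chunkSize", 64L * 1024 * 1024, _ > 0)

  def main(args: Array[String]): Unit = {
    assert(BufferWriteChunkSize.get(Map.empty) == 64L * 1024 * 1024)
    assert(BufferWriteChunkSize.get(
      Map("spark.buffer.write.chunkSize" -> "1048576")) == 1048576L)
  }
}
```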
From e48f44f99355c2811c4cc8f094c062dee33ff428 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 24 Aug 2017 22:58:32 +0800
Subject: [PATCH 15/19] Update as commented

---
 .../org/apache/spark/internal/config/package.scala | 14 +++++++-------
 .../apache/spark/util/io/ChunkedByteBuffer.scala   |  7 ++++---
 2 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index fff54b27ae687..7c449e9c48eef 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -293,6 +293,13 @@ package object config {
     .booleanConf
     .createWithDefault(false)

+  private[spark] val BUFFER_WRITE_CHUNK_SIZE =
+    ConfigBuilder("spark.buffer.write.chunkSize")
+      .internal()
+      .doc("The chunk size during writing out the bytes of ChunkedByteBuffer.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(64 * 1024 * 1024)
+
   private[spark] val CHECKPOINT_COMPRESS =
     ConfigBuilder("spark.checkpoint.compress")
       .doc("Whether to compress RDD checkpoints. Generally a good idea. Compression will use " +
@@ -321,13 +328,6 @@ package object config {
     .intConf
     .createWithDefault(3)

-  private[spark] val BUFFER_WRITE_CHUNK_SIZE =
-    ConfigBuilder("spark.buffer.write.chunkSize")
-      .internal()
-      .doc("The block size limit when use ChunkedByteBuffer to writeFully bytes.")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(64 * 1024 * 1024)
-
   private[spark] val REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM =
     ConfigBuilder("spark.reducer.maxReqSizeShuffleToMem")
       .internal()

diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 249f5db077e4a..f724cd68444b0 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -42,7 +42,9 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   require(chunks.forall(_.position() == 0), "chunks' positions must be 0")

   // Chunk size in bytes
-  private val NIO_BUFFER_LIMIT = SparkEnv.get.conf.get(config.BUFFER_WRITE_CHUNK_SIZE)
+  private val bufferWriteChunkSize =
+    Option(SparkEnv.get).map(_.conf.get(BUFFER_WRITE_CHUNK_SIZE))
+      .getOrElse(BUFFER_WRITE_CHUNK_SIZE.defaultValue.get)

   private[this] var disposed: Boolean = false

@@ -60,9 +62,8 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   */
  def writeFully(channel: WritableByteChannel): Unit = {
     for (bytes <- getChunks()) {
-      val capacity = bytes.limit()
       while (bytes.remaining() > 0) {
-        val ioSize = Math.min(bytes.remaining(), NIO_BUFFER_LIMIT)
+        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
         bytes.limit(bytes.position + ioSize)
         channel.write(bytes)
       }

From fc184aa535b2bf9cd771739da6ac1bbbec0504c4 Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 24 Aug 2017 23:16:26 +0800
Subject: [PATCH 16/19] Fix build error

---
 .../scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index f724cd68444b0..cc3cf48573c74 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -26,6 +26,7 @@ import io.netty.buffer.{ByteBuf, Unpooled}

 import org.apache.spark.internal.config
 import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.SparkEnv
 import org.apache.spark.storage.StorageUtils

@@ -44,7 +45,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   // Chunk size in bytes
   private val bufferWriteChunkSize =
     Option(SparkEnv.get).map(_.conf.get(BUFFER_WRITE_CHUNK_SIZE))
-      .getOrElse(BUFFER_WRITE_CHUNK_SIZE.defaultValue.get)
+      .getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get)

   private[this] var disposed: Boolean = false
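
The Option(SparkEnv.get) wrapping introduced in PATCH 15 exists because SparkEnv.get can be null, for example for a ChunkedByteBuffer built in a test or before the driver environment is up; Option(null) collapses to None, so such callers silently fall back to the config's default. The pattern in isolation, with a hypothetical Env stand-in for SparkEnv:

```scala
object EnvFallbackDemo {
  // Stand-in for SparkEnv: the global can legitimately be null, e.g. when
  // the buffer is constructed before an application environment exists.
  final case class Env(chunkSize: Long)
  @volatile private var currentEnv: Env = null // plays the role of SparkEnv.get

  val DefaultChunkSize: Long = 64L * 1024 * 1024

  // Option(x) maps null to None, so the default kicks in exactly when no
  // environment is available -- the same shape as the patch above.
  def chunkSize: Long =
    Option(currentEnv).map(_.chunkSize).getOrElse(DefaultChunkSize)

  def main(args: Array[String]): Unit = {
    assert(chunkSize == DefaultChunkSize) // no env yet: fall back
    currentEnv = Env(1024L * 1024)
    assert(chunkSize == 1024L * 1024)     // env present: use its setting
  }
}
```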
From f1d67a39abf574d551c95865d59dc0c7e6d24c7a Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Thu, 24 Aug 2017 23:43:35 +0800
Subject: [PATCH 17/19] Fix compile error and refine scala style

---
 .../scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index cc3cf48573c74..5074db97c039d 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -24,9 +24,9 @@ import java.nio.channels.WritableByteChannel
 import com.google.common.primitives.UnsignedBytes
 import io.netty.buffer.{ByteBuf, Unpooled}

+import org.apache.spark.SparkEnv
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.ByteArrayWritableChannel
-import org.apache.spark.SparkEnv
 import org.apache.spark.storage.StorageUtils

@@ -44,7 +44,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   // Chunk size in bytes
   private val bufferWriteChunkSize =
-    Option(SparkEnv.get).map(_.conf.get(BUFFER_WRITE_CHUNK_SIZE))
+    Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE))
       .getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get)

From 14ca824794ffd543aa169327e78de95f23b1102d Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Fri, 25 Aug 2017 10:03:44 +0800
Subject: [PATCH 18/19] Need to convert to int to fix compile error

---
 .../main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 5074db97c039d..0a35fee770de1 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -65,7 +65,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     for (bytes <- getChunks()) {
       while (bytes.remaining() > 0) {
         val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
-        bytes.limit(bytes.position + ioSize)
+        bytes.limit(bytes.position + ioSize.toInt)
         channel.write(bytes)
       }
     }
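
PATCH 18's cast looks redundant until the numeric types are spelled out: a bytesConf value is a Long, while ByteBuffer.remaining() returns an Int and limit(...) takes one, and Math.min over a mixed Int/Long pair widens to the Long overload. PATCH 19 below then hoists the one required cast out of the per-slice loop, guarded by a checkValue on the config. In miniature:

```scala
object WideningDemo {
  def main(args: Array[String]): Unit = {
    val remaining: Int = 1000               // ByteBuffer.remaining() is an Int
    val chunkSize: Long = 64L * 1024 * 1024 // a bytesConf value is a Long

    // Math.min(Int, Long) resolves to Math.min(Long, Long): the result is a
    // Long, which ByteBuffer.limit(Int) will not accept -- hence PATCH 18's
    // ioSize.toInt, and PATCH 19's move of the cast out of the loop.
    val ioSize: Long = Math.min(remaining, chunkSize)
    val usable: Int = ioSize.toInt
    assert(usable == 1000)
  }
}
```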
From a02575e3bb3f5f7f2f2d9f7a0a51a1f162c12c4d Mon Sep 17 00:00:00 2001
From: zhoukang
Date: Fri, 25 Aug 2017 17:29:07 +0800
Subject: [PATCH 19/19] Avoid per-loop type cast

---
 .../main/scala/org/apache/spark/internal/config/package.scala | 2 ++
 .../scala/org/apache/spark/util/io/ChunkedByteBuffer.scala    | 4 ++--
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7c449e9c48eef..eae3c15c1d706 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -298,6 +298,8 @@ package object config {
       .internal()
       .doc("The chunk size during writing out the bytes of ChunkedByteBuffer.")
       .bytesConf(ByteUnit.BYTE)
+      .checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" +
+        " ChunkedByteBuffer should not larger than Int.MaxValue.")
       .createWithDefault(64 * 1024 * 1024)

diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 0a35fee770de1..138eb0d317eaf 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -45,7 +45,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   // Chunk size in bytes
   private val bufferWriteChunkSize =
     Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE))
-      .getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get)
+      .getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get).toInt

   private[this] var disposed: Boolean = false

@@ -65,7 +65,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     for (bytes <- getChunks()) {
       while (bytes.remaining() > 0) {
         val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
-        bytes.limit(bytes.position + ioSize.toInt)
+        bytes.limit(bytes.position + ioSize)
         channel.write(bytes)
       }
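
A closing observation on the merged result (editorial, not part of the series): writeFully never restores a chunk's limit after pulling it in, so remaining() reaches zero as soon as the first slice is drained, and any chunk larger than spark.buffer.write.chunkSize is written only in part. With the 64 MiB default and typical chunk sizes this rarely triggers, but it did surface upstream (as SPARK-24107, to the best of my recollection), where the fix was to restore the limit after every write. A hedged sketch of that repaired loop:

```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

object WriteFullySketch {
  // Not the merged code: this variant restores each chunk's limit after
  // every write, so remaining() keeps describing the unwritten tail and
  // chunks larger than chunkWriteSize are written completely.
  def writeFullyRestored(
      chunks: Array[ByteBuffer],
      channel: WritableByteChannel,
      chunkWriteSize: Int): Unit = {
    for (bytes <- chunks.map(_.duplicate())) {
      val originalLimit = bytes.limit()
      while (bytes.hasRemaining) {
        val ioSize = math.min(bytes.remaining(), chunkWriteSize)
        bytes.limit(bytes.position() + ioSize)
        channel.write(bytes)
        bytes.limit(originalLimit) // keep remaining() honest for the next slice
      }
    }
  }
}
```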