From cd2f0e3658964818b076e6de150f15db32f3c455 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 15 May 2018 12:29:56 +0800 Subject: [PATCH 1/2] improve --- .../spark/util/io/ChunkedByteBuffer.scala | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 3ae8dfcc1cb66..24b3820564f1a 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -63,15 +63,18 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { */ def writeFully(channel: WritableByteChannel): Unit = { for (bytes <- getChunks()) { - val curChunkLimit = bytes.limit() + val originalLimit = bytes.limit() while (bytes.hasRemaining) { - try { - val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize) - bytes.limit(bytes.position() + ioSize) - channel.write(bytes) - } finally { - bytes.limit(curChunkLimit) - } + // If `bytes` is an on-heap ByteBuffer, the JDK will copy it to a temporary direct + // ByteBuffer when writing it out. The JDK caches one temporary buffer per thread, and we + // may have significant memory pressure if the cached temp buffer gets created and freed + // frequently. Here we write the `bytes` with fixed-size slices to reuse the cached temp + // buffer and overcome this issue. + // Please refer to http://www.evanjones.ca/java-bytebuffer-leak.html for more details. + val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize) + bytes.limit(bytes.position() + ioSize) + channel.write(bytes) + bytes.limit(originalLimit) } } } From d651d3efb76772a6d43631f577180486319c31bd Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 17 May 2018 00:35:55 +0800 Subject: [PATCH 2/2] address comments --- .../org/apache/spark/util/io/ChunkedByteBuffer.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 24b3820564f1a..700ce56466c35 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -65,11 +65,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { for (bytes <- getChunks()) { val originalLimit = bytes.limit() while (bytes.hasRemaining) { - // If `bytes` is an on-heap ByteBuffer, the JDK will copy it to a temporary direct - // ByteBuffer when writing it out. The JDK caches one temporary buffer per thread, and we - // may have significant memory pressure if the cached temp buffer gets created and freed - // frequently. Here we write the `bytes` with fixed-size slices to reuse the cached temp - // buffer and overcome this issue. + // If `bytes` is an on-heap ByteBuffer, the Java NIO API will copy it to a temporary direct + // ByteBuffer when writing it out. This temporary direct ByteBuffer is cached per thread. + // Its size has no limit and can keep growing if it sees a larger input ByteBuffer. This may + // cause significant native memory leak, if a large direct ByteBuffer is allocated and + // cached, as it's never released until thread exits. Here we write the `bytes` with + // fixed-size slices to limit the size of the cached direct ByteBuffer. // Please refer to http://www.evanjones.ca/java-bytebuffer-leak.html for more details. val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize) bytes.limit(bytes.position() + ioSize)