Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,19 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
*/
def writeFully(channel: WritableByteChannel): Unit = {
for (bytes <- getChunks()) {
val curChunkLimit = bytes.limit()
val originalLimit = bytes.limit()
while (bytes.hasRemaining) {
try {
val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
bytes.limit(bytes.position() + ioSize)
channel.write(bytes)
} finally {
bytes.limit(curChunkLimit)
}
// If `bytes` is an on-heap ByteBuffer, the Java NIO API will copy it to a temporary direct
// ByteBuffer when writing it out. This temporary direct ByteBuffer is cached per thread.
// Its size has no limit and can keep growing if it sees a larger input ByteBuffer. This may
// cause significant native memory leak, if a large direct ByteBuffer is allocated and
// cached, as it's never released until thread exits. Here we write the `bytes` with
// fixed-size slices to limit the size of the cached direct ByteBuffer.
// Please refer to http://www.evanjones.ca/java-bytebuffer-leak.html for more details.
val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
bytes.limit(bytes.position() + ioSize)
channel.write(bytes)
bytes.limit(originalLimit)
}
}
}
Expand Down