diff --git a/core/src/main/java/org/apache/spark/network/buffer/LargeByteBufferInputStream.java b/core/src/main/java/org/apache/spark/network/buffer/LargeByteBufferInputStream.java
new file mode 100644
index 0000000000000..dc8da016db3ed
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/network/buffer/LargeByteBufferInputStream.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.buffer;
+
+import java.io.InputStream;
+
+/**
+ * Reads data from a LargeByteBuffer, and optionally cleans it up using buffer.dispose()
+ * when the stream is closed (e.g. to close a memory-mapped file).
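+ *
+ * A minimal usage sketch (the byte values here are purely illustrative):
+ * <pre>{@code
+ *   LargeByteBuffer buf = LargeByteBufferHelper.asLargeByteBuffer(new byte[]{1, 2, 3});
+ *   LargeByteBufferInputStream in = new LargeByteBufferInputStream(buf, true);
+ *   int first = in.read();  // 1
+ *   in.close();             // dispose == true, so buf.dispose() is also called
+ * }</pre>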
+ */
+public class LargeByteBufferInputStream extends InputStream {
+
+ private LargeByteBuffer buffer;
+ private final boolean dispose;
+
+ public LargeByteBufferInputStream(LargeByteBuffer buffer, boolean dispose) {
+ this.buffer = buffer;
+ this.dispose = dispose;
+ }
+
+ public LargeByteBufferInputStream(LargeByteBuffer buffer) {
+ this(buffer, false);
+ }
+
+ @Override
+ public int read() {
+ if (buffer == null || buffer.remaining() == 0) {
+ return -1;
+ } else {
+ return buffer.get() & 0xFF;
+ }
+ }
+
+ @Override
+ public int read(byte[] dest) {
+ return read(dest, 0, dest.length);
+ }
+
+ @Override
+ public int read(byte[] dest, int offset, int length) {
+ if (buffer == null || buffer.remaining() == 0) {
+ return -1;
+ } else {
+ int amountToGet = (int) Math.min(buffer.remaining(), length);
+ buffer.get(dest, offset, amountToGet);
+ return amountToGet;
+ }
+ }
+
+ @Override
+ public long skip(long toSkip) {
+ if (buffer != null) {
+ return buffer.skip(toSkip);
+ } else {
+ return 0L;
+ }
+ }
+
+ /**
+ * Close the stream, and dispose of the buffer if this stream was constructed with
+ * {@code dispose = true}.
+ */
+ @Override
+ public void close() {
+ if (buffer != null) {
+ if (dispose) {
+ buffer.dispose();
+ }
+ buffer = null;
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/spark/network/buffer/LargeByteBufferOutputStream.java b/core/src/main/java/org/apache/spark/network/buffer/LargeByteBufferOutputStream.java
new file mode 100644
index 0000000000000..81de016781400
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/network/buffer/LargeByteBufferOutputStream.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.buffer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.util.io.ByteArrayChunkOutputStream;
+
+/**
+ * An OutputStream that will write all data to memory. It supports writing over 2GB
+ * and the resulting data can be retrieved as a
+ * {@link org.apache.spark.network.buffer.LargeByteBuffer}
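+ *
+ * A minimal usage sketch (the chunk size and written bytes are illustrative):
+ * <pre>{@code
+ *   LargeByteBufferOutputStream out = new LargeByteBufferOutputStream(65536);
+ *   out.write(new byte[1000]);
+ *   LargeByteBuffer buffer = out.largeBuffer();  // snapshot of everything written so far
+ *   out.close();
+ * }</pre>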
+ */
+public class LargeByteBufferOutputStream extends OutputStream {
+
+ private final ByteArrayChunkOutputStream output;
+
+ /**
+ * Create a new LargeByteBufferOutputStream which writes to byte arrays of the given size. Note
+ * that chunkSize has no effect on the LargeByteBuffer returned by
+ * {@link #largeBuffer()}.
+ *
+ * @param chunkSize size of the byte arrays used by this output stream, in bytes
+ */
+ public LargeByteBufferOutputStream(int chunkSize) {
+ output = new ByteArrayChunkOutputStream(chunkSize);
+ }
+
+ @Override
+ public void write(int b) {
+ output.write(b);
+ }
+
+ @Override
+ public void write(byte[] bytes, int off, int len) {
+ output.write(bytes, off, len);
+ }
+
+ /**
+ * Get all of the data written to the stream so far as a LargeByteBuffer. This method can be
+ * called multiple times, and each returned buffer will be completely independent (the data
+ * is copied for each returned buffer). It does not close the stream.
+ *
+ * @return the data written to the stream as a LargeByteBuffer
+ */
+ public LargeByteBuffer largeBuffer() {
+ return largeBuffer(LargeByteBufferHelper.MAX_CHUNK_SIZE);
+ }
+
+ /**
+ * You don't really ever want to call this method -- the returned
+ * buffer will not implement {@link LargeByteBuffer#asByteBuffer()} correctly.
+ */
+ @VisibleForTesting
+ LargeByteBuffer largeBuffer(int maxChunk) {
+ long totalSize = output.size();
+ int chunksNeeded = (int) ((totalSize + maxChunk - 1) / maxChunk);
+ ByteBuffer[] chunks = new ByteBuffer[chunksNeeded];
+ long remaining = totalSize;
+ long pos = 0;
+ for (int idx = 0; idx < chunksNeeded; idx++) {
+ int nextSize = (int) Math.min(maxChunk, remaining);
+ chunks[idx] = ByteBuffer.wrap(output.slice(pos, pos + nextSize));
+ pos += nextSize;
+ remaining -= nextSize;
+ }
+ return new WrappedLargeByteBuffer(chunks, maxChunk);
+ }
+
+ @Override
+ public void close() throws IOException {
+ output.close();
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala b/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
index daac6f971eb20..d48eb2f330321 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
@@ -21,7 +21,6 @@ import java.io.OutputStream
import scala.collection.mutable.ArrayBuffer
-
/**
* An OutputStream that writes to fixed-size chunks of byte arrays.
*
@@ -43,10 +42,13 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
*/
private var position = chunkSize
+ private[spark] var size: Long = 0L
+
override def write(b: Int): Unit = {
allocateNewChunkIfNeeded()
chunks(lastChunkIndex)(position) = b.toByte
position += 1
+ size += 1
}
override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
@@ -58,6 +60,7 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
written += thisBatch
position += thisBatch
}
+ size += len
}
@inline
@@ -91,4 +94,44 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
ret
}
}
+
+ /**
+ * Get a copy of the data between the two endpoints, start <= idx < until. Always returns
+ * an array of size (until - start). Throws an IllegalArgumentException unless
+ * 0 <= start <= until <= size
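+ *
+ * A short example of the intended behaviour (chunk size chosen only for illustration):
+ * {{{
+ *   val out = new ByteArrayChunkOutputStream(4)
+ *   out.write(Array[Byte](1, 2, 3, 4, 5, 6))
+ *   out.slice(2, 5)  // Array(3, 4, 5), copied across the chunk boundary
+ * }}}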
+ */
+ def slice(start: Long, until: Long): Array[Byte] = {
+ require((until - start) < Integer.MAX_VALUE, "slice length must be less than Integer.MAX_VALUE")
+ require(start >= 0 && start <= until, s"start ($start) must be >= 0 and <= until ($until)")
+ require(until >= start && until <= size,
+ s"until ($until) must be >= start ($start) and <= size ($size)")
+ var chunkStart = 0L
+ var chunkIdx = 0
+ val length = (until - start).toInt
+ var foundStart = false
+ val result = new Array[Byte](length)
+ while (!foundStart) {
+ val nextChunkStart = chunkStart + chunks(chunkIdx).size
+ if (nextChunkStart > start) {
+ foundStart = true
+ } else {
+ chunkStart = nextChunkStart
+ chunkIdx += 1
+ }
+ }
+
+ var remaining = length
+ var pos = 0
+ var offsetInChunk = (start - chunkStart).toInt
+ while (remaining > 0) {
+ val lenToCopy = math.min(remaining, chunks(chunkIdx).size - offsetInChunk)
+ System.arraycopy(chunks(chunkIdx), offsetInChunk, result, pos, lenToCopy)
+ chunkIdx += 1
+ offsetInChunk = 0
+ pos += lenToCopy
+ remaining -= lenToCopy
+ }
+ result
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/network/buffer/LargeByteBufferInputStreamSuite.scala b/core/src/test/scala/org/apache/spark/network/buffer/LargeByteBufferInputStreamSuite.scala
new file mode 100644
index 0000000000000..44b458d6555cf
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/network/buffer/LargeByteBufferInputStreamSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.buffer
+
+import java.io.{File, FileInputStream, FileOutputStream, OutputStream}
+import java.nio.channels.FileChannel.MapMode
+
+import org.junit.Assert._
+import org.mockito.Mockito._
+import org.scalatest.Matchers
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.SparkFunSuite
+
+class LargeByteBufferInputStreamSuite extends SparkFunSuite with Matchers with MockitoSugar {
+
+ test("read from large mapped file") {
+ val testFile = File.createTempFile("large-buffer-input-stream-test", ".bin")
+
+ try {
+ val out: OutputStream = new FileOutputStream(testFile)
+ val buffer: Array[Byte] = new Array[Byte](1 << 16)
+ val len: Long = buffer.length.toLong + Integer.MAX_VALUE + 1
+ (0 until buffer.length).foreach { idx =>
+ buffer(idx) = idx.toByte
+ }
+ (0 until (len / buffer.length).toInt).foreach { idx =>
+ out.write(buffer)
+ }
+ out.close()
+
+ val channel = new FileInputStream(testFile).getChannel
+ val buf = LargeByteBufferHelper.mapFile(channel, MapMode.READ_ONLY, 0, len)
+ val in = new LargeByteBufferInputStream(buf, true)
+
+ val read = new Array[Byte](buffer.length)
+ (0 until (len / buffer.length).toInt).foreach { idx =>
+ in.read(read) should be(read.length)
+ (0 until buffer.length).foreach { arrIdx =>
+ assertEquals(buffer(arrIdx), read(arrIdx))
+ }
+ }
+ in.read(read) should be(-1)
+ in.close()
+ } finally {
+ testFile.delete()
+ }
+ }
+
+ test("dispose on close") {
+ // don't need to read to the end -- dispose anytime we close
+ val mockBuffer = mock[LargeByteBuffer]
+ when(mockBuffer.remaining()).thenReturn(0)
+ val in = new LargeByteBufferInputStream(mockBuffer, true)
+ verify(mockBuffer, times(0)).dispose()
+ // reading to the end shouldn't auto-dispose
+ in.read() should be (-1)
+ verify(mockBuffer, times(0)).dispose()
+ in.close()
+ verify(mockBuffer, times(1)).dispose()
+ }
+
+ test("io stream roundtrip") {
+ val out = new LargeByteBufferOutputStream(128)
+ (0 until 200).foreach { idx => out.write(idx) }
+ out.close()
+
+ val lb = out.largeBuffer(128)
+ // just make sure that we test reading from multiple chunks
+ lb.asInstanceOf[WrappedLargeByteBuffer].underlying.size should be > 1
+
+ val rawIn = new LargeByteBufferInputStream(lb)
+ val arr = new Array[Byte](500)
+ val nRead = rawIn.read(arr, 0, 500)
+ nRead should be (200)
+ (0 until 200).foreach { idx =>
+ arr(idx) should be (idx.toByte)
+ }
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/network/buffer/LargeByteBufferOutputStreamSuite.scala b/core/src/test/scala/org/apache/spark/network/buffer/LargeByteBufferOutputStreamSuite.scala
new file mode 100644
index 0000000000000..72c98b7feacab
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/network/buffer/LargeByteBufferOutputStreamSuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.buffer
+
+import scala.util.Random
+
+import org.scalatest.Matchers
+
+import org.apache.spark.SparkFunSuite
+
+class LargeByteBufferOutputStreamSuite extends SparkFunSuite with Matchers {
+
+ test("merged buffers for < 2GB") {
+ val out = new LargeByteBufferOutputStream(10)
+ val bytes = new Array[Byte](100)
+ Random.nextBytes(bytes)
+ out.write(bytes)
+
+ val buffer = out.largeBuffer
+ buffer.position() should be (0)
+ buffer.size() should be (100)
+ val nioBuffer = buffer.asByteBuffer()
+ nioBuffer.position() should be (0)
+ nioBuffer.capacity() should be (100)
+ nioBuffer.limit() should be (100)
+
+ val read = new Array[Byte](100)
+ buffer.get(read, 0, 100)
+ read should be (bytes)
+
+ buffer.rewind()
+ nioBuffer.get(read)
+ read should be (bytes)
+ }
+
+ test("chunking") {
+ val out = new LargeByteBufferOutputStream(10)
+ val bytes = new Array[Byte](100)
+ Random.nextBytes(bytes)
+ out.write(bytes)
+
+ (10 to 100 by 10).foreach { chunkSize =>
+ val buffer = out.largeBuffer(chunkSize).asInstanceOf[WrappedLargeByteBuffer]
+ buffer.position() should be (0)
+ buffer.size() should be (100)
+ val read = new Array[Byte](100)
+ buffer.get(read, 0, 100)
+ read should be (bytes)
+ }
+
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/util/io/ByteArrayChunkOutputStreamSuite.scala b/core/src/test/scala/org/apache/spark/util/io/ByteArrayChunkOutputStreamSuite.scala
index 361ec95654f47..2bb0df394a9c2 100644
--- a/core/src/test/scala/org/apache/spark/util/io/ByteArrayChunkOutputStreamSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/io/ByteArrayChunkOutputStreamSuite.scala
@@ -21,7 +21,6 @@ import scala.util.Random
import org.apache.spark.SparkFunSuite
-
class ByteArrayChunkOutputStreamSuite extends SparkFunSuite {
test("empty output") {
@@ -106,4 +105,30 @@ class ByteArrayChunkOutputStreamSuite extends SparkFunSuite {
assert(arrays(1).toSeq === ref.slice(10, 20))
assert(arrays(2).toSeq === ref.slice(20, 30))
}
+
+ test("slice") {
+ val ref = new Array[Byte](30)
+ Random.nextBytes(ref)
+ val o = new ByteArrayChunkOutputStream(5)
+ o.write(ref)
+
+ for {
+ start <- (0 until 30)
+ end <- (start to 30)
+ } {
+ withClue(s"start = $start; end = $end") {
+ try {
+ assert(o.slice(start, end).toSeq === ref.slice(start, end))
+ } catch {
+ case ex: Throwable => fail(ex)
+ }
+ }
+ }
+
+ // errors on bad bounds
+ intercept[IllegalArgumentException] { o.slice(31, 31) }
+ intercept[IllegalArgumentException] { o.slice(-1, 10) }
+ intercept[IllegalArgumentException] { o.slice(10, 5) }
+ intercept[IllegalArgumentException] { o.slice(10, 35) }
+ }
}
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/BufferTooLargeException.java b/network/common/src/main/java/org/apache/spark/network/buffer/BufferTooLargeException.java
new file mode 100644
index 0000000000000..4e1a85ba1f126
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/BufferTooLargeException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.buffer;
+
+import java.io.IOException;
+
+public class BufferTooLargeException extends IOException {
+ public final long actualSize;
+ public final long extra;
+ public final long maxSize;
+
+ public BufferTooLargeException(long actualSize, long maxSize) {
+ super(String.format("LargeByteBuffer is too large to convert. Size: %d; Size Limit: %d (%d " +
+ "too big)", actualSize, maxSize,
+ actualSize - maxSize));
+ this.extra = actualSize - maxSize;
+ this.actualSize = actualSize;
+ this.maxSize = maxSize;
+ }
+}
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/LargeByteBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/LargeByteBuffer.java
new file mode 100644
index 0000000000000..21d954fcaeca8
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/LargeByteBuffer.java
@@ -0,0 +1,146 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.spark.network.buffer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A byte buffer which can hold over 2GB.
+ *
+ * This buffer is read-only: it only supports reading bytes (with both single and bulk
+ * get methods). It supports random access via skip to move around the
+ * buffer.
+ *
+ * In general, implementations are expected to support O(1) random access. Furthermore,
+ * neighboring locations in the buffer are likely to be neighboring in memory, so sequential access
+ * will avoid cache-misses. However, these are only rough guidelines which may differ in
+ * implementations.
+ *
+ * Any code which expects a ByteBuffer can obtain one via {@link #asByteBuffer} when possible -- see
+ * that method for a full description of its limitations.
+ *
+ * Instances of this class can be created with
+ * {@link org.apache.spark.network.buffer.LargeByteBufferHelper},
+ * with a LargeByteBufferOutputStream,
+ * or directly from the implementation
+ * {@link org.apache.spark.network.buffer.WrappedLargeByteBuffer}.
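+ *
+ * A minimal sketch of typical read access (the sizes are illustrative):
+ * <pre>{@code
+ *   LargeByteBuffer buf = LargeByteBufferHelper.asLargeByteBuffer(new byte[100]);
+ *   byte[] dest = new byte[50];
+ *   buf.get(dest, 0, 50);  // bulk read; position() is now 50
+ *   buf.skip(25);          // position() is now 75
+ *   buf.rewind();          // position() is back to 0
+ * }</pre>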
+ */
+public interface LargeByteBuffer {
+ public byte get();
+
+ /**
+ * Bulk copy data from this buffer into the given array. First checks there is sufficient
+ * data in this buffer; if not, throws a {@link java.nio.BufferUnderflowException}. Behaves
+ * in the exact same way as get(dst, 0, dst.length)
+ *
+ * @param dst the destination array
+ * @return this buffer
+ */
+ public LargeByteBuffer get(byte[] dst);
+
+ /**
+ * Bulk copy data from this buffer into the given array. First checks there is sufficient
+ * data in this buffer; if not, throws a {@link java.nio.BufferUnderflowException}.
+ *
+ * @param dst the destination array
+ * @param offset the offset within the destination array to write to
+ * @param length how many bytes to write
+ * @return this buffer
+ */
+ public LargeByteBuffer get(byte[] dst, int offset, int length);
+
+ public LargeByteBuffer rewind();
+
+ /**
+ * Return a deep copy of this buffer.
+ * The returned buffer will have position == 0. The position
+ * of this buffer will not change as a result of copying.
+ *
+ * @return a new buffer with a full copy of this buffer's data
+ */
+ public LargeByteBuffer deepCopy();
+
+ /**
+ * Advance the position in this buffer by up to n bytes. n may be
+ * positive or negative. It will move the full n unless that moves
+ * it past the end (or beginning) of the buffer, in which case it will move to the end
+ * (or beginning).
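+ *
+ * For example (sizes purely illustrative): on a 10-byte buffer with position() == 8,
+ * {@code skip(5)} moves to the end and returns 2, while {@code skip(-20)} moves back to
+ * the beginning and returns -8.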
+ *
+ * @return the number of bytes moved forward (can be negative if n is negative)
+ */
+ public long skip(long n);
+
+ public long position();
+
+ /**
+ * Creates a new byte buffer that shares this buffer's content.
+ *
+ * The content of the new buffer will be that of this buffer. Changes
+ * to this buffer's content will be visible in the new buffer, and vice
+ * versa; the two buffers' positions will be independent.
+ *
+ * The new buffer's position will be identical to that of this buffer.
+ */
+ public LargeByteBuffer duplicate();
+
+ public long remaining();
+
+ /**
+ * Total number of bytes in this buffer
+ */
+ public long size();
+
+ /**
+ * Writes the data from the current position() to the end of this buffer
+ * to the given channel. The position() will be moved to the end of
+ * the buffer after this.
+ *
+ * Note that this method will continually attempt to push data to the given channel. If the
+ * channel cannot accept more data, this will continuously retry until the channel accepts
+ * the data.
+ *
+ * @param channel the channel to write this buffer's data to
+ * @return the number of bytes written to the channel
+ * @throws IOException if an error occurs while writing to the channel
+ */
+ public long writeTo(WritableByteChannel channel) throws IOException;
+
+ /**
+ * Get the entire contents of this as one ByteBuffer, if possible. The returned ByteBuffer
+ * will always have the position set to 0, and the limit set to the end of the data. Each
+ * call will return a new ByteBuffer, but will not require copying the data (eg., it will
+ * use ByteBuffer#duplicate()). The returned byte buffer will share data with this buffer. The
+ * returned buffers will never be larger than
+ * {@link org.apache.spark.network.buffer.LargeByteBufferHelper#MAX_CHUNK_SIZE}
+ *
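+ * A sketch of defensive use (the caller-side names are illustrative):
+ * <pre>{@code
+ *   try {
+ *     ByteBuffer nio = largeBuffer.asByteBuffer();
+ *     // hand the single ByteBuffer to code that requires one
+ *   } catch (BufferTooLargeException e) {
+ *     // more than one chunk of data -- fall back to the LargeByteBuffer API
+ *   }
+ * }</pre>
+ *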
+ * @throws BufferTooLargeException if this buffer is too large to fit in one {@link ByteBuffer}
+ */
+ public ByteBuffer asByteBuffer() throws BufferTooLargeException;
+
+ /**
+ * Attempt to clean this up if it is memory-mapped. This uses an *unsafe* Sun API that
+ * might cause errors if one attempts to read from the unmapped buffer, but it's better than
+ * waiting for the GC to find it because that could lead to huge numbers of open files. There's
+ * unfortunately no standard API to do this.
+ */
+ public void dispose();
+}
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/LargeByteBufferHelper.java b/network/common/src/main/java/org/apache/spark/network/buffer/LargeByteBufferHelper.java
new file mode 100644
index 0000000000000..4941ed6559ea9
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/LargeByteBufferHelper.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.buffer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Utils for creating {@link org.apache.spark.network.buffer.LargeByteBuffer}s, either from
+ * pre-allocated byte arrays, ByteBuffers, or by memory mapping a file.
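+ *
+ * For example, memory-mapping a whole file (a sketch; the file handling is illustrative):
+ * <pre>{@code
+ *   FileChannel channel = new FileInputStream(file).getChannel();
+ *   LargeByteBuffer mapped =
+ *     LargeByteBufferHelper.mapFile(channel, FileChannel.MapMode.READ_ONLY, 0, channel.size());
+ * }</pre>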
+ */
+public class LargeByteBufferHelper {
+
+ // netty can't quite send messages that are a full 2GB -- they need to be slightly smaller.
+ // It's not clear what the exact limit is, but 200 bytes of headroom seems to be enough.
+ /**
+ * The maximum size of any ByteBuffer.
+ * {@link org.apache.spark.network.buffer.LargeByteBuffer#asByteBuffer} will never return a
+ * ByteBuffer larger than this. This is close to the max ByteBuffer size (2GB), minus a small
+ * amount for message overhead.
+ */
+ public static final int MAX_CHUNK_SIZE = Integer.MAX_VALUE - 200;
+
+ public static LargeByteBuffer asLargeByteBuffer(ByteBuffer buffer) {
+ return new WrappedLargeByteBuffer(new ByteBuffer[] { buffer } );
+ }
+
+ public static LargeByteBuffer asLargeByteBuffer(byte[] bytes) {
+ return asLargeByteBuffer(ByteBuffer.wrap(bytes));
+ }
+
+ public static LargeByteBuffer allocate(long size) {
+ return allocate(size, MAX_CHUNK_SIZE);
+ }
+
+ @VisibleForTesting
+ static LargeByteBuffer allocate(long size, int maxChunk) {
+ int chunksNeeded = (int) ((size + maxChunk - 1) / maxChunk);
+ ByteBuffer[] chunks = new ByteBuffer[chunksNeeded];
+ long remaining = size;
+ for (int i = 0; i < chunksNeeded; i++) {
+ int nextSize = (int) Math.min(remaining, maxChunk);
+ ByteBuffer next = ByteBuffer.allocate(nextSize);
+ remaining -= nextSize;
+ chunks[i] = next;
+ }
+ if (remaining != 0) {
+ throw new IllegalStateException("remaining = " + remaining);
+ }
+ return new WrappedLargeByteBuffer(chunks, maxChunk);
+ }
+
+ public static LargeByteBuffer mapFile(
+ FileChannel channel,
+ FileChannel.MapMode mode,
+ long offset,
+ long length
+ ) throws IOException {
+ int chunksNeeded = (int) ((length - 1) / MAX_CHUNK_SIZE) + 1;
+ ByteBuffer[] chunks = new ByteBuffer[chunksNeeded];
+ long curPos = offset;
+ long end = offset + length;
+ for (int i = 0; i < chunksNeeded; i++) {
+ long nextPos = Math.min(curPos + MAX_CHUNK_SIZE, end);
+ chunks[i] = channel.map(mode, curPos, nextPos - curPos);
+ curPos = nextPos;
+ }
+ return new WrappedLargeByteBuffer(chunks);
+ }
+
+}
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/WrappedLargeByteBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/WrappedLargeByteBuffer.java
new file mode 100644
index 0000000000000..859ad30100413
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/WrappedLargeByteBuffer.java
@@ -0,0 +1,291 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.spark.network.buffer;
+
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import sun.nio.ch.DirectBuffer;
+
+/**
+ * A {@link org.apache.spark.network.buffer.LargeByteBuffer} which may contain multiple
+ * {@link java.nio.ByteBuffer}s. In order to support asByteBuffer, all
+ * of the underlying ByteBuffers must have size equal to
+ * {@link org.apache.spark.network.buffer.LargeByteBufferHelper#MAX_CHUNK_SIZE} (except the last
+ * one). The underlying ByteBuffers may be on-heap, direct, or memory-mapped.
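+ *
+ * A minimal construction sketch (a single small chunk; real multi-chunk buffers must use
+ * MAX_CHUNK_SIZE-sized chunks):
+ * <pre>{@code
+ *   ByteBuffer chunk = ByteBuffer.wrap(new byte[]{1, 2, 3});
+ *   LargeByteBuffer buf = new WrappedLargeByteBuffer(new ByteBuffer[]{ chunk });
+ * }</pre>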
+ */
+public class WrappedLargeByteBuffer implements LargeByteBuffer {
+
+ @VisibleForTesting
+ final ByteBuffer[] underlying;
+
+ private final long size;
+ /**
+ * each sub-ByteBuffer (except for the last one) must be exactly this size. Note that this
+ * class *really* expects this to be LargeByteBufferHelper.MAX_CHUNK_SIZE. The only reason it isn't
+ * is so that we can do tests without creating ginormous buffers. Public methods force it to
+ * be LargeByteBufferHelper.MAX_CHUNK_SIZE
+ */
+ private final int subBufferSize;
+ private long _pos;
+ @VisibleForTesting
+ int currentBufferIdx;
+ @VisibleForTesting
+ ByteBuffer currentBuffer;
+
+ /**
+ * Construct a WrappedLargeByteBuffer from the given ByteBuffers. Each of the ByteBuffers must
+ * have size equal to {@link org.apache.spark.network.buffer.LargeByteBufferHelper#MAX_CHUNK_SIZE}
+ * except for the final one. The buffers are duplicated, so the position of the
+ * given buffers and the returned buffer will be independent, though the underlying data will be
+ * shared. The constructed buffer will always have position == 0.
+ */
+ public WrappedLargeByteBuffer(ByteBuffer[] underlying) {
+ this(underlying, LargeByteBufferHelper.MAX_CHUNK_SIZE);
+ }
+
+ /**
+ * you do **not** want to call this version. It leads to a buffer which doesn't properly
+ * support {@link #asByteBuffer}. The only reason it exists is so we can have tests which
+ * don't require 2GB of memory
+ *
+ * @param underlying the ByteBuffers to wrap
+ * @param subBufferSize the expected size of every sub-buffer except the last (tests only)
+ */
+ @VisibleForTesting
+ WrappedLargeByteBuffer(ByteBuffer[] underlying, int subBufferSize) {
+ if (underlying.length == 0) {
+ throw new IllegalArgumentException("must wrap at least one ByteBuffer");
+ }
+ this.underlying = new ByteBuffer[underlying.length];
+ this.subBufferSize = subBufferSize;
+ long sum = 0L;
+
+ for (int i = 0; i < underlying.length; i++) {
+ ByteBuffer b = underlying[i].duplicate();
+ b.position(0);
+ this.underlying[i] = b;
+ if (i != underlying.length - 1 && b.capacity() != subBufferSize) {
+ // this is to make sure that asByteBuffer() is implemented correctly. We need the first
+ // subBuffer to be LargeByteBufferHelper.MAX_CHUNK_SIZE. We don't *have* to check all the
+ // subBuffers, but I figure it makes it more consistent this way. (Also, this check
+ // really only serves a purpose when using the public constructor -- subBufferSize is
+ // a parameter just to allow small tests.)
+ throw new IllegalArgumentException("All buffers, except for the final one, must have " +
+ "size = " + subBufferSize);
+ }
+ sum += b.capacity();
+ }
+ _pos = 0;
+ currentBufferIdx = 0;
+ currentBuffer = this.underlying[0];
+ size = sum;
+ }
+
+ @Override
+ public WrappedLargeByteBuffer get(byte[] dest) {
+ return get(dest, 0, dest.length);
+ }
+
+ @Override
+ public WrappedLargeByteBuffer get(byte[] dest, int offset, int length) {
+ if (length > remaining()) {
+ throw new BufferUnderflowException();
+ }
+ int moved = 0;
+ while (moved < length) {
+ int toRead = Math.min(length - moved, currentBuffer.remaining());
+ currentBuffer.get(dest, offset + moved, toRead);
+ moved += toRead;
+ updateCurrentBufferIfNeeded();
+ }
+ _pos += moved;
+ return this;
+ }
+
+ @Override
+ public LargeByteBuffer rewind() {
+ if (currentBuffer != null) {
+ currentBuffer.rewind();
+ }
+ while (currentBufferIdx > 0) {
+ currentBufferIdx -= 1;
+ currentBuffer = underlying[currentBufferIdx];
+ currentBuffer.rewind();
+ }
+ _pos = 0;
+ return this;
+ }
+
+ @Override
+ public WrappedLargeByteBuffer deepCopy() {
+ ByteBuffer[] dataCopy = new ByteBuffer[underlying.length];
+ for (int i = 0; i < underlying.length; i++) {
+ ByteBuffer b = underlying[i];
+ dataCopy[i] = ByteBuffer.allocate(b.capacity());
+ int originalPosition = b.position();
+ b.rewind();
+ dataCopy[i].put(b);
+ dataCopy[i].position(0);
+ b.position(originalPosition);
+ }
+ return new WrappedLargeByteBuffer(dataCopy, subBufferSize);
+ }
+
+ @Override
+ public byte get() {
+ if (remaining() < 1L) {
+ throw new BufferUnderflowException();
+ }
+ byte r = currentBuffer.get();
+ _pos += 1;
+ updateCurrentBufferIfNeeded();
+ return r;
+ }
+
+ /**
+ * If we've read to the end of the current buffer, move on to the next one. Safe to call
+ * even if we haven't moved to the next buffer
+ */
+ private void updateCurrentBufferIfNeeded() {
+ while (currentBuffer != null && !currentBuffer.hasRemaining()) {
+ currentBufferIdx += 1;
+ currentBuffer = currentBufferIdx < underlying.length ? underlying[currentBufferIdx] : null;
+ }
+ }
+
+ @Override
+ public long position() {
+ return _pos;
+ }
+
+ @Override
+ public long skip(long n) {
+ if (n < 0) {
+ final long moveTotal = Math.min(-n, _pos);
+ long toMove = moveTotal;
+ // move backwards and update the position of every buffer as we go
+ if (currentBuffer != null) {
+ currentBufferIdx += 1;
+ }
+ while (toMove > 0) {
+ currentBufferIdx -= 1;
+ currentBuffer = underlying[currentBufferIdx];
+ int thisMove = (int) Math.min(toMove, currentBuffer.position());
+ currentBuffer.position(currentBuffer.position() - thisMove);
+ toMove -= thisMove;
+ }
+ _pos -= moveTotal;
+ return -moveTotal;
+ } else if (n > 0) {
+ final long moveTotal = Math.min(n, remaining());
+ long toMove = moveTotal;
+ // move forwards and update the position of every buffer as we go
+ currentBufferIdx -= 1;
+ while (toMove > 0) {
+ currentBufferIdx += 1;
+ currentBuffer = underlying[currentBufferIdx];
+ int thisMove = (int) Math.min(toMove, currentBuffer.remaining());
+ currentBuffer.position(currentBuffer.position() + thisMove);
+ toMove -= thisMove;
+ }
+ _pos += moveTotal;
+ return moveTotal;
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public long remaining() {
+ return size - _pos;
+ }
+
+ @Override
+ public WrappedLargeByteBuffer duplicate() {
+ // the constructor will duplicate the underlying buffers for us
+ WrappedLargeByteBuffer dup = new WrappedLargeByteBuffer(underlying, subBufferSize);
+ dup.skip(position());
+ return dup;
+ }
+
+ @Override
+ public long size() {
+ return size;
+ }
+
+ @Override
+ public long writeTo(WritableByteChannel channel) throws IOException {
+ long written = 0L;
+ for (; currentBufferIdx < underlying.length; currentBufferIdx++) {
+ currentBuffer = underlying[currentBufferIdx];
+ written += currentBuffer.remaining();
+ while (currentBuffer.hasRemaining()) {
+ channel.write(currentBuffer);
+ }
+ }
+ _pos = size();
+ return written;
+ }
+
+ @Override
+ public ByteBuffer asByteBuffer() throws BufferTooLargeException {
+ if (underlying.length == 1) {
+ ByteBuffer b = underlying[0].duplicate();
+ b.rewind();
+ return b;
+ } else {
+ // NOTE: if subBufferSize != LargeByteBufferHelper.MAX_CHUNK_SIZE, in theory
+ // we could copy the data into a new buffer. But we don't want to do any copying.
+ // The only reason we allow smaller subBufferSize is so that we can have tests which
+ // don't require 2GB of memory
+ throw new BufferTooLargeException(size(), underlying[0].capacity());
+ }
+ }
+
+ @VisibleForTesting
+ List