Changes from all commits
109 commits
5cdcd42
add some failing tests, though these probably shouldnt actually get m…
squito Feb 20, 2015
03db862
steal some code from earlier work of @mridulm
squito Feb 23, 2015
d6337f0
wip -- changed a bunch of types to LargeByteBuffer; discovered probl…
squito Feb 24, 2015
a139e97
compiling but all sorts of bad casting etc.
squito Feb 25, 2015
4965bad
move LargeByteBuffer to network-common, since we need it there for th…
squito Feb 25, 2015
149d4fa
move large byte buffer to network/common ... still lots of crud
squito Feb 25, 2015
01cafbf
tests compile too
squito Feb 25, 2015
ce391a0
failing test case (though its crappy)
squito Feb 25, 2015
29f0a8a
fix use of LargeByteBuffer in some tests, create UploadPartialBlock
squito Feb 27, 2015
dcb4669
add real test case for uploading large blocks (failing now)
squito Feb 27, 2015
660f5e3
flesh out NettyBlockTransfer#uploadBlock
squito Feb 28, 2015
4c228a0
minor cleanup
squito Feb 28, 2015
cf7c3a7
cleanup abandonded block uploads
squito Feb 28, 2015
fe90fd6
crank up memory for tests
squito Mar 2, 2015
857f3df
fix LargeByteBuffer dispose()
squito Mar 2, 2015
b700723
maven needs you to request lots of extra memory for some reason
squito Mar 2, 2015
6b102a0
passing tests! assorted little changes
squito Mar 2, 2015
84df2dd
fix tests
squito Mar 2, 2015
7a5d1ac
dont kill jenkins with huge tests
squito Mar 2, 2015
38d6993
test cleanup
squito Mar 2, 2015
fc0d118
fixup test & fix bug in WrappedLargeByteBuffer
squito Mar 2, 2015
0e3700f
cleanup
squito Mar 2, 2015
6f6a8d7
cleanup
squito Mar 2, 2015
9de8866
style fixes
squito Mar 2, 2015
ef88085
minor cleanup
squito Mar 2, 2015
17d0c1a
get streaming to compile
squito Mar 2, 2015
f603ba4
very basic assertion in large broadcast test
squito Mar 2, 2015
cd84c69
but turn test off b/c it requires large heap
squito Mar 2, 2015
8c81f1c
Merge branch 'master' into SPARK-1391_2g_partition_limit
squito Mar 24, 2015
da86124
rip out everything attempting to send > 2GB over the network; add som…
squito Mar 26, 2015
74a633a
Merge branch 'master' into SPARK-1391_2g_partition_limit
squito Mar 26, 2015
e09d2c7
oops, add some missing files
squito Mar 26, 2015
165b1c3
undo more changes
squito Mar 26, 2015
27330a5
fix test compile
squito Mar 26, 2015
23c8a7b
Merge branch 'master' into SPARK-1391_2g_partition_limit
squito Apr 2, 2015
e33b77c
add missing cast (remote data is always one ByteBuffer)
squito Apr 2, 2015
f8264e7
tests & some fixes for WrappedLargeByteBuffer
squito Apr 3, 2015
0b43d8b
LargeByteBuffer has its own dispose
squito Apr 3, 2015
db20e2c
fix some tests for LargeByteBuffer
squito Apr 3, 2015
4124ca3
cleanup of LargeBB interface; more tests
squito Apr 3, 2015
a0b6976
more tests
squito Apr 3, 2015
8cbc967
BufferUnderflowException
squito Apr 3, 2015
ec7c50c
tests (wip)
squito Apr 3, 2015
ba9fcda
ByteArrayChunkOutputStream.slice()
squito Apr 3, 2015
31381a1
LargeByteBufferOutputStream gives a merged array, if possible
squito Apr 3, 2015
879ad73
fix output.largeBuffers, add test
squito Apr 3, 2015
bae8f23
fix NPE
squito Apr 3, 2015
c2e721b
restructure tests
squito Apr 3, 2015
8920e76
another test
squito Apr 4, 2015
ce1ac7c
test placeholders
squito Apr 4, 2015
b5dab44
test for large mapped file
squito Apr 4, 2015
248dc1c
cleanup
squito Apr 4, 2015
9e8b8c7
inputstream tests & fixes
squito Apr 4, 2015
d6c92e9
cleanup
squito Apr 6, 2015
ebc0344
Merge branch 'master' into SPARK-1391_2g_partition_limit
squito Apr 6, 2015
8a65ca9
cleanup
squito Apr 6, 2015
0aad3d3
cleanup
squito Apr 6, 2015
f648e90
wip on shuffle 2GB error msgs
squito Apr 6, 2015
e57cb64
test exception message for failed replication w/ large blocks
squito Apr 6, 2015
69bd90e
get rid of BroadcastSuite test, not really relevant
squito Apr 6, 2015
13d9c0f
update tests
squito Apr 7, 2015
763e8dc
more docs & tests on LargeBB
squito Apr 7, 2015
5323dd7
style
squito Apr 7, 2015
ca32934
oops, grab the position before we close the channel
squito Apr 7, 2015
9834942
ByteArrayChunkOutputStream.slice, for grabbing aribitrary portion of …
squito Apr 7, 2015
4e1a842
LargeByteBuffer
squito Apr 7, 2015
8bd5606
update from PR feedback
squito Apr 9, 2015
ff9f968
LargeBBInputStream should dispose when we close the stream, not when …
squito Apr 9, 2015
a3dc811
add bounds checks to ByteArrayChunkOutputStream.slice()
squito Apr 9, 2015
da29c3d
cleanup rewind() a little
squito Apr 9, 2015
366f921
WrappedLargeByteBuffer.asByteBuffer shouldnt copy -- it should just e…
squito Apr 13, 2015
34c2131
sigh, need to make the WrappedLargeBB constructor w/ chunkSize public…
squito Apr 14, 2015
bf4ec0a
style
squito Apr 14, 2015
9d232d1
move package of LargeByteBufferIOStreams so that I can keep unsafe me…
squito Apr 14, 2015
6b2f751
fix typo
squito Apr 14, 2015
4da4626
error handling for get(); comments, style
squito Jun 1, 2015
2afb351
constructed WrappedLargeByteBuffer always has position==0, simplifies…
squito Jun 1, 2015
4042c1a
move LargeBBIn/Out Streams to java
squito Jun 1, 2015
3c599b2
add comments for MAX_CHUNK_SIZE
squito Jun 1, 2015
95588c2
updateCurrentBuffer --> updateCurrentBufferIfNeeded + comment
squito Jun 1, 2015
b77bbe2
style
squito Jun 1, 2015
6c2a115
add tests that buffers position is independent from underyling bytebu…
squito Jun 2, 2015
a9616e4
better variable name; more chunks in tests
squito Jun 2, 2015
0250ac5
private, @VisibleForTesting
squito Jun 2, 2015
b3b6363
fix newlines
squito Jun 2, 2015
112c49e
style
squito Jun 2, 2015
8ec2c5c
comment explaining check on subBufferSize
squito Jun 2, 2015
d0605a1
get() return this; another version of get() which just takes dest array
squito Jun 3, 2015
b6620d0
docs on LargeBBOutputStream
squito Jun 3, 2015
54d09af
@Override
squito Jun 3, 2015
31244c8
fix comment
squito Jun 3, 2015
6594c81
Merge branch 'master' into SPARK-1391_2g_partition_limit
squito Jun 10, 2015
6cf204f
fix mistakes w/ merge
squito Jun 10, 2015
05ba79a
Merge branch 'SPARK-6190_largeBB' into SPARK-1391_2g_partition_limit
squito Jun 10, 2015
ca23763
Merge branch 'master' into SPARK-6190_largeBB
squito Jun 10, 2015
4c7117b
Merge branch 'SPARK-6190_largeBB' into SPARK-1391_2g_partition_limit
squito Jun 10, 2015
160a458
more merge fixes
squito Jun 10, 2015
36d1801
Merge branch 'master' into SPARK-6190_largeBB
squito Aug 19, 2015
e3f9bfc
Merge branch 'master' into SPARK-1391_2g_partition_limit
squito Aug 19, 2015
040a461
use random numbers for test
squito Aug 19, 2015
69e3009
Merge branch 'SPARK-6190_largeBB' into SPARK-1391_2g_partition_limit
squito Aug 19, 2015
0fa150f
bring up to date
squito Aug 19, 2015
fb733c5
fix check for underlying ByteBuffer class
squito Aug 19, 2015
b921b0b
fix check, add test
squito Aug 19, 2015
e0f5130
fix check
squito Aug 19, 2015
331d517
ignore the big tests for now
squito Aug 19, 2015
9d56e1b
Merge branch 'master' into SPARK-1391_2g_partition_limit
squito Aug 19, 2015
06c8ffa
style
squito Aug 19, 2015
200afdc
whoops, swap cases
squito Aug 20, 2015
@@ -0,0 +1,92 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.network.buffer;

import java.io.InputStream;

import com.google.common.annotations.VisibleForTesting;

/**
 * Reads data from a LargeByteBuffer, and optionally cleans it up using buffer.dispose()
 * when the stream is closed (e.g. to close a memory-mapped file).
 */
public class LargeByteBufferInputStream extends InputStream {

  private LargeByteBuffer buffer;
  private final boolean dispose;

  public LargeByteBufferInputStream(LargeByteBuffer buffer, boolean dispose) {
    this.buffer = buffer;
    this.dispose = dispose;
  }

  public LargeByteBufferInputStream(LargeByteBuffer buffer) {
    this(buffer, false);
  }

  @Override
  public int read() {
    if (buffer == null || buffer.remaining() == 0) {
      return -1;
    } else {
      return buffer.get() & 0xFF;
    }
  }

  @Override
  public int read(byte[] dest) {
    return read(dest, 0, dest.length);
  }

  @Override
  public int read(byte[] dest, int offset, int length) {
    if (buffer == null || buffer.remaining() == 0) {
      return -1;
    } else {
      int amountToGet = (int) Math.min(buffer.remaining(), length);
      buffer.get(dest, offset, amountToGet);
      return amountToGet;
    }
  }

  @Override
  public long skip(long toSkip) {
    if (buffer != null) {
      return buffer.skip(toSkip);
    } else {
      return 0L;
    }
  }

  // only for testing
  @VisibleForTesting
  boolean disposed = false;

  /**
   * Clean up the buffer, and potentially dispose of it.
   */
  @Override
  public void close() {
    if (buffer != null) {
      if (dispose) {
        buffer.dispose();
        disposed = true;
      }
      buffer = null;
    }
  }
}
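A usage sketch for the stream above (illustrative only, not part of the PR): reading a buffer with dispose = true, so that closing the stream also releases the buffer. The LargeByteBufferHelper.asLargeByteBuffer(byte[]) factory is assumed here based on its use in the TorrentBroadcast diff below.

import org.apache.spark.network.buffer.LargeByteBuffer;
import org.apache.spark.network.buffer.LargeByteBufferHelper;
import org.apache.spark.network.buffer.LargeByteBufferInputStream;

public class LargeByteBufferInputStreamExample {
  public static void main(String[] args) throws Exception {
    // Wrap a small array; the same stream API is meant for buffers over 2GB.
    LargeByteBuffer buf = LargeByteBufferHelper.asLargeByteBuffer(new byte[]{1, 2, 3});

    // dispose = true: close() will also call buffer.dispose(),
    // e.g. to unmap a memory-mapped file.
    LargeByteBufferInputStream in = new LargeByteBufferInputStream(buf, true);
    int b;
    while ((b = in.read()) != -1) {
      System.out.println(b); // prints 1, 2, 3
    }
    in.close(); // buffer is disposed and dropped here
  }
}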
@@ -0,0 +1,91 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.network.buffer;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.util.io.ByteArrayChunkOutputStream;

/**
 * An OutputStream that will write all data to memory. It supports writing over 2GB
 * and the resulting data can be retrieved as a
 * {@link org.apache.spark.network.buffer.LargeByteBuffer}.
 */
public class LargeByteBufferOutputStream extends OutputStream {

  private final ByteArrayChunkOutputStream output;

  /**
   * Create a new LargeByteBufferOutputStream which writes to byte arrays of the given size. Note
   * that <code>chunkSize</code> has <b>no effect</b> on the LargeByteBuffer returned by
   * {@link #largeBuffer()}.
   *
   * @param chunkSize size of the byte arrays used by this output stream, in bytes
   */
  public LargeByteBufferOutputStream(int chunkSize) {
    output = new ByteArrayChunkOutputStream(chunkSize);
  }

  @Override
  public void write(int b) {
    output.write(b);
  }

  @Override
  public void write(byte[] bytes, int off, int len) {
    output.write(bytes, off, len);
  }

  /**
   * Get all of the data written to the stream so far as a LargeByteBuffer. This method can be
   * called multiple times, and each returned buffer will be completely independent (the data
   * is copied for each returned buffer). It does not close the stream.
   *
   * @return the data written to the stream as a LargeByteBuffer
   */
  public LargeByteBuffer largeBuffer() {
    return largeBuffer(LargeByteBufferHelper.MAX_CHUNK_SIZE);
  }

  /**
   * Exposed for testing. You should not normally call this method -- the returned
   * buffer will not implement {@code asByteBuffer} correctly.
   */
  @VisibleForTesting
  LargeByteBuffer largeBuffer(int maxChunk) {
    long totalSize = output.size();
    int chunksNeeded = (int) ((totalSize + maxChunk - 1) / maxChunk);
    ByteBuffer[] chunks = new ByteBuffer[chunksNeeded];
    long remaining = totalSize;
    long pos = 0;
    for (int idx = 0; idx < chunksNeeded; idx++) {
      int nextSize = (int) Math.min(maxChunk, remaining);
      chunks[idx] = ByteBuffer.wrap(output.slice(pos, pos + nextSize));
      pos += nextSize;
      remaining -= nextSize;
    }
    return new WrappedLargeByteBuffer(chunks, maxChunk);
  }

  @Override
  public void close() throws IOException {
    output.close();
  }
}
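A quick sketch of intended usage, illustrative only. It also walks the chunk math in largeBuffer(int): with a hypothetical totalSize of 5 and maxChunk of 2, chunksNeeded = (5 + 2 - 1) / 2 = 3, producing chunks of 2, 2, and 1 bytes.

import org.apache.spark.network.buffer.LargeByteBuffer;
import org.apache.spark.network.buffer.LargeByteBufferOutputStream;

public class LargeByteBufferOutputStreamExample {
  public static void main(String[] args) throws Exception {
    // 32KB backing arrays; per the javadoc, chunkSize does not affect
    // the buffer returned by largeBuffer().
    LargeByteBufferOutputStream out = new LargeByteBufferOutputStream(32 * 1024);
    out.write(new byte[]{1, 2, 3, 4, 5}, 0, 5);

    // Each call copies the data written so far, so the buffers are independent.
    LargeByteBuffer first = out.largeBuffer();
    LargeByteBuffer second = out.largeBuffer();
    System.out.println(first.remaining() + ", " + second.remaining()); // 5, 5

    out.close();
  }
}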
@@ -18,17 +18,17 @@
 package org.apache.spark.broadcast
 
 import java.io._
-import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions.asJavaEnumeration
 import scala.reflect.ClassTag
 import scala.util.Random
 
 import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.network.buffer.{LargeByteBufferInputStream, LargeByteBufferHelper, LargeByteBuffer}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
-import org.apache.spark.util.{ByteBufferInputStream, Utils}
+import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ByteArrayChunkOutputStream
 
 /**
@@ -111,10 +111,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   }
 
   /** Fetch torrent blocks from the driver and/or other executors. */
-  private def readBlocks(): Array[ByteBuffer] = {
+  private def readBlocks(): Array[LargeByteBuffer] = {
     // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
     // to the driver, so other executors can pull these chunks from this executor as well.
-    val blocks = new Array[ByteBuffer](numBlocks)
+    val blocks = new Array[LargeByteBuffer](numBlocks)
     val bm = SparkEnv.get.blockManager
 
     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
@@ -123,8 +123,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       // First try getLocalBytes because there is a chance that previous attempts to fetch the
       // broadcast blocks have already fetched some of the blocks. In that case, some blocks
       // would be available locally (on this executor).
-      def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
-      def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
+      def getLocal: Option[LargeByteBuffer] = bm.getLocalBytes(pieceId)
+      def getRemote: Option[LargeByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
         // If we found the block from remote executors/driver's BlockManager, put the block
         // in this executor's BlockManager.
         SparkEnv.get.blockManager.putBytes(
@@ -134,7 +134,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
           tellMaster = true)
         block
       }
-      val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
+      val block: LargeByteBuffer = getLocal.orElse(getRemote).getOrElse(
        throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
       blocks(pid) = block
     }
@@ -195,22 +195,22 @@ private object TorrentBroadcast extends Logging {
       obj: T,
       blockSize: Int,
       serializer: Serializer,
-      compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
+      compressionCodec: Option[CompressionCodec]): Array[LargeByteBuffer] = {
     val bos = new ByteArrayChunkOutputStream(blockSize)
     val out: OutputStream = compressionCodec.map(c => c.compressedOutputStream(bos)).getOrElse(bos)
     val ser = serializer.newInstance()
     val serOut = ser.serializeStream(out)
     serOut.writeObject[T](obj).close()
-    bos.toArrays.map(ByteBuffer.wrap)
+    bos.toArrays.map(LargeByteBufferHelper.asLargeByteBuffer)
   }
 
   def unBlockifyObject[T: ClassTag](
-      blocks: Array[ByteBuffer],
+      blocks: Array[LargeByteBuffer],
       serializer: Serializer,
       compressionCodec: Option[CompressionCodec]): T = {
     require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
     val is = new SequenceInputStream(
-      asJavaEnumeration(blocks.iterator.map(block => new ByteBufferInputStream(block))))
+      asJavaEnumeration(blocks.iterator.map(block => new LargeByteBufferInputStream(block))))
     val in: InputStream = compressionCodec.map(c => c.compressedInputStream(is)).getOrElse(is)
     val ser = serializer.newInstance()
     val serIn = ser.deserializeStream(in)
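The unBlockifyObject change above reads all fetched blocks back-to-back as one stream. A hedged Java sketch of the same pattern, with invented block contents:

import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.spark.network.buffer.LargeByteBuffer;
import org.apache.spark.network.buffer.LargeByteBufferHelper;
import org.apache.spark.network.buffer.LargeByteBufferInputStream;

public class UnBlockifyExample {
  public static void main(String[] args) throws Exception {
    // Two "torrent blocks"; in TorrentBroadcast these come from the BlockManager.
    List<LargeByteBuffer> blocks = Arrays.asList(
        LargeByteBufferHelper.asLargeByteBuffer(new byte[]{1, 2}),
        LargeByteBufferHelper.asLargeByteBuffer(new byte[]{3, 4}));

    // Wrap each block in a stream, then read them as one logical stream,
    // mirroring the SequenceInputStream in unBlockifyObject.
    List<InputStream> streams = new ArrayList<>();
    for (LargeByteBuffer block : blocks) {
      streams.add(new LargeByteBufferInputStream(block));
    }
    InputStream in = new SequenceInputStream(Collections.enumeration(streams));
    int v;
    while ((v = in.read()) != -1) {
      System.out.print(v + " "); // 1 2 3 4
    }
    in.close();
  }
}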
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -23,6 +23,8 @@ import java.net.URL
 import java.nio.ByteBuffer
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
+import org.apache.spark.network.buffer.LargeByteBufferHelper
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.util.control.NonFatal
@@ -266,7 +268,8 @@ private[spark] class Executor(
         } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
           val blockId = TaskResultBlockId(taskId)
           env.blockManager.putBytes(
-            blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
+            blockId, LargeByteBufferHelper.asLargeByteBuffer(serializedDirectResult),
+            StorageLevel.MEMORY_AND_DISK_SER)
           logInfo(
             s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
           ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
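The putBytes change above passes a java.nio.ByteBuffer through LargeByteBufferHelper, which suggests an asLargeByteBuffer(ByteBuffer) overload alongside the byte[] one used in TorrentBroadcast; the helper's signatures are not shown in this diff, so the sketch below rests on that assumption.

import java.nio.ByteBuffer;

import org.apache.spark.network.buffer.LargeByteBuffer;
import org.apache.spark.network.buffer.LargeByteBufferHelper;

public class WrapTaskResultExample {
  public static void main(String[] args) throws Exception {
    // Stand-in for a serialized task result.
    ByteBuffer serializedDirectResult = ByteBuffer.wrap(new byte[]{42});

    // Assumed overload: view a single ByteBuffer as a LargeByteBuffer.
    LargeByteBuffer wrapped = LargeByteBufferHelper.asLargeByteBuffer(serializedDirectResult);
    System.out.println(wrapped.remaining()); // 1
  }
}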
@@ -24,9 +24,9 @@ import scala.concurrent.{Promise, Await, Future}
 import scala.concurrent.duration.Duration
 
 import org.apache.spark.Logging
-import org.apache.spark.network.buffer.{NioManagedBuffer, ManagedBuffer}
+import org.apache.spark.network.buffer.{LargeByteBufferHelper, NioManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.shuffle.{ShuffleClient, BlockFetchingListener}
-import org.apache.spark.storage.{BlockManagerId, BlockId, StorageLevel}
+import org.apache.spark.storage.{BlockId, StorageLevel}
 
 private[spark]
 abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
@@ -23,12 +23,12 @@ import scala.collection.JavaConversions._
 
 import org.apache.spark.Logging
 import org.apache.spark.network.BlockDataManager
-import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.buffer.{BufferTooLargeException, ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
 import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager}
 import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, OpenBlocks, StreamHandle, UploadBlock}
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.storage.{ShuffleRemoteBlockSizeLimitException, BlockId, StorageLevel}
 
 /**
  * Serves requests to open blocks by simply registering one chunk per block requested.
@@ -53,11 +53,24 @@ class NettyBlockRpcServer(
 
     message match {
       case openBlocks: OpenBlocks =>
-        val blocks: Seq[ManagedBuffer] =
-          openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
-        val streamId = streamManager.registerStream(blocks.iterator)
-        logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
-        responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteArray)
+        try {
+          val blocks: Seq[ManagedBuffer] =
+            openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
+          val streamId = streamManager.registerStream(blocks.iterator)
+          logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
+          responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteArray)
+        } catch {
+          // shouldn't ever happen, b/c we should prevent writing 2GB shuffle files,
+          // but just to be safe
+          case ex: BufferTooLargeException =>
+            // throw & catch this helper exception, just to get full stack trace
+            try {
+              throw new ShuffleRemoteBlockSizeLimitException(ex)
+            } catch {
+              case ex2: ShuffleRemoteBlockSizeLimitException =>
+                responseContext.onFailure(ex2)
+            }
+        }
 
       case uploadBlock: UploadBlock =>
         // StorageLevel is serialized as bytes using our JavaSerializer.
@@ -77,7 +77,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
             return
           }
           val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
-            serializedTaskResult.get)
+            serializedTaskResult.get.asByteBuffer())
           sparkEnv.blockManager.master.removeBlock(blockId)
           (deserializedResult, size)
         }
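The asByteBuffer() call above converts a fetched task result back to a single java.nio.ByteBuffer for the deserializer; the largeBuffer(int) javadoc earlier suggests this is only well-defined when the data fits in one chunk. A hedged sketch of that bridge (declared throws Exception in case asByteBuffer() reports oversized buffers via a checked exception; its signature is not shown in this diff):

import java.nio.ByteBuffer;

import org.apache.spark.network.buffer.LargeByteBuffer;
import org.apache.spark.network.buffer.LargeByteBufferHelper;

public class AsByteBufferExample {
  public static void main(String[] args) throws Exception {
    LargeByteBuffer large = LargeByteBufferHelper.asLargeByteBuffer(new byte[]{7, 8, 9});

    // Bridge back to an API that still needs one ByteBuffer,
    // as TaskResultGetter does before deserializing.
    ByteBuffer single = large.asByteBuffer();
    System.out.println(single.remaining()); // 3
  }
}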
@@ -18,6 +18,7 @@
 package org.apache.spark.shuffle
 
 import java.nio.ByteBuffer
+
 import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.storage.ShuffleBlockId
 