29 changes: 11 additions & 18 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1190,45 +1190,38 @@ private[spark] class BlockManager(
   def dataSerializeStream(
       blockId: BlockId,
       outputStream: OutputStream,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): Unit = {
+      values: Iterator[Any]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = serializer.newInstance()
+    val ser = defaultSerializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }
 
   /** Serializes into a byte buffer. */
-  def dataSerialize(
-      blockId: BlockId,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): ByteBuffer = {
+  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
     val byteStream = new ByteBufferOutputStream(4096)
-    dataSerializeStream(blockId, byteStream, values, serializer)
+    dataSerializeStream(blockId, byteStream, values)
     byteStream.toByteBuffer
   }
 
   /**
    * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserialize(
-      blockId: BlockId,
-      bytes: ByteBuffer,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
     bytes.rewind()
-    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
   }
 
   /**
    * Deserializes a InputStream into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserializeStream(
-      blockId: BlockId,
-      inputStream: InputStream,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
     val stream = new BufferedInputStream(inputStream)
-    serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
+    defaultSerializer
+      .newInstance()
+      .deserializeStream(wrapForCompression(blockId, stream))
+      .asIterator
   }
 
   def stop(): Unit = {
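For context, here is a minimal sketch of how the simplified API reads after this change. The block id and values are hypothetical, and `bm` stands for whatever `BlockManager` instance is in scope; the point is that serialization now always goes through the manager's `defaultSerializer`, so call sites no longer thread a `Serializer` argument through these methods.

```scala
// Hypothetical round trip through the simplified API; `bm`, the block id,
// and the values below are illustrative, not taken from this patch.
import java.nio.ByteBuffer
import org.apache.spark.storage.{BlockManager, TestBlockId}

def roundTrip(bm: BlockManager): Iterator[Any] = {
  val blockId = TestBlockId("demo")

  // Before this change: bm.dataSerialize(blockId, values, someCustomSerializer)
  // After: the BlockManager's defaultSerializer is always used.
  val buffer: ByteBuffer = bm.dataSerialize(blockId, Iterator("a", "b", "c"))

  // Deserialization mirrors this; per the doc comment, the returned iterator
  // disposes of the underlying buffer once it has been fully consumed.
  bm.dataDeserialize(blockId, buffer)
}
```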
10 changes: 0 additions & 10 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -144,16 +144,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
   }
 
-  /**
-   * A version of getValues that allows a custom serializer. This is used as part of the
-   * shuffle short-circuit code.
-   */
-  def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    // TODO: Should bypass getBytes and use a stream based implementation, so that
-    // we won't use a lot of memory during e.g. external sort merge.
-    getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
-  }
-
   override def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     if (file.exists()) {
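With the custom-serializer overload deleted, the only remaining read path is `getValues(blockId: BlockId)`, which delegates to `blockManager.dataDeserialize` and therefore also uses the default serializer. A hedged sketch of a caller, assuming a `diskStore` and `blockId` already in scope:

```scala
// Illustrative only: `diskStore` and `blockId` are assumed to exist in the
// caller's scope. Disk reads now always deserialize with the default serializer.
diskStore.getValues(blockId) match {
  case Some(values) => values.foreach(v => println(v)) // consume the block's values
  case None         => println(s"block $blockId not found on disk")
}
```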