From 68eabd48b5be6adcf30192a6e5d8ea0d02674918 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 16 Dec 2015 18:02:55 -0800
Subject: [PATCH] Clean up unused parameters / code

---
 .../apache/spark/storage/BlockManager.scala   | 29 +++++++------------
 .../org/apache/spark/storage/DiskStore.scala  | 10 -------
 2 files changed, 11 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 540e1ec003a2..6074fc58d70d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1190,20 +1190,16 @@ private[spark] class BlockManager(
   def dataSerializeStream(
       blockId: BlockId,
       outputStream: OutputStream,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): Unit = {
+      values: Iterator[Any]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = serializer.newInstance()
+    val ser = defaultSerializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }
 
   /** Serializes into a byte buffer. */
-  def dataSerialize(
-      blockId: BlockId,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): ByteBuffer = {
+  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
     val byteStream = new ByteBufferOutputStream(4096)
-    dataSerializeStream(blockId, byteStream, values, serializer)
+    dataSerializeStream(blockId, byteStream, values)
     byteStream.toByteBuffer
   }
 
@@ -1211,24 +1207,21 @@ private[spark] class BlockManager(
    * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserialize(
-      blockId: BlockId,
-      bytes: ByteBuffer,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
     bytes.rewind()
-    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
   }
 
   /**
    * Deserializes a InputStream into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserializeStream(
-      blockId: BlockId,
-      inputStream: InputStream,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
     val stream = new BufferedInputStream(inputStream)
-    serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
+    defaultSerializer
+      .newInstance()
+      .deserializeStream(wrapForCompression(blockId, stream))
+      .asIterator
   }
 
   def stop(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index c008b9dc1632..6c4477184d5b 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -144,16 +144,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
   }
 
-  /**
-   * A version of getValues that allows a custom serializer. This is used as part of the
-   * shuffle short-circuit code.
-   */
-  def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    // TODO: Should bypass getBytes and use a stream based implementation, so that
-    // we won't use a lot of memory during e.g. external sort merge.
-    getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
-  }
-
   override def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     if (file.exists()) {