
Commit 97678ed
Author: Andrew Or (committed)
[SPARK-12390] Clean up unused serializer parameter in BlockManager
No change in functionality is intended. This only changes internal API.

Author: Andrew Or <[email protected]>

Closes #10343 from andrewor14/clean-bm-serializer.
1 parent d1508dd commit 97678ed

2 files changed: +11, -28 lines

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 11 additions & 18 deletions
@@ -1190,45 +1190,38 @@ private[spark] class BlockManager(
   def dataSerializeStream(
       blockId: BlockId,
       outputStream: OutputStream,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): Unit = {
+      values: Iterator[Any]): Unit = {
     val byteStream = new BufferedOutputStream(outputStream)
-    val ser = serializer.newInstance()
+    val ser = defaultSerializer.newInstance()
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }
 
   /** Serializes into a byte buffer. */
-  def dataSerialize(
-      blockId: BlockId,
-      values: Iterator[Any],
-      serializer: Serializer = defaultSerializer): ByteBuffer = {
+  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
     val byteStream = new ByteBufferOutputStream(4096)
-    dataSerializeStream(blockId, byteStream, values, serializer)
+    dataSerializeStream(blockId, byteStream, values)
     byteStream.toByteBuffer
   }
 
   /**
    * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserialize(
-      blockId: BlockId,
-      bytes: ByteBuffer,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserialize(blockId: BlockId, bytes: ByteBuffer): Iterator[Any] = {
     bytes.rewind()
-    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true))
   }
 
   /**
    * Deserializes a InputStream into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserializeStream(
-      blockId: BlockId,
-      inputStream: InputStream,
-      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+  def dataDeserializeStream(blockId: BlockId, inputStream: InputStream): Iterator[Any] = {
     val stream = new BufferedInputStream(inputStream)
-    serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
+    defaultSerializer
+      .newInstance()
+      .deserializeStream(wrapForCompression(blockId, stream))
+      .asIterator
   }
 
   def stop(): Unit = {
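
For context, a minimal sketch of how the serialization round trip reads after this change, now that every path goes through the BlockManager's defaultSerializer. This is illustrative only: it assumes an in-scope blockManager: BlockManager, and TestBlockId plus the sample values are placeholders rather than part of this commit.

    import org.apache.spark.storage.TestBlockId

    // Sketch: serialize an iterator into a ByteBuffer and read it back.
    // Both calls now implicitly use blockManager's defaultSerializer.
    val blockId = TestBlockId("sketch")  // illustrative block id
    val bytes = blockManager.dataSerialize(blockId, Iterator("a", "b", "c"))
    val restored: Iterator[Any] = blockManager.dataDeserialize(blockId, bytes)
    assert(restored.toSeq == Seq("a", "b", "c"))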

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 0 additions & 10 deletions
@@ -144,16 +144,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
   }
 
-  /**
-   * A version of getValues that allows a custom serializer. This is used as part of the
-   * shuffle short-circuit code.
-   */
-  def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    // TODO: Should bypass getBytes and use a stream based implementation, so that
-    // we won't use a lot of memory during e.g. external sort merge.
-    getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
-  }
-
   override def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     if (file.exists()) {