Skip to content

Commit f9ff82e

Browse files
committed
putValues -> putIterator + putArray
1 parent beb368f commit f9ff82e

File tree

5 files changed

+20
-20
lines changed

5 files changed

+20
-20
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -463,10 +463,10 @@ private[spark] class BlockManager(
463463
val values = dataDeserialize(blockId, bytes)
464464
if (level.deserialized) {
465465
// Cache the values before returning them
466-
val putResult = memoryStore.putValues(
466+
val putResult = memoryStore.putIterator(
467467
blockId, values, level, returnValues = true, allowPersistToDisk = false)
468468
// The put may or may not have succeeded, depending on whether there was enough
469-
// space to unroll the block. Either way, putValues should return an iterator.
469+
// space to unroll the block. Either way, the put here should return an iterator.
470470
putResult.data match {
471471
case Left(it) =>
472472
return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
@@ -716,9 +716,9 @@ private[spark] class BlockManager(
716716
// Actually put the values
717717
val result = data match {
718718
case IteratorValues(iterator) =>
719-
blockStore.putValues(blockId, iterator, putLevel, returnValues)
719+
blockStore.putIterator(blockId, iterator, putLevel, returnValues)
720720
case ArrayValues(array) =>
721-
blockStore.putValues(blockId, array, putLevel, returnValues)
721+
blockStore.putArray(blockId, array, putLevel, returnValues)
722722
case ByteBufferValues(bytes) =>
723723
bytes.rewind()
724724
blockStore.putBytes(blockId, bytes, putLevel)
@@ -873,7 +873,7 @@ private[spark] class BlockManager(
873873
logInfo(s"Writing block $blockId to disk")
874874
data match {
875875
case Left(elements) =>
876-
diskStore.putValues(blockId, elements, level, returnValues = false)
876+
diskStore.putArray(blockId, elements, level, returnValues = false)
877877
case Right(bytes) =>
878878
diskStore.putBytes(blockId, bytes, level)
879879
}

core/src/main/scala/org/apache/spark/storage/BlockStore.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
3737
* @return a PutResult that contains the size of the data, as well as the values put if
3838
* returnValues is true (if not, the result's data field can be null)
3939
*/
40-
def putValues(
40+
def putIterator(
4141
blockId: BlockId,
4242
values: Iterator[Any],
4343
level: StorageLevel,
4444
returnValues: Boolean): PutResult
4545

46-
def putValues(
46+
def putArray(
4747
blockId: BlockId,
4848
values: Array[Any],
4949
level: StorageLevel,

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,15 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
5555
PutResult(bytes.limit(), Right(bytes.duplicate()))
5656
}
5757

58-
override def putValues(
58+
override def putArray(
5959
blockId: BlockId,
6060
values: Array[Any],
6161
level: StorageLevel,
6262
returnValues: Boolean): PutResult = {
63-
putValues(blockId, values.toIterator, level, returnValues)
63+
putIterator(blockId, values.toIterator, level, returnValues)
6464
}
6565

66-
override def putValues(
66+
override def putIterator(
6767
blockId: BlockId,
6868
values: Iterator[Any],
6969
level: StorageLevel,

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
7878
bytes.rewind()
7979
if (level.deserialized) {
8080
val values = blockManager.dataDeserialize(blockId, bytes)
81-
putValues(blockId, values, level, returnValues = true)
81+
putIterator(blockId, values, level, returnValues = true)
8282
} else {
8383
val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
8484
PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
8585
}
8686
}
8787

88-
override def putValues(
88+
override def putArray(
8989
blockId: BlockId,
9090
values: Array[Any],
9191
level: StorageLevel,
@@ -101,12 +101,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
101101
}
102102
}
103103

104-
override def putValues(
104+
override def putIterator(
105105
blockId: BlockId,
106106
values: Iterator[Any],
107107
level: StorageLevel,
108108
returnValues: Boolean): PutResult = {
109-
putValues(blockId, values, level, returnValues, allowPersistToDisk = true)
109+
putIterator(blockId, values, level, returnValues, allowPersistToDisk = true)
110110
}
111111

112112
/**
@@ -121,7 +121,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
121121
* back from disk and attempts to cache it in memory. In this case, we should not persist the
122122
* block back on disk again, as it is already in disk store.
123123
*/
124-
private[storage] def putValues(
124+
private[storage] def putIterator(
125125
blockId: BlockId,
126126
values: Iterator[Any],
127127
level: StorageLevel,
@@ -132,7 +132,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
132132
unrolledValues match {
133133
case Left(arrayValues) =>
134134
// Values are fully unrolled in memory, so store them as an array
135-
val res = putValues(blockId, arrayValues, level, returnValues)
135+
val res = putArray(blockId, arrayValues, level, returnValues)
136136
droppedBlocks ++= res.droppedBlocks
137137
PutResult(res.size, res.data, droppedBlocks)
138138
case Right(iteratorValues) =>
@@ -141,7 +141,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
141141
s"Free memory is $freeMemory bytes.")
142142
if (level.useDisk && allowPersistToDisk) {
143143
logWarning(s"Persisting block $blockId to disk instead.")
144-
val res = blockManager.diskStore.putValues(blockId, iteratorValues, level, returnValues)
144+
val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
145145
PutResult(res.size, res.data, droppedBlocks)
146146
} else {
147147
PutResult(0, Left(iteratorValues), droppedBlocks)

core/src/main/scala/org/apache/spark/storage/TachyonStore.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,15 @@ private[spark] class TachyonStore(
4343
putIntoTachyonStore(blockId, bytes, returnValues = true)
4444
}
4545

46-
override def putValues(
46+
override def putArray(
4747
blockId: BlockId,
4848
values: Array[Any],
4949
level: StorageLevel,
5050
returnValues: Boolean): PutResult = {
51-
putValues(blockId, values.toIterator, level, returnValues)
51+
putIterator(blockId, values.toIterator, level, returnValues)
5252
}
5353

54-
override def putValues(
54+
override def putIterator(
5555
blockId: BlockId,
5656
values: Iterator[Any],
5757
level: StorageLevel,

0 commit comments

Comments (0)