Skip to content

Commit 24185ea

Browse files
committed
Avoid dropping a block back to disk if reading from disk
When the BlockManager reads a block from disk, it attempts to cache it in memory. However, when we put the values in memory and they do not fit, the memory store writes them back out to disk, which is unnecessary and has confusing semantics. Instead, we create a flag "allowPersistToDisk" for the memory store's putValues to identify cases in which we don't want to drop the block to disk even if it doesn't fit in memory.
1 parent 2b7ee66 commit 24185ea

File tree

2 files changed

+26
-7
lines changed

2 files changed

+26
-7
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -463,15 +463,13 @@ private[spark] class BlockManager(
463463
val values = dataDeserialize(blockId, bytes)
464464
if (level.deserialized) {
465465
// Cache the values before returning them
466-
val putResult = memoryStore.putValues(blockId, values, level, returnValues = true)
466+
val putResult = memoryStore.putValues(
467+
blockId, values, level, returnValues = true, allowPersistToDisk = false)
467468
putResult.data match {
468469
case Left(it) =>
469470
return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
470-
case Right(b) =>
471-
return Some(new BlockResult(
472-
dataDeserialize(blockId, b),
473-
DataReadMethod.Disk,
474-
info.size))
471+
case _ =>
472+
throw new SparkException("Memory store did not return an iterator!")
475473
}
476474
} else {
477475
return Some(new BlockResult(values, DataReadMethod.Disk, info.size))

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,27 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
9797
values: Iterator[Any],
9898
level: StorageLevel,
9999
returnValues: Boolean): PutResult = {
100+
putValues(blockId, values, level, returnValues, allowPersistToDisk = true)
101+
}
102+
103+
/**
104+
* Attempt to put the given block in the memory store.
105+
*
106+
* There may not be enough space to fully unroll the iterator in memory, in which case we
107+
* optionally drop the values to disk if
108+
* (1) the block's storage level specifies useDisk, and
109+
* (2) `allowPersistToDisk` is true.
110+
*
111+
* One scenario in which `allowPersistToDisk` is false is when the BlockManager reads a block
112+
* back from disk and attempts to cache it in memory. In this case, we should not persist the
113+
* block back on disk again, as it is already in the disk store.
114+
*/
115+
private[storage] def putValues(
116+
blockId: BlockId,
117+
values: Iterator[Any],
118+
level: StorageLevel,
119+
returnValues: Boolean,
120+
allowPersistToDisk: Boolean): PutResult = {
100121
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
101122
val unrolledValues = unrollSafely(blockId, values, droppedBlocks)
102123
unrolledValues match {
@@ -108,7 +129,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
108129
case Right(iteratorValues) =>
109130
// Not enough space to unroll this block; drop to disk if applicable
110131
logWarning(s"Not enough space to store $blockId in memory! Free memory is ${freeMemory}B.")
111-
if (level.useDisk) {
132+
if (level.useDisk && allowPersistToDisk) {
112133
logWarning(s"Persisting $blockId to disk instead.")
113134
val newLevel = StorageLevel(
114135
useDisk = true,

0 commit comments

Comments
 (0)