Skip to content

Commit df1b997

Browse files
author
Andrew Or
committed
Atomically resize the memory pools
1 parent 952d95c commit df1b997

File tree

2 files changed

+8
-8
lines changed

2 files changed

+8
-8
lines changed

core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,13 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
119119
}
120120

121121
/**
122-
* Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number
123-
* of bytes removed from the pool's capacity.
122+
* Free space to shrink the size of this storage memory pool by `spaceToFree` bytes.
123+
* Note: this method doesn't actually reduce the pool size but relies on the caller to do so.
124+
*
125+
* @return number of bytes to be removed from the pool's capacity.
124126
*/
125-
def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
126-
// First, shrink the pool by reclaiming free memory:
127+
def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
127128
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
128-
decrementPoolSize(spaceFreedByReleasingUnusedMemory)
129129
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
130130
if (remainingSpaceToFree > 0) {
131131
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
@@ -134,7 +134,6 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
134134
val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
135135
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
136136
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
137-
decrementPoolSize(spaceFreedByEviction)
138137
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
139138
} else {
140139
spaceFreedByReleasingUnusedMemory

core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,10 @@ private[spark] class UnifiedMemoryManager private[memory] (
103103
math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
104104
if (memoryReclaimableFromStorage > 0) {
105105
// Only reclaim as much space as is necessary and available:
106-
val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
106+
val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool(
107107
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
108-
onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
108+
storageMemoryPool.decrementPoolSize(spaceToReclaim)
109+
onHeapExecutionMemoryPool.incrementPoolSize(spaceToReclaim)
109110
}
110111
}
111112
}

0 commit comments

Comments
 (0)