Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
}

/**
* Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number
* of bytes removed from the pool's capacity.
* Free space to shrink the size of this storage memory pool by `spaceToFree` bytes.
* Note: this method doesn't actually reduce the pool size but relies on the caller to do so.
*
* @return number of bytes to be removed from the pool's capacity.
*/
def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
// First, shrink the pool by reclaiming free memory:
def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
decrementPoolSize(spaceFreedByReleasingUnusedMemory)
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
Expand All @@ -134,7 +134,6 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
decrementPoolSize(spaceFreedByEviction)
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
} else {
spaceFreedByReleasingUnusedMemory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
storageRegionSize,
maxMemory - storageRegionSize) {

assertInvariant()

// We always maintain this invariant:
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
private def assertInvariant(): Unit = {
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
}

override def maxStorageMemory: Long = synchronized {
maxMemory - onHeapExecutionMemoryPool.memoryUsed
Expand All @@ -77,7 +81,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
assertInvariant()
assert(numBytes >= 0)
memoryMode match {
case MemoryMode.ON_HEAP =>
Expand All @@ -99,9 +103,10 @@ private[spark] class UnifiedMemoryManager private[memory] (
math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage > 0) {
// Only reclaim as much space as is necessary and available:
val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
storageMemoryPool.decrementPoolSize(spaceToReclaim)
onHeapExecutionMemoryPool.incrementPoolSize(spaceToReclaim)
}
}
}
Expand Down Expand Up @@ -137,7 +142,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
assertInvariant()
assert(numBytes >= 0)
if (numBytes > maxStorageMemory) {
// Fail fast if the block simply won't fit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
ms
}

/**
* Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is
* stubbed to always throw [[RuntimeException]].
*/
protected def makeBadMemoryStore(mm: MemoryManager): MemoryStore = {
val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(new Answer[Long] {
override def answer(invocation: InvocationOnMock): Long = {
throw new RuntimeException("bad memory store!")
}
})
mm.setMemoryStore(ms)
ms
}

/**
* Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(evictedBlocks.nonEmpty)
}

test("SPARK-15260: atomically resize memory pools") {
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
.set("spark.memory.storageFraction", "0")
.set("spark.testing.memory", "1000")
val mm = UnifiedMemoryManager(conf, numCores = 2)
makeBadMemoryStore(mm)
val memoryMode = MemoryMode.ON_HEAP
// Acquire 1000 then release 600 bytes of storage memory, leaving the
// storage memory pool at 1000 bytes but only 400 bytes of which are used.
assert(mm.acquireStorageMemory(dummyBlock, 1000L, evictedBlocks))
mm.releaseStorageMemory(600L)
// Before the fix for SPARK-15260, we would first shrink the storage pool by the amount of
// unused storage memory (600 bytes), try to evict blocks, then enlarge the execution pool
// by the same amount. If the eviction threw an exception, then we would shrink one pool
// without enlarging the other, resulting in an assertion failure.
intercept[RuntimeException] {
mm.acquireExecutionMemory(1000L, 0, memoryMode)
}
val assertInvariant = PrivateMethod[Unit]('assertInvariant)
mm.invokePrivate[Unit](assertInvariant())
}

}