Skip to content

Commit 83050dd

Browse files
Andrew Ordavies
authored andcommitted
[SPARK-15260] Atomically resize memory pools
## What changes were proposed in this pull request? When we acquire execution memory, we do a lot of things between shrinking the storage memory pool and enlarging the execution memory pool. In particular, we call `memoryStore.evictBlocksToFreeSpace`, which may do a lot of I/O and can throw exceptions. If an exception is thrown, the pool sizes on that executor will be in a bad state. This patch minimizes the things we do between the two calls to make the resizing more atomic. ## How was this patch tested? Jenkins. Author: Andrew Or <[email protected]> Closes #13039 from andrewor14/safer-pool. (cherry picked from commit bb88ad4) Signed-off-by: Davies Liu <[email protected]>
1 parent 6b36185 commit 83050dd

File tree

4 files changed

+46
-8
lines changed

4 files changed

+46
-8
lines changed

core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,21 +116,20 @@ private[memory] class StorageMemoryPool(
116116
}
117117

118118
/**
119-
* Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number
120-
* of bytes removed from the pool's capacity.
119+
* Free space to shrink the size of this storage memory pool by `spaceToFree` bytes.
120+
* Note: this method doesn't actually reduce the pool size but relies on the caller to do so.
121+
*
122+
* @return number of bytes to be removed from the pool's capacity.
121123
*/
122-
def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
123-
// First, shrink the pool by reclaiming free memory:
124+
def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
124125
val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
125-
decrementPoolSize(spaceFreedByReleasingUnusedMemory)
126126
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
127127
if (remainingSpaceToFree > 0) {
128128
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
129129
val spaceFreedByEviction =
130130
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
131131
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
132132
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
133-
decrementPoolSize(spaceFreedByEviction)
134133
spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
135134
} else {
136135
spaceFreedByReleasingUnusedMemory

core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,10 @@ private[spark] class UnifiedMemoryManager private[memory] (
113113
storagePool.poolSize - storageRegionSize)
114114
if (memoryReclaimableFromStorage > 0) {
115115
// Only reclaim as much space as is necessary and available:
116-
val spaceReclaimed = storagePool.shrinkPoolToFreeSpace(
116+
val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
117117
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
118-
executionPool.incrementPoolSize(spaceReclaimed)
118+
storagePool.decrementPoolSize(spaceToReclaim)
119+
executionPool.incrementPoolSize(spaceToReclaim)
119120
}
120121
}
121122
}

core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,21 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
7878
ms
7979
}
8080

81+
/**
82+
* Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is
83+
* stubbed to always throw [[RuntimeException]].
84+
*/
85+
protected def makeBadMemoryStore(mm: MemoryManager): MemoryStore = {
86+
val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
87+
when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(new Answer[Long] {
88+
override def answer(invocation: InvocationOnMock): Long = {
89+
throw new RuntimeException("bad memory store!")
90+
}
91+
})
92+
mm.setMemoryStore(ms)
93+
ms
94+
}
95+
8196
/**
8297
* Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory.
8398
*

core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,4 +280,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
280280
assert(evictedBlocks.nonEmpty)
281281
}
282282

283+
test("SPARK-15260: atomically resize memory pools") {
284+
val conf = new SparkConf()
285+
.set("spark.memory.fraction", "1")
286+
.set("spark.memory.storageFraction", "0")
287+
.set("spark.testing.memory", "1000")
288+
val mm = UnifiedMemoryManager(conf, numCores = 2)
289+
makeBadMemoryStore(mm)
290+
val memoryMode = MemoryMode.ON_HEAP
291+
// Acquire 1000 then release 600 bytes of storage memory, leaving the
292+
// storage memory pool at 1000 bytes but only 400 bytes of which are used.
293+
assert(mm.acquireStorageMemory(dummyBlock, 1000L, memoryMode))
294+
mm.releaseStorageMemory(600L, memoryMode)
295+
// Before the fix for SPARK-15260, we would first shrink the storage pool by the amount of
296+
// unused storage memory (600 bytes), try to evict blocks, then enlarge the execution pool
297+
// by the same amount. If the eviction threw an exception, then we would shrink one pool
298+
// without enlarging the other, resulting in an assertion failure.
299+
intercept[RuntimeException] {
300+
mm.acquireExecutionMemory(1000L, 0, memoryMode)
301+
}
302+
val assertInvariants = PrivateMethod[Unit]('assertInvariants)
303+
mm.invokePrivate[Unit](assertInvariants())
304+
}
305+
283306
}

0 commit comments

Comments
 (0)