From 0d95847c53a6799fd81e667c70f112cde5dc4c02 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 10 May 2016 17:14:57 -0700 Subject: [PATCH 1/3] Add test --- .../spark/memory/MemoryManagerSuite.scala | 15 ++++++++++++ .../memory/UnifiedMemoryManagerSuite.scala | 23 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index 555b640cb4244..6a195ef7fe5b3 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -76,6 +76,21 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft ms } + /** + * Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is + * stubbed to always throw [[RuntimeException]]. + */ + protected def makeBadMemoryStore(mm: MemoryManager): MemoryStore = { + val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS) + when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(new Answer[Long] { + override def answer(invocation: InvocationOnMock): Long = { + throw new RuntimeException("bad memory store!") + } + }) + mm.setMemoryStore(ms) + ms + } + /** * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory. * diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 6cc48597d38f9..a0b8df04694f3 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -255,4 +255,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(evictedBlocks.nonEmpty) } + test("SPARK-15260: atomically resize memory pools") { + val conf = new SparkConf() + .set("spark.memory.fraction", "1") + .set("spark.memory.storageFraction", "0") + .set("spark.testing.memory", "1000") + val mm = UnifiedMemoryManager(conf, numCores = 2) + makeBadMemoryStore(mm) + val memoryMode = MemoryMode.ON_HEAP + // Acquire 1000 then release 600 bytes of storage memory, leaving the + // storage memory pool at 1000 bytes but only 400 bytes of which are used. + assert(mm.acquireStorageMemory(dummyBlock, 1000L, memoryMode)) + mm.releaseStorageMemory(600L, memoryMode) + // Before the fix for SPARK-15260, we would first shrink the storage pool by the amount of + // unused storage memory (600 bytes), try to evict blocks, then enlarge the execution pool + // by the same amount. If the eviction threw an exception, then we would shrink one pool + // without enlarging the other, resulting in an assertion failure. + intercept[RuntimeException] { + mm.acquireExecutionMemory(1000L, 0, memoryMode) + } + val assertInvariants = PrivateMethod[Unit]('assertInvariants) + mm.invokePrivate[Unit](assertInvariants()) + } + } From 952d95c46478d803ec489add952c4ab120b9007f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 11 May 2016 13:54:06 -0700 Subject: [PATCH 2/3] Update tests for branch-1.6 --- .../org/apache/spark/memory/UnifiedMemoryManager.scala | 10 +++++++--- .../spark/memory/UnifiedMemoryManagerSuite.scala | 8 ++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 829f054dba0e9..91a84ab743c92 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -57,8 +57,12 @@ private[spark] class UnifiedMemoryManager private[memory] ( storageRegionSize, maxMemory - storageRegionSize) { + assertInvariant() + // We always maintain this invariant: - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + private def assertInvariant(): Unit = { + assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + } override def maxStorageMemory: Long = synchronized { maxMemory - onHeapExecutionMemoryPool.memoryUsed @@ -77,7 +81,7 @@ private[spark] class UnifiedMemoryManager private[memory] ( numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long = synchronized { - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + assertInvariant() assert(numBytes >= 0) memoryMode match { case MemoryMode.ON_HEAP => @@ -137,7 +141,7 @@ private[spark] class UnifiedMemoryManager private[memory] ( blockId: BlockId, numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { - assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) + assertInvariant() assert(numBytes >= 0) if (numBytes > maxStorageMemory) { // Fail fast if the block simply won't fit diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index a0b8df04694f3..46b6916a12fc2 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -265,8 +265,8 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val memoryMode = MemoryMode.ON_HEAP // Acquire 1000 then release 600 bytes of storage memory, leaving the // storage memory pool at 1000 bytes but only 400 bytes of which are used. - assert(mm.acquireStorageMemory(dummyBlock, 1000L, memoryMode)) - mm.releaseStorageMemory(600L, memoryMode) + assert(mm.acquireStorageMemory(dummyBlock, 1000L, evictedBlocks)) + mm.releaseStorageMemory(600L) // Before the fix for SPARK-15260, we would first shrink the storage pool by the amount of // unused storage memory (600 bytes), try to evict blocks, then enlarge the execution pool // by the same amount. If the eviction threw an exception, then we would shrink one pool @@ -274,8 +274,8 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes intercept[RuntimeException] { mm.acquireExecutionMemory(1000L, 0, memoryMode) } - val assertInvariants = PrivateMethod[Unit]('assertInvariants) - mm.invokePrivate[Unit](assertInvariants()) + val assertInvariant = PrivateMethod[Unit]('assertInvariant) + mm.invokePrivate[Unit](assertInvariant()) } } From df1b9976f8cefdbfa6e530da218c47a8d59cca97 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 11 May 2016 14:01:52 -0700 Subject: [PATCH 3/3] Atomically resize the memory pools --- .../org/apache/spark/memory/StorageMemoryPool.scala | 11 +++++------ .../apache/spark/memory/UnifiedMemoryManager.scala | 5 +++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index 70af83b5ee092..89edaf58ebc29 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -119,13 +119,13 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w } /** - * Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number - * of bytes removed from the pool's capacity. + * Free space to shrink the size of this storage memory pool by `spaceToFree` bytes. + * Note: this method doesn't actually reduce the pool size but relies on the caller to do so. + * + * @return number of bytes to be removed from the pool's capacity. */ - def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized { - // First, shrink the pool by reclaiming free memory: + def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized { val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree) - decrementPoolSize(spaceFreedByReleasingUnusedMemory) val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory if (remainingSpaceToFree > 0) { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: @@ -134,7 +134,6 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. - decrementPoolSize(spaceFreedByEviction) spaceFreedByReleasingUnusedMemory + spaceFreedByEviction } else { spaceFreedByReleasingUnusedMemory diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 91a84ab743c92..802087c82b713 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -103,9 +103,10 @@ private[spark] class UnifiedMemoryManager private[memory] ( math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize) if (memoryReclaimableFromStorage > 0) { // Only reclaim as much space as is necessary and available: - val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace( + val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool( math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) - onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed) + storageMemoryPool.decrementPoolSize(spaceToReclaim) + onHeapExecutionMemoryPool.incrementPoolSize(spaceToReclaim) } } }