From 012cb4ba9ba1e32337f2f7bb612abb53fe4070be Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 6 Dec 2015 12:57:45 -0800 Subject: [PATCH 01/17] Reset ensureFreeSpaceCalled before each test. This reduces coupling between failed tests. --- .../org/apache/spark/memory/MemoryManagerSuite.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index f55d435fa33a6..e0c4f5a666a6a 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -27,6 +27,7 @@ import org.mockito.Matchers.{any, anyLong} import org.mockito.Mockito.{mock, when} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer +import org.scalatest.BeforeAndAfterEach import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkFunSuite @@ -36,7 +37,7 @@ import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, StorageLevel /** * Helper trait for sharing code among [[MemoryManager]] tests. */ -private[memory] trait MemoryManagerSuite extends SparkFunSuite { +private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAfterEach { import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED @@ -52,6 +53,11 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite { */ private val ensureFreeSpaceCalled = new AtomicLong(DEFAULT_ENSURE_FREE_SPACE_CALLED) + override def beforeEach(): Unit = { + super.beforeEach() + ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED) + } + /** * Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]] method is stubbed. * From b519fe628a9a2b8238dfedbfd9b74bdd2ddc0de4 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 6 Dec 2015 16:00:23 -0800 Subject: [PATCH 02/17] Add regression test for storage eviction bug. --- .../spark/memory/MemoryManagerSuite.scala | 75 ++++++++++--------- .../memory/StaticMemoryManagerSuite.scala | 5 +- .../memory/UnifiedMemoryManagerSuite.scala | 30 +++++++- 3 files changed, 67 insertions(+), 43 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index e0c4f5a666a6a..35fcc56807374 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -39,6 +39,8 @@ import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, StorageLevel */ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAfterEach { + protected val evictedBlocks = new mutable.ArrayBuffer[(BlockId, BlockStatus)] + import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED // Note: Mockito's verify mechanism does not provide a way to reset method call counts @@ -55,6 +57,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft override def beforeEach(): Unit = { super.beforeEach() + evictedBlocks.clear() ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED) } @@ -74,7 +77,18 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft } /** - * Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] with the right arguments. + * Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] and simulates the part of that + * method which releases storage memory. + * + * This is a significant simplification of the real method, which actually drops existing + * blocks based on the size of each block. Instead, here we simply release as many bytes + * as needed to ensure the requested amount of free space. This allows us to set up the + * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in + * many other dependencies. + * + * Every call to this method will set a global variable, [[ensureFreeSpaceCalled]], that + * records the number of bytes this is called with. This variable is expected to be cleared + * by the test code later through [[assertEnsureFreeSpaceCalled]]. */ private def ensureFreeSpaceAnswer(mm: MemoryManager, numBytesPos: Int): Answer[Boolean] = { new Answer[Boolean] { @@ -85,44 +99,35 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected ensureFreeSpace " + s"argument at index $numBytesPos to be a Long: ${args.mkString(", ")}") val numBytes = args(numBytesPos).asInstanceOf[Long] - val success = mockEnsureFreeSpace(mm, numBytes) - if (success) { + require(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED, + "bad test: ensure free space variable was not reset") + // Record the number of bytes we freed this call + ensureFreeSpaceCalled.set(numBytes) + + def freeMemory = mm.maxStorageMemory - mm.storageMemoryUsed + + // Fail fast if the block simply won't fit + if (numBytes > mm.maxStorageMemory) { + return false + } + + // No need to evict anything if there is already enough free space + if (freeMemory >= numBytes) { + return true + } + + val spaceToRelease = numBytes - freeMemory + if (spaceToRelease <= mm.storageMemoryUsed) { + // We can evict enough blocks to fulfill the request for space + mm.releaseStorageMemory(spaceToRelease) args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append( (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L))) + true + } else { + // No blocks were evicted because eviction would not free enough space. + false } - success - } - } - } - - /** - * Simulate the part of [[MemoryStore.ensureFreeSpace]] that releases storage memory. - * - * This is a significant simplification of the real method, which actually drops existing - * blocks based on the size of each block. Instead, here we simply release as many bytes - * as needed to ensure the requested amount of free space. This allows us to set up the - * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in - * many other dependencies. - * - * Every call to this method will set a global variable, [[ensureFreeSpaceCalled]], that - * records the number of bytes this is called with. This variable is expected to be cleared - * by the test code later through [[assertEnsureFreeSpaceCalled]]. - */ - private def mockEnsureFreeSpace(mm: MemoryManager, numBytes: Long): Boolean = mm.synchronized { - require(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED, - "bad test: ensure free space variable was not reset") - // Record the number of bytes we freed this call - ensureFreeSpaceCalled.set(numBytes) - if (numBytes <= mm.maxStorageMemory) { - def freeMemory = mm.maxStorageMemory - mm.storageMemoryUsed - val spaceToRelease = numBytes - freeMemory - if (spaceToRelease > 0) { - mm.releaseStorageMemory(spaceToRelease) } - freeMemory >= numBytes - } else { - // We attempted to free more bytes than our max allowable memory - false } } diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index 54cb28c389c2f..eae5408b71025 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -17,16 +17,13 @@ package org.apache.spark.memory -import scala.collection.mutable.ArrayBuffer - import org.mockito.Mockito.when import org.apache.spark.SparkConf -import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId} +import org.apache.spark.storage.{MemoryStore, TestBlockId} class StaticMemoryManagerSuite extends MemoryManagerSuite { private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4") - private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] /** * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies. diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index e97c898a44783..012af21a9baf4 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -17,16 +17,13 @@ package org.apache.spark.memory -import scala.collection.mutable.ArrayBuffer - import org.scalatest.PrivateMethodTester import org.apache.spark.SparkConf -import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId} +import org.apache.spark.storage.{MemoryStore, TestBlockId} class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester { private val dummyBlock = TestBlockId("--") - private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] private val storageFraction: Double = 0.5 @@ -84,14 +81,18 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) assertEnsureFreeSpaceCalled(ms, 100L) assert(mm.storageMemoryUsed === 110L) + assert(evictedBlocks.isEmpty) // Acquire more than the max, not granted assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks)) assertEnsureFreeSpaceCalled(ms, maxMemory + 1L) assert(mm.storageMemoryUsed === 110L) + assert(evictedBlocks.isEmpty) // Acquire up to the max, requests after this are still granted due to LRU eviction assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks)) assertEnsureFreeSpaceCalled(ms, 1000L) assert(mm.storageMemoryUsed === 1000L) + assert(evictedBlocks.nonEmpty) + evictedBlocks.clear() assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) assertEnsureFreeSpaceCalled(ms, 1L) assert(mm.storageMemoryUsed === 1000L) @@ -120,17 +121,21 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assertEnsureFreeSpaceCalled(ms, 750L) assert(mm.executionMemoryUsed === 0L) assert(mm.storageMemoryUsed === 750L) + assert(evictedBlocks.isEmpty) // Execution needs to request 250 bytes to evict storage memory assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) assert(mm.executionMemoryUsed === 100L) assert(mm.storageMemoryUsed === 750L) assertEnsureFreeSpaceNotCalled(ms) + assert(evictedBlocks.isEmpty) // Execution wants 200 bytes but only 150 are free, so storage is evicted assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L) assert(mm.executionMemoryUsed === 300L) assertEnsureFreeSpaceCalled(ms, 50L) assert(mm.executionMemoryUsed === 300L) + assert(evictedBlocks.nonEmpty) mm.releaseAllStorageMemory() + evictedBlocks.clear() require(mm.executionMemoryUsed === 300L) require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released") // Acquire some storage memory again, but this time keep it within the storage region @@ -144,6 +149,23 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(mm.executionMemoryUsed === 600L) assert(mm.storageMemoryUsed === 400L) assertEnsureFreeSpaceNotCalled(ms) + assert(evictedBlocks.isEmpty) + } + + test("execution can evict storage blocks when storage memory is below max mem (SPARK-12155)") { + val maxMemory = 1000L + val taskAttemptId = 0L + val (mm, ms) = makeThings(maxMemory) + // Acquire enough storage memory to exceed the storage region size + assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks)) + assertEnsureFreeSpaceCalled(ms, 750L) + assert(mm.executionMemoryUsed === 0L) + assert(mm.storageMemoryUsed === 750L) + // Should now be able to require up to 500 bytes of memory + assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L) + assert(mm.storageMemoryUsed === 500L) + assert(mm.executionMemoryUsed === 500L) + assert(evictedBlocks.nonEmpty) } test("storage does not evict execution") { From 7c68ca09cb1b12f157400866983f753ac863380e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 6 Dec 2015 14:40:07 -0800 Subject: [PATCH 03/17] Add MemoryStore.freeSpaceForExecution() method, which forces blocks to be dropped. Previously, ensureFreeSpace() might end up not dropping blocks if the total storage memory pool usage was less than the maximum possible storage pool usage. --- .../spark/memory/StorageMemoryPool.scala | 6 +- .../apache/spark/storage/MemoryStore.scala | 56 +++++--- .../spark/memory/MemoryManagerSuite.scala | 126 ++++++++++++------ .../memory/UnifiedMemoryManagerSuite.scala | 3 +- 4 files changed, 134 insertions(+), 57 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index 6a322eabf81ed..c4180e397d221 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -128,9 +128,11 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging { } else { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - memoryStore.ensureFreeSpace(spaceToFree - spaceFreedByReleasingUnusedMemory, evictedBlocks) + memoryStore.freeSpaceForExecution( + spaceToFree - spaceFreedByReleasingUnusedMemory, evictedBlocks) val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum - _memoryUsed -= spaceFreedByEviction + // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do + // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. decrementPoolSize(spaceFreedByEviction) spaceFreedByReleasingUnusedMemory + spaceFreedByEviction } diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 4dbac388e098b..f6e6720f0efa0 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -406,16 +406,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } /** - * Try to free up a given amount of space by evicting existing blocks. + * Try to free up the given amount of storage memory for use by execution by evicting blocks. * * @param space the amount of memory to free, in bytes * @param droppedBlocks a holder for blocks evicted in the process * @return whether the requested free space is freed. */ - private[spark] def ensureFreeSpace( + private[spark] def freeSpaceForExecution( space: Long, droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { - ensureFreeSpace(None, space, droppedBlocks) + evictBlocksToFreeSpace(None, space, droppedBlocks) } /** @@ -449,42 +449,66 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { memoryManager.synchronized { val freeMemory = maxMemory - memoryUsed - val rddToAdd = blockId.flatMap(getRddId) - val selectedBlocks = new ArrayBuffer[BlockId] - var selectedMemory = 0L logInfo(s"Ensuring $space bytes of free space " + blockId.map { id => s"for block $id" }.getOrElse("") + s"(free: $freeMemory, max: $maxMemory)") - // Fail fast if the block simply won't fit if (space > maxMemory) { + // Fail fast if the block simply won't fit logInfo("Will not " + blockId.map { id => s"store $id" }.getOrElse("free memory") + s" as the required space ($space bytes) exceeds our memory limit ($maxMemory bytes)") - return false - } - - // No need to evict anything if there is already enough free space - if (freeMemory >= space) { - return true + false + } else if (freeMemory >= space) { + // No need to evict anything if there is already enough free space + true + } else { + // Evict blocks as necessary + evictBlocksToFreeSpace(blockId, freeMemory - space, droppedBlocks) } + } + } + /** + * Try to evict blocks to free up a given amount of space to store a particular block. + * Can fail if either the block is bigger than our memory or it would require replacing + * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for + * RDDs that don't fit into memory that we want to avoid). + * + * Compared to [[ensureFreeSpace()]], this method will drop blocks without first checking whether + * there is free storage memory which could be used to store a new block; as a result, this + * method should be used when evicting stroage blocks in order to reclaim memory for use by + * execution. + * + * @param blockId the ID of the block we are freeing space for, if any + * @param space the size of this block + * @param droppedBlocks a holder for blocks evicted in the process + * @return whether the requested free space is freed. + */ + private def evictBlocksToFreeSpace( + blockId: Option[BlockId], + space: Long, + droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + memoryManager.synchronized { + var freedMemory = 0L + val rddToAdd = blockId.flatMap(getRddId) + val selectedBlocks = new ArrayBuffer[BlockId] // This is synchronized to ensure that the set of entries is not changed // (because of getValue or getBytes) while traversing the iterator, as that // can lead to exceptions. entries.synchronized { val iterator = entries.entrySet().iterator() - while (freeMemory + selectedMemory < space && iterator.hasNext) { + while (freedMemory < space && iterator.hasNext) { val pair = iterator.next() val blockId = pair.getKey if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) { selectedBlocks += blockId - selectedMemory += pair.getValue.size + freedMemory += pair.getValue.size } } } - if (freeMemory + selectedMemory >= space) { + if (freedMemory >= space) { logInfo(s"${selectedBlocks.size} blocks selected for dropping") for (blockId <- selectedBlocks) { val entry = entries.synchronized { entries.get(blockId) } diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index 35fcc56807374..9c17387d4522c 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -24,7 +24,7 @@ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Future} import org.mockito.Matchers.{any, anyLong} -import org.mockito.Mockito.{mock, when} +import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.scalatest.BeforeAndAfterEach @@ -41,67 +41,67 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft protected val evictedBlocks = new mutable.ArrayBuffer[(BlockId, BlockStatus)] + import MemoryManagerSuite.DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED // Note: Mockito's verify mechanism does not provide a way to reset method call counts // without also resetting stubbed methods. Since our test code relies on the latter, - // we need to use our own variable to track invocations of `ensureFreeSpace`. + // we need to use our own variable to track invocations of `freeSpaceForUseByExecution` and + // `ensureFreeSpace`. /** - * The amount of free space requested in the last call to [[MemoryStore.ensureFreeSpace]] + * The amount of free space requested in the last call to [[MemoryStore.freeSpaceForExecution]]. * - * This set whenever [[MemoryStore.ensureFreeSpace]] is called, and cleared when the test - * code makes explicit assertions on this variable through [[assertEnsureFreeSpaceCalled]]. + * This set whenever [[MemoryStore.freeSpaceForExecution]] is called, and cleared when the test + * code makes explicit assertions on this variable through + * [[assertFreeSpaceForExecutionCalled]]. */ - private val ensureFreeSpaceCalled = new AtomicLong(DEFAULT_ENSURE_FREE_SPACE_CALLED) + private val freeSpaceForExecutionCalled = new AtomicLong(0) + + private val ensureFreeSpaceCalled = new AtomicLong(0) override def beforeEach(): Unit = { super.beforeEach() evictedBlocks.clear() + freeSpaceForExecutionCalled.set(DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED) ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED) } /** - * Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]] method is stubbed. + * Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]] and + * [[MemoryStore.freeSpaceForExecution]] methods are stubbed. * - * This allows our test code to release storage memory when [[MemoryStore.ensureFreeSpace]] - * is called without relying on [[org.apache.spark.storage.BlockManager]] and all of its - * dependencies. + * This allows our test code to release storage memory when these methods are called + * without relying on [[org.apache.spark.storage.BlockManager]] and all of its dependencies. */ protected def makeMemoryStore(mm: MemoryManager): MemoryStore = { - val ms = mock(classOf[MemoryStore]) - when(ms.ensureFreeSpace(anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 0)) - when(ms.ensureFreeSpace(any(), anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 1)) + val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS) + when(ms.freeSpaceForExecution(anyLong(), any())).thenAnswer(freeSpaceForExecutionAnswer(mm)) + when(ms.ensureFreeSpace(any(), anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm)) mm.setMemoryStore(ms) ms } /** - * Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] and simulates the part of that - * method which releases storage memory. - * - * This is a significant simplification of the real method, which actually drops existing - * blocks based on the size of each block. Instead, here we simply release as many bytes - * as needed to ensure the requested amount of free space. This allows us to set up the - * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in - * many other dependencies. - * - * Every call to this method will set a global variable, [[ensureFreeSpaceCalled]], that - * records the number of bytes this is called with. This variable is expected to be cleared - * by the test code later through [[assertEnsureFreeSpaceCalled]]. - */ - private def ensureFreeSpaceAnswer(mm: MemoryManager, numBytesPos: Int): Answer[Boolean] = { + * Simulate the part of [[MemoryStore.ensureFreeSpace]] that releases storage memory. + * + * This is a significant simplification of the real method, which actually drops existing + * blocks based on the size of each block. Instead, here we simply release as many bytes + * as needed to ensure the requested amount of free space. This allows us to set up the + * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in + * many other dependencies. + * + * Every call to this method will set a global variable, [[ensureFreeSpaceCalled]], that + * records the number of bytes this is called with. This variable is expected to be cleared + * by the test code later through [[assertEnsureFreeSpaceCalled]]. + */ + private def ensureFreeSpaceAnswer(mm: MemoryManager): Answer[Boolean] = { new Answer[Boolean] { override def answer(invocation: InvocationOnMock): Boolean = { val args = invocation.getArguments - require(args.size > numBytesPos, s"bad test: expected >$numBytesPos arguments " + - s"in ensureFreeSpace, found ${args.size}") - require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected ensureFreeSpace " + - s"argument at index $numBytesPos to be a Long: ${args.mkString(", ")}") - val numBytes = args(numBytesPos).asInstanceOf[Long] + val numBytes = args(1).asInstanceOf[Long] require(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED, - "bad test: ensure free space variable was not reset") - // Record the number of bytes we freed this call + "bad test: ensureFreeSpace() variable was not reset") ensureFreeSpaceCalled.set(numBytes) def freeMemory = mm.maxStorageMemory - mm.storageMemoryUsed @@ -132,20 +132,69 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft } /** + * Simulate the part of [[MemoryStore.freeSpaceForExecution]] that releases storage memory. + * + * This is a significant simplification of the real method, which actually drops existing + * blocks based on the size of each block. Instead, here we simply release as many bytes + * as needed to ensure the requested amount of free space. This allows us to set up the + * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in + * many other dependencies. + * + * Every call to this method will set a global variable, [[freeSpaceForExecutionCalled]], that + * records the number of bytes this is called with. This variable is expected to be cleared + * by the test code later through [[assertFreeSpaceForExecutionCalled]]. + */ + private def freeSpaceForExecutionAnswer(mm: MemoryManager): Answer[Boolean] = { + new Answer[Boolean] { + override def answer(invocation: InvocationOnMock): Boolean = { + val args = invocation.getArguments + val numBytes = args(0).asInstanceOf[Long] + require(freeSpaceForExecutionCalled.get() === DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED, + "bad test: freeSpaceForExecution() variable was not reset") + freeSpaceForExecutionCalled.set(numBytes) + assert(numBytes <= mm.storageMemoryUsed) + mm.releaseStorageMemory(numBytes) + args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append( + (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L))) + evictedBlocks.append( + (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L))) + true + } + } + } + + /** + * Assert that [[MemoryStore.freeSpaceForExecution]] is called with the given parameters. + */ + protected def assertFreeSpaceForExecutionCalled(ms: MemoryStore, numBytes: Long): Unit = { + assert(freeSpaceForExecutionCalled.get() === numBytes, + s"expected freeSpaceForExecution() to be called with $numBytes") + freeSpaceForExecutionCalled.set(DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED) + } + + /** + * Assert that [[MemoryStore.freeSpaceForExecution]] is NOT called. + */ + protected def assertFreeSpaceForExecutionNotCalled[T](ms: MemoryStore): Unit = { + assert(freeSpaceForExecutionCalled.get() === DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED, + "freeSpaceForExecution() should not have been called!") + } + + /** * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given parameters. */ protected def assertEnsureFreeSpaceCalled(ms: MemoryStore, numBytes: Long): Unit = { assert(ensureFreeSpaceCalled.get() === numBytes, - s"expected ensure free space to be called with $numBytes") - ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED) + s"expected ensureFreeSpace() to be called with $numBytes") + ensureFreeSpaceCalled.set(DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED) } /** * Assert that [[MemoryStore.ensureFreeSpace]] is NOT called. */ protected def assertEnsureFreeSpaceNotCalled[T](ms: MemoryStore): Unit = { - assert(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED, - "ensure free space should not have been called!") + assert(ensureFreeSpaceCalled.get() === DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED, + "ensureFreeSpace() should not have been called!") } /** @@ -302,5 +351,6 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft } private object MemoryManagerSuite { + private val DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED = -1L private val DEFAULT_ENSURE_FREE_SPACE_CALLED = -1L } diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 012af21a9baf4..6f81450b42c41 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -131,7 +131,8 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes // Execution wants 200 bytes but only 150 are free, so storage is evicted assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L) assert(mm.executionMemoryUsed === 300L) - assertEnsureFreeSpaceCalled(ms, 50L) + assertEnsureFreeSpaceNotCalled(ms) + assertFreeSpaceForExecutionCalled(ms, 50L) assert(mm.executionMemoryUsed === 300L) assert(evictedBlocks.nonEmpty) mm.releaseAllStorageMemory() From 53841174760a24a0df3eb1562af1f33dbe340eb9 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 6 Dec 2015 17:47:29 -0800 Subject: [PATCH 04/17] Remove a layer of confusing inheritance. --- .../apache/spark/memory/MemoryManager.scala | 11 ++--------- .../spark/memory/StaticMemoryManager.scala | 18 ++++++++++++++++++ .../spark/memory/UnifiedMemoryManager.scala | 2 +- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index ceb8ea434e1be..ae9e1ac0e246b 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -77,9 +77,7 @@ private[spark] abstract class MemoryManager( def acquireStorageMemory( blockId: BlockId, numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { - storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks) - } + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean /** * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary. @@ -109,12 +107,7 @@ private[spark] abstract class MemoryManager( def acquireExecutionMemory( numBytes: Long, taskAttemptId: Long, - memoryMode: MemoryMode): Long = synchronized { - memoryMode match { - case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId) - case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId) - } - } + memoryMode: MemoryMode): Long /** * Release numBytes of execution memory belonging to the given task. diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index 12a094306861f..00c84813aaa25 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -53,6 +53,13 @@ private[spark] class StaticMemoryManager( (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong } + override def acquireStorageMemory( + blockId: BlockId, + numBytes: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { + storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks) + } + override def acquireUnrollMemory( blockId: BlockId, numBytes: Long, @@ -62,6 +69,17 @@ private[spark] class StaticMemoryManager( val numBytesToFree = math.min(numBytes, maxNumBytesToFree) storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks) } + + private[memory] + override def acquireExecutionMemory( + numBytes: Long, + taskAttemptId: Long, + memoryMode: MemoryMode): Long = synchronized { + memoryMode match { + case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId) + case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId) + } + } } diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 48b4e23433e43..887a9b93f388a 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -100,7 +100,7 @@ private[spark] class UnifiedMemoryManager private[memory] ( case MemoryMode.OFF_HEAP => // For now, we only support on-heap caching of data, so we do not need to interact with // the storage pool when allocating off-heap memory. This will change in the future, though. - super.acquireExecutionMemory(numBytes, taskAttemptId, memoryMode) + offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId) } } From 13ba7ada77f87ef1ec362aec35c89a924e6987cb Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 6 Dec 2015 17:53:45 -0800 Subject: [PATCH 05/17] Put fail-fast for non-fitting blocks earlier in call chain. --- .../org/apache/spark/memory/StaticMemoryManager.scala | 9 ++++++++- .../org/apache/spark/memory/UnifiedMemoryManager.scala | 6 ++++++ .../scala/org/apache/spark/storage/MemoryStore.scala | 9 +++------ .../apache/spark/memory/StaticMemoryManagerSuite.scala | 2 +- .../apache/spark/memory/UnifiedMemoryManagerSuite.scala | 2 +- 5 files changed, 19 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index 00c84813aaa25..0382d055b0c35 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -57,7 +57,14 @@ private[spark] class StaticMemoryManager( blockId: BlockId, numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { - storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks) + if (numBytes > storageMemoryPool.poolSize) { + // Fail fast if the block simply won't fit + logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + + s"memory limit (${storageMemoryPool.poolSize} bytes)") + false + } else { + storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks) + } } override def acquireUnrollMemory( diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 887a9b93f388a..d66464ab6fda9 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -110,6 +110,12 @@ private[spark] class UnifiedMemoryManager private[memory] ( evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) assert(numBytes >= 0) + if (numBytes > maxMemory) { + // Fail fast if the block simply won't fit + logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + + s"memory limit ($maxMemory bytes)") + return false + } if (numBytes > storageMemoryPool.memoryFree) { // There is not enough free memory in the storage pool, so try to borrow free memory from // the execution pool. diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index f6e6720f0efa0..c1db83f3af3d9 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -454,12 +454,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo blockId.map { id => s"for block $id" }.getOrElse("") + s"(free: $freeMemory, max: $maxMemory)") - if (space > maxMemory) { - // Fail fast if the block simply won't fit - logInfo("Will not " + blockId.map { id => s"store $id" }.getOrElse("free memory") + - s" as the required space ($space bytes) exceeds our memory limit ($maxMemory bytes)") - false - } else if (freeMemory >= space) { + assert(space <= maxMemory) + + if (freeMemory >= space) { // No need to evict anything if there is already enough free space true } else { diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index eae5408b71025..5124bda63936c 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -90,7 +90,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { assert(mm.storageMemoryUsed === 110L) // Acquire more than the max, not granted assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, maxStorageMem + 1L) + assertEnsureFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) // Acquire up to the max, requests after this are still granted due to LRU eviction assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, evictedBlocks)) diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 6f81450b42c41..7267170d2dcef 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -84,7 +84,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(evictedBlocks.isEmpty) // Acquire more than the max, not granted assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, maxMemory + 1L) + assertEnsureFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) assert(evictedBlocks.isEmpty) // Acquire up to the max, requests after this are still granted due to LRU eviction From eec4f6c87423d5e482b710e098486b3bbc4daf06 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 6 Dec 2015 17:56:48 -0800 Subject: [PATCH 06/17] Collapse ensureFreeSpace overloads --- .../apache/spark/storage/MemoryStore.scala | 23 ++----------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index c1db83f3af3d9..d872e89a58fa8 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -429,29 +429,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo blockId: BlockId, space: Long, droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { - ensureFreeSpace(Some(blockId), space, droppedBlocks) - } - - /** - * Try to free up a given amount of space to store a particular block, but can fail if - * either the block is bigger than our memory or it would require replacing another block - * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that - * don't fit into memory that we want to avoid). - * - * @param blockId the ID of the block we are freeing space for, if any - * @param space the size of this block - * @param droppedBlocks a holder for blocks evicted in the process - * @return whether the requested free space is freed. - */ - private def ensureFreeSpace( - blockId: Option[BlockId], - space: Long, - droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { memoryManager.synchronized { val freeMemory = maxMemory - memoryUsed - logInfo(s"Ensuring $space bytes of free space " + - blockId.map { id => s"for block $id" }.getOrElse("") + + logInfo(s"Ensuring $space bytes of free space for block $blockId" + s"(free: $freeMemory, max: $maxMemory)") assert(space <= maxMemory) @@ -461,7 +442,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo true } else { // Evict blocks as necessary - evictBlocksToFreeSpace(blockId, freeMemory - space, droppedBlocks) + evictBlocksToFreeSpace(Some(blockId), freeMemory - space, droppedBlocks) } } } From 2dc842aea82c8895125d46a00aa43dfb0d121de9 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 6 Dec 2015 18:56:11 -0800 Subject: [PATCH 07/17] Replace ensureFreeSpace() with evictBlocksToFreeSpace(). --- .../spark/memory/StorageMemoryPool.scala | 30 ++-- .../apache/spark/storage/MemoryStore.scala | 50 +------ .../spark/memory/MemoryManagerSuite.scala | 135 +++++------------- .../memory/StaticMemoryManagerSuite.scala | 24 ++-- .../memory/UnifiedMemoryManagerSuite.scala | 44 +++--- 5 files changed, 88 insertions(+), 195 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index c4180e397d221..97959b2f3dd0c 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -73,27 +73,31 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging { * * @param blockId the ID of the block we are acquiring storage memory for * @param numBytesToAcquire the size of this block - * @param numBytesToFree the size of space to be freed through evicting blocks + * @param maxNumBytesToFree the maximum amount of space to be freed through evicting blocks * @return whether all N bytes were successfully granted. */ def acquireMemory( blockId: BlockId, numBytesToAcquire: Long, - numBytesToFree: Long, + maxNumBytesToFree: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized { assert(numBytesToAcquire >= 0) - assert(numBytesToFree >= 0) + assert(maxNumBytesToFree >= 0) assert(memoryUsed <= poolSize) - memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks) - // Register evicted blocks, if any, with the active task metrics - Option(TaskContext.get()).foreach { tc => - val metrics = tc.taskMetrics() - val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) - metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq) + if (numBytesToAcquire > memoryFree && maxNumBytesToFree > 0) { + val additionalMemoryRequired = numBytesToAcquire - memoryFree + memoryStore.evictBlocksToFreeSpace( + Some(blockId), Math.min(maxNumBytesToFree, additionalMemoryRequired), evictedBlocks) + // Register evicted blocks, if any, with the active task metrics + Option(TaskContext.get()).foreach { tc => + val metrics = tc.taskMetrics() + val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) + metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq) + } } // NOTE: If the memory store evicts blocks, then those evictions will synchronously call - // back into this StorageMemoryPool in order to free. Therefore, these variables should have - // been updated. + // back into this StorageMemoryPool in order to free memory. Therefore, these variables + // should have been updated. val enoughMemory = numBytesToAcquire <= memoryFree if (enoughMemory) { _memoryUsed += numBytesToAcquire @@ -128,8 +132,8 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging { } else { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - memoryStore.freeSpaceForExecution( - spaceToFree - spaceFreedByReleasingUnusedMemory, evictedBlocks) + memoryStore.evictBlocksToFreeSpace( + None, spaceToFree - spaceFreedByReleasingUnusedMemory, evictedBlocks) val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index d872e89a58fa8..bdab8c2332fae 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -405,68 +405,22 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } } - /** - * Try to free up the given amount of storage memory for use by execution by evicting blocks. - * - * @param space the amount of memory to free, in bytes - * @param droppedBlocks a holder for blocks evicted in the process - * @return whether the requested free space is freed. - */ - private[spark] def freeSpaceForExecution( - space: Long, - droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { - evictBlocksToFreeSpace(None, space, droppedBlocks) - } - - /** - * Try to free up a given amount of space to store a block by evicting existing ones. - * - * @param space the amount of memory to free, in bytes - * @param droppedBlocks a holder for blocks evicted in the process - * @return whether the requested free space is freed. - */ - private[spark] def ensureFreeSpace( - blockId: BlockId, - space: Long, - droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { - memoryManager.synchronized { - val freeMemory = maxMemory - memoryUsed - - logInfo(s"Ensuring $space bytes of free space for block $blockId" + - s"(free: $freeMemory, max: $maxMemory)") - - assert(space <= maxMemory) - - if (freeMemory >= space) { - // No need to evict anything if there is already enough free space - true - } else { - // Evict blocks as necessary - evictBlocksToFreeSpace(Some(blockId), freeMemory - space, droppedBlocks) - } - } - } - /** * Try to evict blocks to free up a given amount of space to store a particular block. * Can fail if either the block is bigger than our memory or it would require replacing * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for * RDDs that don't fit into memory that we want to avoid). * - * Compared to [[ensureFreeSpace()]], this method will drop blocks without first checking whether - * there is free storage memory which could be used to store a new block; as a result, this - * method should be used when evicting stroage blocks in order to reclaim memory for use by - * execution. - * * @param blockId the ID of the block we are freeing space for, if any * @param space the size of this block * @param droppedBlocks a holder for blocks evicted in the process * @return whether the requested free space is freed. */ - private def evictBlocksToFreeSpace( + private[spark] def evictBlocksToFreeSpace( blockId: Option[BlockId], space: Long, droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + assert(space > 0) memoryManager.synchronized { var freedMemory = 0L val rddToAdd = blockId.flatMap(getRddId) diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index 9c17387d4522c..d9e1e825b1d41 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -41,49 +41,43 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft protected val evictedBlocks = new mutable.ArrayBuffer[(BlockId, BlockStatus)] - import MemoryManagerSuite.DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED - import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED + import MemoryManagerSuite.DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED // Note: Mockito's verify mechanism does not provide a way to reset method call counts // without also resetting stubbed methods. Since our test code relies on the latter, - // we need to use our own variable to track invocations of `freeSpaceForUseByExecution` and - // `ensureFreeSpace`. + // we need to use our own variable to track invocations of `evictBlocksToFreeSpace`. /** - * The amount of free space requested in the last call to [[MemoryStore.freeSpaceForExecution]]. + * The amount of space requested in the last call to [[MemoryStore.evictBlocksToFreeSpace]]. * - * This set whenever [[MemoryStore.freeSpaceForExecution]] is called, and cleared when the test + * This set whenever [[MemoryStore.evictBlocksToFreeSpace]] is called, and cleared when the test * code makes explicit assertions on this variable through - * [[assertFreeSpaceForExecutionCalled]]. + * [[assertEvictBlocksToFreeSpaceCalled]]. */ - private val freeSpaceForExecutionCalled = new AtomicLong(0) - - private val ensureFreeSpaceCalled = new AtomicLong(0) + private val evictBlocksToFreeSpaceCalled = new AtomicLong(0) override def beforeEach(): Unit = { super.beforeEach() evictedBlocks.clear() - freeSpaceForExecutionCalled.set(DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED) - ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED) + evictBlocksToFreeSpaceCalled.set(DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED) } /** - * Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]] and - * [[MemoryStore.freeSpaceForExecution]] methods are stubbed. + * Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is stubbed. * * This allows our test code to release storage memory when these methods are called * without relying on [[org.apache.spark.storage.BlockManager]] and all of its dependencies. */ protected def makeMemoryStore(mm: MemoryManager): MemoryStore = { val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS) - when(ms.freeSpaceForExecution(anyLong(), any())).thenAnswer(freeSpaceForExecutionAnswer(mm)) - when(ms.ensureFreeSpace(any(), anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm)) + when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())) + .thenAnswer(evictBlocksToFreeSpaceAnswer(mm)) mm.setMemoryStore(ms) ms } /** - * Simulate the part of [[MemoryStore.ensureFreeSpace]] that releases storage memory. + * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory. * * This is a significant simplification of the real method, which actually drops existing * blocks based on the size of each block. Instead, here we simply release as many bytes @@ -91,37 +85,26 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in * many other dependencies. * - * Every call to this method will set a global variable, [[ensureFreeSpaceCalled]], that + * Every call to this method will set a global variable, [[evictBlocksToFreeSpaceCalled]], that * records the number of bytes this is called with. This variable is expected to be cleared - * by the test code later through [[assertEnsureFreeSpaceCalled]]. + * by the test code later through [[assertEvictBlocksToFreeSpaceCalled]]. */ - private def ensureFreeSpaceAnswer(mm: MemoryManager): Answer[Boolean] = { + private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Boolean] = { new Answer[Boolean] { override def answer(invocation: InvocationOnMock): Boolean = { val args = invocation.getArguments - val numBytes = args(1).asInstanceOf[Long] - require(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED, - "bad test: ensureFreeSpace() variable was not reset") - ensureFreeSpaceCalled.set(numBytes) - - def freeMemory = mm.maxStorageMemory - mm.storageMemoryUsed - - // Fail fast if the block simply won't fit - if (numBytes > mm.maxStorageMemory) { - return false - } - - // No need to evict anything if there is already enough free space - if (freeMemory >= numBytes) { - return true - } - - val spaceToRelease = numBytes - freeMemory - if (spaceToRelease <= mm.storageMemoryUsed) { + val numBytesToFree = args(1).asInstanceOf[Long] + assert(numBytesToFree > 0) + require(evictBlocksToFreeSpaceCalled.get() === DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED, + "bad test: evictBlocksToFreeSpace() variable was not reset") + evictBlocksToFreeSpaceCalled.set(numBytesToFree) + if (numBytesToFree <= mm.storageMemoryUsed) { // We can evict enough blocks to fulfill the request for space - mm.releaseStorageMemory(spaceToRelease) + mm.releaseStorageMemory(numBytesToFree) args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append( - (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L))) + (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L))) + evictedBlocks.append( + (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L))) true } else { // No blocks were evicted because eviction would not free enough space. @@ -132,69 +115,20 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft } /** - * Simulate the part of [[MemoryStore.freeSpaceForExecution]] that releases storage memory. - * - * This is a significant simplification of the real method, which actually drops existing - * blocks based on the size of each block. Instead, here we simply release as many bytes - * as needed to ensure the requested amount of free space. This allows us to set up the - * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in - * many other dependencies. - * - * Every call to this method will set a global variable, [[freeSpaceForExecutionCalled]], that - * records the number of bytes this is called with. This variable is expected to be cleared - * by the test code later through [[assertFreeSpaceForExecutionCalled]]. - */ - private def freeSpaceForExecutionAnswer(mm: MemoryManager): Answer[Boolean] = { - new Answer[Boolean] { - override def answer(invocation: InvocationOnMock): Boolean = { - val args = invocation.getArguments - val numBytes = args(0).asInstanceOf[Long] - require(freeSpaceForExecutionCalled.get() === DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED, - "bad test: freeSpaceForExecution() variable was not reset") - freeSpaceForExecutionCalled.set(numBytes) - assert(numBytes <= mm.storageMemoryUsed) - mm.releaseStorageMemory(numBytes) - args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append( - (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L))) - evictedBlocks.append( - (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L))) - true - } - } - } - - /** - * Assert that [[MemoryStore.freeSpaceForExecution]] is called with the given parameters. - */ - protected def assertFreeSpaceForExecutionCalled(ms: MemoryStore, numBytes: Long): Unit = { - assert(freeSpaceForExecutionCalled.get() === numBytes, - s"expected freeSpaceForExecution() to be called with $numBytes") - freeSpaceForExecutionCalled.set(DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED) - } - - /** - * Assert that [[MemoryStore.freeSpaceForExecution]] is NOT called. - */ - protected def assertFreeSpaceForExecutionNotCalled[T](ms: MemoryStore): Unit = { - assert(freeSpaceForExecutionCalled.get() === DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED, - "freeSpaceForExecution() should not have been called!") - } - - /** - * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given parameters. + * Assert that [[MemoryStore.evictBlocksToFreeSpace]] is called with the given parameters. */ - protected def assertEnsureFreeSpaceCalled(ms: MemoryStore, numBytes: Long): Unit = { - assert(ensureFreeSpaceCalled.get() === numBytes, - s"expected ensureFreeSpace() to be called with $numBytes") - ensureFreeSpaceCalled.set(DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED) + protected def assertEvictBlocksToFreeSpaceCalled(ms: MemoryStore, numBytes: Long): Unit = { + assert(evictBlocksToFreeSpaceCalled.get() === numBytes, + s"expected evictBlocksToFreeSpace() to be called with $numBytes") + evictBlocksToFreeSpaceCalled.set(DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED) } /** - * Assert that [[MemoryStore.ensureFreeSpace]] is NOT called. + * Assert that [[MemoryStore.evictBlocksToFreeSpace]] is NOT called. */ - protected def assertEnsureFreeSpaceNotCalled[T](ms: MemoryStore): Unit = { - assert(ensureFreeSpaceCalled.get() === DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED, - "ensureFreeSpace() should not have been called!") + protected def assertEvictBlocksToFreeSpaceNotCalled[T](ms: MemoryStore): Unit = { + assert(evictBlocksToFreeSpaceCalled.get() === DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED, + "evictBlocksToFreeSpace() should not have been called!") } /** @@ -351,6 +285,5 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft } private object MemoryManagerSuite { - private val DEFAULT_FREE_SPACE_FOR_EXECUTION_CALLED = -1L - private val DEFAULT_ENSURE_FREE_SPACE_CALLED = -1L + private val DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED = -1L } diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index 5124bda63936c..a97e70d7aab19 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -82,33 +82,33 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) assert(mm.storageMemoryUsed === 0L) assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks)) - // `ensureFreeSpace` should be called with the number of bytes requested - assertEnsureFreeSpaceCalled(ms, 10L) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 10L) + assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 100L) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) // Acquire more than the max, not granted assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, evictedBlocks)) - assertEnsureFreeSpaceNotCalled(ms) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) // Acquire up to the max, requests after this are still granted due to LRU eviction assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 1000L) + assertEvictBlocksToFreeSpaceCalled(ms, 110L) assert(mm.storageMemoryUsed === 1000L) assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 1L) + assertEvictBlocksToFreeSpaceCalled(ms, 1L) assert(mm.storageMemoryUsed === 1000L) mm.releaseStorageMemory(800L) assert(mm.storageMemoryUsed === 200L) // Acquire after release assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 1L) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 201L) mm.releaseAllStorageMemory() assert(mm.storageMemoryUsed === 0L) assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 1L) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 1L) // Release beyond what was acquired mm.releaseStorageMemory(100L) @@ -130,7 +130,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { assert(mm.executionMemoryUsed === 200L) // Only storage memory should increase assert(mm.acquireStorageMemory(dummyBlock, 50L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 50L) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 50L) assert(mm.executionMemoryUsed === 200L) // Only execution memory should be released @@ -148,7 +148,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { val dummyBlock = TestBlockId("lonely water") val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 100L) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 100L) mm.releaseUnrollMemory(40L) assert(mm.storageMemoryUsed === 60L) @@ -156,13 +156,13 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { assert(mm.acquireUnrollMemory(dummyBlock, 500L, evictedBlocks)) // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes. // Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 340 bytes. - assertEnsureFreeSpaceCalled(ms, 340L) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 560L) when(ms.currentUnrollMemory).thenReturn(560L) assert(!mm.acquireUnrollMemory(dummyBlock, 800L, evictedBlocks)) assert(mm.storageMemoryUsed === 560L) // We already have 560 bytes > the max unroll space of 400 bytes, so no bytes are freed - assertEnsureFreeSpaceCalled(ms, 0L) + assertEvictBlocksToFreeSpaceNotCalled(ms) // Release beyond what was acquired mm.releaseUnrollMemory(maxStorageMem) assert(mm.storageMemoryUsed === 0L) diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 7267170d2dcef..65b9763d6d5e7 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -75,37 +75,37 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val (mm, ms) = makeThings(maxMemory) assert(mm.storageMemoryUsed === 0L) assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks)) - // `ensureFreeSpace` should be called with the number of bytes requested - assertEnsureFreeSpaceCalled(ms, 10L) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 10L) + assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 100L) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) assert(evictedBlocks.isEmpty) // Acquire more than the max, not granted assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks)) - assertEnsureFreeSpaceNotCalled(ms) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) assert(evictedBlocks.isEmpty) // Acquire up to the max, requests after this are still granted due to LRU eviction assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 1000L) + assertEvictBlocksToFreeSpaceCalled(ms, 110L) assert(mm.storageMemoryUsed === 1000L) assert(evictedBlocks.nonEmpty) evictedBlocks.clear() assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 1L) + assertEvictBlocksToFreeSpaceCalled(ms, 1L) assert(mm.storageMemoryUsed === 1000L) mm.releaseStorageMemory(800L) assert(mm.storageMemoryUsed === 200L) // Acquire after release assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 1L) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 201L) mm.releaseAllStorageMemory() assert(mm.storageMemoryUsed === 0L) assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 1L) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 1L) // Release beyond what was acquired mm.releaseStorageMemory(100L) @@ -118,7 +118,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val (mm, ms) = makeThings(maxMemory) // Acquire enough storage memory to exceed the storage region assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 750L) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.executionMemoryUsed === 0L) assert(mm.storageMemoryUsed === 750L) assert(evictedBlocks.isEmpty) @@ -126,13 +126,12 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) assert(mm.executionMemoryUsed === 100L) assert(mm.storageMemoryUsed === 750L) - assertEnsureFreeSpaceNotCalled(ms) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(evictedBlocks.isEmpty) // Execution wants 200 bytes but only 150 are free, so storage is evicted assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L) assert(mm.executionMemoryUsed === 300L) - assertEnsureFreeSpaceNotCalled(ms) - assertFreeSpaceForExecutionCalled(ms, 50L) + assertEvictBlocksToFreeSpaceCalled(ms, 50L) assert(mm.executionMemoryUsed === 300L) assert(evictedBlocks.nonEmpty) mm.releaseAllStorageMemory() @@ -141,7 +140,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released") // Acquire some storage memory again, but this time keep it within the storage region assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 400L) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 400L) assert(mm.executionMemoryUsed === 300L) // Execution cannot evict storage because the latter is within the storage fraction, @@ -149,7 +148,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) === 300L) assert(mm.executionMemoryUsed === 600L) assert(mm.storageMemoryUsed === 400L) - assertEnsureFreeSpaceNotCalled(ms) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(evictedBlocks.isEmpty) } @@ -159,11 +158,12 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val (mm, ms) = makeThings(maxMemory) // Acquire enough storage memory to exceed the storage region size assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, 750L) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.executionMemoryUsed === 0L) assert(mm.storageMemoryUsed === 750L) // Should now be able to require up to 500 bytes of memory assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L) + assertEvictBlocksToFreeSpaceCalled(ms, 250L) assert(mm.storageMemoryUsed === 500L) assert(mm.executionMemoryUsed === 500L) assert(evictedBlocks.nonEmpty) @@ -177,32 +177,34 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) === 800L) assert(mm.executionMemoryUsed === 800L) assert(mm.storageMemoryUsed === 0L) - assertEnsureFreeSpaceNotCalled(ms) + assertEvictBlocksToFreeSpaceNotCalled(ms) // Storage should not be able to evict execution assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) assert(mm.executionMemoryUsed === 800L) assert(mm.storageMemoryUsed === 100L) - assertEnsureFreeSpaceCalled(ms, 100L) + assertEvictBlocksToFreeSpaceNotCalled(ms) assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks)) assert(mm.executionMemoryUsed === 800L) assert(mm.storageMemoryUsed === 100L) - assertEnsureFreeSpaceCalled(ms, 250L) + assertEvictBlocksToFreeSpaceCalled(ms, 150L) // try to evict blocks ... + assert(evictedBlocks.isEmpty) // ... but don't evict since evicting will not be sufficient mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP) mm.releaseStorageMemory(maxMemory) // Acquire some execution memory again, but this time keep it within the execution region assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L) assert(mm.executionMemoryUsed === 200L) assert(mm.storageMemoryUsed === 0L) - assertEnsureFreeSpaceNotCalled(ms) + assertEvictBlocksToFreeSpaceNotCalled(ms) // Storage should still not be able to evict execution assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks)) assert(mm.executionMemoryUsed === 200L) assert(mm.storageMemoryUsed === 750L) - assertEnsureFreeSpaceCalled(ms, 750L) + assertEvictBlocksToFreeSpaceNotCalled(ms) // since there was 800 bytes free assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks)) assert(mm.executionMemoryUsed === 200L) assert(mm.storageMemoryUsed === 750L) - assertEnsureFreeSpaceCalled(ms, 850L) + assertEvictBlocksToFreeSpaceCalled(ms, 800L) // try to evict blocks... + assert(evictedBlocks.isEmpty) // ... but don't evict since evicting will not be sufficient } test("small heap") { From 0eac7da041326cfbec2c1db2f279cb655744f90e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 6 Dec 2015 19:39:37 -0800 Subject: [PATCH 08/17] Update JIRA in test case. --- .../org/apache/spark/memory/UnifiedMemoryManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 65b9763d6d5e7..3d259d3058457 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -152,7 +152,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(evictedBlocks.isEmpty) } - test("execution can evict storage blocks when storage memory is below max mem (SPARK-12155)") { + test("execution can evict storage blocks when storage memory is below max mem (SPARK-12165)") { val maxMemory = 1000L val taskAttemptId = 0L val (mm, ms) = makeThings(maxMemory) From b82ee149e2f13c4cebe5e2ecc46ad115aa7115bc Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 8 Dec 2015 13:58:13 -0800 Subject: [PATCH 09/17] Rewrite unroll test after changes in eviction --- .../memory/StaticMemoryManagerSuite.scala | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index a97e70d7aab19..60e7705631d29 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -148,20 +148,26 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { val dummyBlock = TestBlockId("lonely water") val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks)) + when(ms.currentUnrollMemory).thenReturn(100L) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 100L) mm.releaseUnrollMemory(40L) assert(mm.storageMemoryUsed === 60L) when(ms.currentUnrollMemory).thenReturn(60L) - assert(mm.acquireUnrollMemory(dummyBlock, 500L, evictedBlocks)) - // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes. - // Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 340 bytes. + assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks)) assertEvictBlocksToFreeSpaceNotCalled(ms) - assert(mm.storageMemoryUsed === 560L) - when(ms.currentUnrollMemory).thenReturn(560L) + assert(mm.storageMemoryUsed === 860L) + // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes. + // Since we already occupy 60 bytes, we will try to evict only 400 - 60 = 340 bytes. assert(!mm.acquireUnrollMemory(dummyBlock, 800L, evictedBlocks)) - assert(mm.storageMemoryUsed === 560L) - // We already have 560 bytes > the max unroll space of 400 bytes, so no bytes are freed + assertEvictBlocksToFreeSpaceCalled(ms, 340L) + assert(mm.storageMemoryUsed === 520L) + // Acquire more unroll memory to exceed our "max unroll space" + assert(mm.acquireUnrollMemory(dummyBlock, 440L, evictedBlocks)) + when(ms.currentUnrollMemory).thenReturn(500L) + assert(mm.storageMemoryUsed === 960L) + assert(!mm.acquireUnrollMemory(dummyBlock, 300L, evictedBlocks)) + // We already have 500 bytes > the max unroll space of 400 bytes, so no bytes are freed assertEvictBlocksToFreeSpaceNotCalled(ms) // Release beyond what was acquired mm.releaseUnrollMemory(maxStorageMem) From 528cd85dc7e1d4e7559cd99ad4fd4d713d581516 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 8 Dec 2015 15:52:19 -0800 Subject: [PATCH 10/17] Clarify regression test for SPARK-12165 This commit also adds a regression test for SPARK-12189 through the existing test "execution evicts storage" by adding an assert. --- .../spark/memory/UnifiedMemoryManagerSuite.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 3d259d3058457..1f2500b1c5043 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -131,8 +131,8 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes // Execution wants 200 bytes but only 150 are free, so storage is evicted assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L) assert(mm.executionMemoryUsed === 300L) + assert(mm.storageMemoryUsed === 700L) assertEvictBlocksToFreeSpaceCalled(ms, 50L) - assert(mm.executionMemoryUsed === 300L) assert(evictedBlocks.nonEmpty) mm.releaseAllStorageMemory() evictedBlocks.clear() @@ -152,18 +152,21 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(evictedBlocks.isEmpty) } - test("execution can evict storage blocks when storage memory is below max mem (SPARK-12165)") { + test("execution memory requests smaller than free memory should evict storage (SPARK-12165)") { val maxMemory = 1000L val taskAttemptId = 0L val (mm, ms) = makeThings(maxMemory) // Acquire enough storage memory to exceed the storage region size - assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 700L, evictedBlocks)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.executionMemoryUsed === 0L) - assert(mm.storageMemoryUsed === 750L) - // Should now be able to require up to 500 bytes of memory + assert(mm.storageMemoryUsed === 700L) + // SPARK-12165: previously, MemoryStore would not evict anything because it would + // mistakenly think that the 300 bytes of free space was still available even after + // using it to expand the execution pool. Consequently, no storage memory was released + // and the following call granted only 300 bytes to execution. assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L) - assertEvictBlocksToFreeSpaceCalled(ms, 250L) + assertEvictBlocksToFreeSpaceCalled(ms, 200L) assert(mm.storageMemoryUsed === 500L) assert(mm.executionMemoryUsed === 500L) assert(evictedBlocks.nonEmpty) From e48ef21c4c97107f2800d2b33fc4f6e7e196f632 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Dec 2015 16:38:52 -0800 Subject: [PATCH 11/17] Incorporate Andrew's redability suggestions. --- .../apache/spark/memory/StorageMemoryPool.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index 97959b2f3dd0c..23ae590b3ad7d 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -86,8 +86,8 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging { assert(memoryUsed <= poolSize) if (numBytesToAcquire > memoryFree && maxNumBytesToFree > 0) { val additionalMemoryRequired = numBytesToAcquire - memoryFree - memoryStore.evictBlocksToFreeSpace( - Some(blockId), Math.min(maxNumBytesToFree, additionalMemoryRequired), evictedBlocks) + val numBytesToFree = math.min(maxNumBytesToFree, additionalMemoryRequired) + memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks) // Register evicted blocks, if any, with the active task metrics Option(TaskContext.get()).foreach { tc => val metrics = tc.taskMetrics() @@ -125,20 +125,20 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging { */ def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized { // First, shrink the pool by reclaiming free memory: - val spaceFreedByReleasingUnusedMemory = Math.min(spaceToFree, memoryFree) + val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree) decrementPoolSize(spaceFreedByReleasingUnusedMemory) - if (spaceFreedByReleasingUnusedMemory == spaceToFree) { - spaceFreedByReleasingUnusedMemory - } else { + val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory + if (remainingSpaceToFree > 0) { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - memoryStore.evictBlocksToFreeSpace( - None, spaceToFree - spaceFreedByReleasingUnusedMemory, evictedBlocks) + memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, evictedBlocks) val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. decrementPoolSize(spaceFreedByEviction) spaceFreedByReleasingUnusedMemory + spaceFreedByEviction + } else { + spaceFreedByReleasingUnusedMemory } } } From 994585fde74840d9e4c9a4797196314e0ee72f99 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Dec 2015 16:40:25 -0800 Subject: [PATCH 12/17] maxMemory -> maxStorageMemory in check for blocks that can never fit --- .../org/apache/spark/memory/StaticMemoryManager.scala | 4 ++-- .../org/apache/spark/memory/UnifiedMemoryManager.scala | 4 ++-- .../apache/spark/memory/UnifiedMemoryManagerSuite.scala | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index 0382d055b0c35..d2ffba27f3434 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -57,10 +57,10 @@ private[spark] class StaticMemoryManager( blockId: BlockId, numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { - if (numBytes > storageMemoryPool.poolSize) { + if (numBytes > maxStorageMemory) { // Fail fast if the block simply won't fit logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + - s"memory limit (${storageMemoryPool.poolSize} bytes)") + s"memory limit ($maxStorageMemory bytes)") false } else { storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks) diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index d66464ab6fda9..785e2b3f38f92 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -110,10 +110,10 @@ private[spark] class UnifiedMemoryManager private[memory] ( evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) assert(numBytes >= 0) - if (numBytes > maxMemory) { + if (numBytes > maxStorageMemory) { // Fail fast if the block simply won't fit logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + - s"memory limit ($maxMemory bytes)") + s"memory limit ($maxStorageMemory bytes)") return false } if (numBytes > storageMemoryPool.memoryFree) { diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 1f2500b1c5043..a1e2b8e73aa2f 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -189,8 +189,8 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks)) assert(mm.executionMemoryUsed === 800L) assert(mm.storageMemoryUsed === 100L) - assertEvictBlocksToFreeSpaceCalled(ms, 150L) // try to evict blocks ... - assert(evictedBlocks.isEmpty) // ... but don't evict since evicting will not be sufficient + // Do not attempt to evict blocks, since evicting will not free enough memory: + assertEvictBlocksToFreeSpaceNotCalled(ms) mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP) mm.releaseStorageMemory(maxMemory) // Acquire some execution memory again, but this time keep it within the execution region @@ -206,8 +206,8 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks)) assert(mm.executionMemoryUsed === 200L) assert(mm.storageMemoryUsed === 750L) - assertEvictBlocksToFreeSpaceCalled(ms, 800L) // try to evict blocks... - assert(evictedBlocks.isEmpty) // ... but don't evict since evicting will not be sufficient + // Do not attempt to evict blocks, since evicting will not free enough memory: + assertEvictBlocksToFreeSpaceNotCalled(ms) } test("small heap") { From f6fb4066e52baa6bf6c3680db6cdc967330a110c Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Dec 2015 16:46:20 -0800 Subject: [PATCH 13/17] Avoid double-add to evictedBlocks. --- .../org/apache/spark/memory/MemoryManagerSuite.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index d9e1e825b1d41..9d0b9aa03a3b3 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -103,8 +103,14 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft mm.releaseStorageMemory(numBytesToFree) args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append( (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L))) - evictedBlocks.append( - (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L))) + // We need to add this call so that that the suite-level `evictedBlocks` is updated when + // execution evicts storage; in that case, args.last will not be equal to evictedBlocks + // because it will be a temporary buffer created inside of the MemoryManager rather than + // being passed in by the test code. + if (!(evictedBlocks eq args.last)) { + evictedBlocks.append( + (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L))) + } true } else { // No blocks were evicted because eviction would not free enough space. From 52b544c2fa3089d59810f8e087bbba4baf26cf83 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Dec 2015 17:19:18 -0800 Subject: [PATCH 14/17] was -> were --- .../org/apache/spark/memory/UnifiedMemoryManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index a1e2b8e73aa2f..6f43bb1d2a52c 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -202,7 +202,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks)) assert(mm.executionMemoryUsed === 200L) assert(mm.storageMemoryUsed === 750L) - assertEvictBlocksToFreeSpaceNotCalled(ms) // since there was 800 bytes free + assertEvictBlocksToFreeSpaceNotCalled(ms) // since there were 800 bytes free assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks)) assert(mm.executionMemoryUsed === 200L) assert(mm.storageMemoryUsed === 750L) From e2090d1e11109b8bfd232d3a33d9acbf775e0872 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Dec 2015 17:41:23 -0800 Subject: [PATCH 15/17] Add comments to explain potentially-confusing 1-byte block eviction test. --- .../org/apache/spark/memory/StaticMemoryManagerSuite.scala | 3 +++ .../org/apache/spark/memory/UnifiedMemoryManagerSuite.scala | 3 +++ 2 files changed, 6 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index 60e7705631d29..ed319d9cad092 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -98,6 +98,9 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { assert(mm.storageMemoryUsed === 1000L) assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) assertEvictBlocksToFreeSpaceCalled(ms, 1L) + // Note: We evicted 1 byte to put another 1-byte block in, so the storage memory used remains at + // 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted + // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests. assert(mm.storageMemoryUsed === 1000L) mm.releaseStorageMemory(800L) assert(mm.storageMemoryUsed === 200L) diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 6f43bb1d2a52c..93cd1cca759d0 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -95,6 +95,9 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes evictedBlocks.clear() assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) assertEvictBlocksToFreeSpaceCalled(ms, 1L) + // Note: We evicted 1 byte to put another 1-byte block in, so the storage memory used remains at + // 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted + // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests. assert(mm.storageMemoryUsed === 1000L) mm.releaseStorageMemory(800L) assert(mm.storageMemoryUsed === 200L) From a43ed343f81dc2001718dcd1812ed85a1d849d72 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 8 Dec 2015 17:56:06 -0800 Subject: [PATCH 16/17] Push evictedBlocks checks into assertEvictBlocksToFreeSpaceNotCalled. --- .../org/apache/spark/memory/MemoryManagerSuite.scala | 1 + .../apache/spark/memory/StaticMemoryManagerSuite.scala | 4 ++++ .../apache/spark/memory/UnifiedMemoryManagerSuite.scala | 9 +++------ 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index 9d0b9aa03a3b3..555b640cb4244 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -135,6 +135,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft protected def assertEvictBlocksToFreeSpaceNotCalled[T](ms: MemoryStore): Unit = { assert(evictBlocksToFreeSpaceCalled.get() === DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED, "evictBlocksToFreeSpace() should not have been called!") + assert(evictedBlocks.isEmpty) } /** diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index ed319d9cad092..ded5f759d9734 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -98,6 +98,8 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { assert(mm.storageMemoryUsed === 1000L) assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) assertEvictBlocksToFreeSpaceCalled(ms, 1L) + assert(evictedBlocks.nonEmpty) + evictedBlocks.clear() // Note: We evicted 1 byte to put another 1-byte block in, so the storage memory used remains at // 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests. @@ -164,6 +166,8 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { // Since we already occupy 60 bytes, we will try to evict only 400 - 60 = 340 bytes. assert(!mm.acquireUnrollMemory(dummyBlock, 800L, evictedBlocks)) assertEvictBlocksToFreeSpaceCalled(ms, 340L) + assert(evictedBlocks.nonEmpty) + evictedBlocks.clear() assert(mm.storageMemoryUsed === 520L) // Acquire more unroll memory to exceed our "max unroll space" assert(mm.acquireUnrollMemory(dummyBlock, 440L, evictedBlocks)) diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 93cd1cca759d0..71221deeb4c28 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -81,12 +81,10 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) - assert(evictedBlocks.isEmpty) // Acquire more than the max, not granted assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) - assert(evictedBlocks.isEmpty) // Acquire up to the max, requests after this are still granted due to LRU eviction assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks)) assertEvictBlocksToFreeSpaceCalled(ms, 110L) @@ -95,6 +93,8 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes evictedBlocks.clear() assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) assertEvictBlocksToFreeSpaceCalled(ms, 1L) + assert(evictedBlocks.nonEmpty) + evictedBlocks.clear() // Note: We evicted 1 byte to put another 1-byte block in, so the storage memory used remains at // 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests. @@ -124,21 +124,19 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.executionMemoryUsed === 0L) assert(mm.storageMemoryUsed === 750L) - assert(evictedBlocks.isEmpty) // Execution needs to request 250 bytes to evict storage memory assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) assert(mm.executionMemoryUsed === 100L) assert(mm.storageMemoryUsed === 750L) assertEvictBlocksToFreeSpaceNotCalled(ms) - assert(evictedBlocks.isEmpty) // Execution wants 200 bytes but only 150 are free, so storage is evicted assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L) assert(mm.executionMemoryUsed === 300L) assert(mm.storageMemoryUsed === 700L) assertEvictBlocksToFreeSpaceCalled(ms, 50L) assert(evictedBlocks.nonEmpty) - mm.releaseAllStorageMemory() evictedBlocks.clear() + mm.releaseAllStorageMemory() require(mm.executionMemoryUsed === 300L) require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released") // Acquire some storage memory again, but this time keep it within the storage region @@ -152,7 +150,6 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(mm.executionMemoryUsed === 600L) assert(mm.storageMemoryUsed === 400L) assertEvictBlocksToFreeSpaceNotCalled(ms) - assert(evictedBlocks.isEmpty) } test("execution memory requests smaller than free memory should evict storage (SPARK-12165)") { From 855c0bc7f301c91ca260870ee7728a64df72b440 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 8 Dec 2015 20:18:43 -0800 Subject: [PATCH 17/17] Fix unroll semantics to match old behavior Before this commit, unrolling would evict too many blocks, resulting in test failures in BlockManagerSuite. The root cause is that we used `maxUnrollMemory` as a cap for the extra amount of memory to evict for unrolling, which is incorrect. Instead, we should use it as a cap for the total amount of unroll memory and calculate the amount of memory to evict from there. The goal of this commit is to preserve the old behavior (in 1.5) as much as possible. This can be seen from the fact that BlockManagerSuite now passes without any modifications. --- .../spark/memory/StaticMemoryManager.scala | 12 +++++++++--- .../spark/memory/StorageMemoryPool.scala | 13 ++++++------- .../memory/StaticMemoryManagerSuite.scala | 18 +++++++----------- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index d2ffba27f3434..3554b558f2123 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -49,7 +49,7 @@ private[spark] class StaticMemoryManager( } // Max number of bytes worth of blocks to evict when unrolling - private val maxMemoryToEvictForUnroll: Long = { + private val maxUnrollMemory: Long = { (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong } @@ -72,8 +72,14 @@ private[spark] class StaticMemoryManager( numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory - val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory) - val numBytesToFree = math.min(numBytes, maxNumBytesToFree) + val freeMemory = storageMemoryPool.memoryFree + // When unrolling, we will use all of the existing free memory, and, if necessary, + // some extra space freed from evicting cached blocks. We must place a cap on the + // amount of memory to be evicted by unrolling, however, otherwise unrolling one + // big block can blow away the entire cache. + val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory) + // Keep it within the range 0 <= X <= maxNumBytesToFree + val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory)) storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks) } diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index 23ae590b3ad7d..abea3e1626d8d 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -65,7 +65,8 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging { blockId: BlockId, numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized { - acquireMemory(blockId, numBytes, numBytes, evictedBlocks) + val numBytesToFree = math.max(0, numBytes - memoryFree) + acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks) } /** @@ -73,20 +74,18 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging { * * @param blockId the ID of the block we are acquiring storage memory for * @param numBytesToAcquire the size of this block - * @param maxNumBytesToFree the maximum amount of space to be freed through evicting blocks + * @param numBytesToFree the amount of space to be freed through evicting blocks * @return whether all N bytes were successfully granted. */ def acquireMemory( blockId: BlockId, numBytesToAcquire: Long, - maxNumBytesToFree: Long, + numBytesToFree: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized { assert(numBytesToAcquire >= 0) - assert(maxNumBytesToFree >= 0) + assert(numBytesToFree >= 0) assert(memoryUsed <= poolSize) - if (numBytesToAcquire > memoryFree && maxNumBytesToFree > 0) { - val additionalMemoryRequired = numBytesToAcquire - memoryFree - val numBytesToFree = math.min(maxNumBytesToFree, additionalMemoryRequired) + if (numBytesToFree > 0) { memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks) // Register evicted blocks, if any, with the active task metrics Option(TaskContext.get()).foreach { tc => diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index ded5f759d9734..6700b94f0f57f 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -164,18 +164,14 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { assert(mm.storageMemoryUsed === 860L) // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes. // Since we already occupy 60 bytes, we will try to evict only 400 - 60 = 340 bytes. - assert(!mm.acquireUnrollMemory(dummyBlock, 800L, evictedBlocks)) - assertEvictBlocksToFreeSpaceCalled(ms, 340L) - assert(evictedBlocks.nonEmpty) + assert(mm.acquireUnrollMemory(dummyBlock, 240L, evictedBlocks)) + assertEvictBlocksToFreeSpaceCalled(ms, 100L) + when(ms.currentUnrollMemory).thenReturn(300L) // 60 + 240 + assert(mm.storageMemoryUsed === 1000L) evictedBlocks.clear() - assert(mm.storageMemoryUsed === 520L) - // Acquire more unroll memory to exceed our "max unroll space" - assert(mm.acquireUnrollMemory(dummyBlock, 440L, evictedBlocks)) - when(ms.currentUnrollMemory).thenReturn(500L) - assert(mm.storageMemoryUsed === 960L) - assert(!mm.acquireUnrollMemory(dummyBlock, 300L, evictedBlocks)) - // We already have 500 bytes > the max unroll space of 400 bytes, so no bytes are freed - assertEvictBlocksToFreeSpaceNotCalled(ms) + assert(!mm.acquireUnrollMemory(dummyBlock, 150L, evictedBlocks)) + assertEvictBlocksToFreeSpaceCalled(ms, 100L) // 400 - 300 + assert(mm.storageMemoryUsed === 900L) // 100 bytes were evicted // Release beyond what was acquired mm.releaseUnrollMemory(maxStorageMem) assert(mm.storageMemoryUsed === 0L)