From 0bf0d941c412561ebd27a806f4cc54178626b760 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 8 Oct 2015 14:44:28 -0700 Subject: [PATCH 1/2] Add comment to pending unroll. --- .../apache/spark/storage/MemoryStore.scala | 35 +++++++------------ 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 4ad12ab7897e8..1431e63c4680b 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -304,9 +304,21 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // later when the task finishes. if (keepUnrolling) { accountingLock.synchronized { + // Here, we are logically transferring memory from unroll memory to pending unroll memory. + // We release and re-acquire the memory from the MemoryManager. As of today, this is not + // race-prone because all calls to [acquire|release]UnrollMemoryForThisTask() occur in + // MemoryStore and are guarded by `accountingLock`, MemoryStore is the only component + // which allocates storage memory, and unroll memory is currently counted towards + // storage memory. If we ever change things so that unroll memory is counted towards + // execution memory, then we will need to revisit this argument as it may no longer hold. + // TODO: revisit this as part of SPARK-10983. val amountToRelease = currentUnrollMemoryForThisTask - previousMemoryReserved releaseUnrollMemoryForThisTask(amountToRelease) - reservePendingUnrollMemoryForThisTask(blockId, amountToRelease, droppedBlocks) + val acquired = memoryManager.acquireUnrollMemory(blockId, amountToRelease, droppedBlocks) + assert(acquired == amountToRelease) + val taskAttemptId = currentTaskAttemptId() + pendingUnrollMemoryMap(taskAttemptId) = + pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToRelease } } } @@ -516,27 +528,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } } - /** - * Reserve the unroll memory of current unroll successful block used by this task - * until actually put the block into memory entry. - * @return whether the request is granted. - */ - private def reservePendingUnrollMemoryForThisTask( - blockId: BlockId, - memory: Long, - droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { - val taskAttemptId = currentTaskAttemptId() - accountingLock.synchronized { - val acquired = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks) - val success = acquired == memory - if (success) { - pendingUnrollMemoryMap(taskAttemptId) = - pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory - } - success - } - } - /** * Release pending unroll memory of current unroll successful block used by this task */ From b1e8fd16d9f08ff2ff693c44b09143c810c05544 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 8 Oct 2015 15:00:32 -0700 Subject: [PATCH 2/2] Pending unroll transfer. --- .../apache/spark/storage/MemoryStore.scala | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 1431e63c4680b..0013ae6b39918 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -303,22 +303,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // Otherwise, if we return an iterator, we release the memory reserved here // later when the task finishes. if (keepUnrolling) { + val taskAttemptId = currentTaskAttemptId() accountingLock.synchronized { - // Here, we are logically transferring memory from unroll memory to pending unroll memory. - // We release and re-acquire the memory from the MemoryManager. As of today, this is not - // race-prone because all calls to [acquire|release]UnrollMemoryForThisTask() occur in - // MemoryStore and are guarded by `accountingLock`, MemoryStore is the only component - // which allocates storage memory, and unroll memory is currently counted towards - // storage memory. If we ever change things so that unroll memory is counted towards - // execution memory, then we will need to revisit this argument as it may no longer hold. - // TODO: revisit this as part of SPARK-10983. - val amountToRelease = currentUnrollMemoryForThisTask - previousMemoryReserved - releaseUnrollMemoryForThisTask(amountToRelease) - val acquired = memoryManager.acquireUnrollMemory(blockId, amountToRelease, droppedBlocks) - assert(acquired == amountToRelease) - val taskAttemptId = currentTaskAttemptId() + // Here, we transfer memory from unroll to pending unroll because we expect to cache this + // block in `tryToPut`. We do not release and re-acquire memory from the MemoryManager in + // order to avoid race conditions where another component steals the memory that we're + // trying to transfer. + val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved + unrollMemoryMap(taskAttemptId) -= amountToTransferToPending pendingUnrollMemoryMap(taskAttemptId) = - pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToRelease + pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending } } } @@ -374,7 +368,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // Note: if we have previously unrolled this block successfully, then pending unroll // memory should be non-zero. This is the amount that we already reserved during the // unrolling process. In this case, we can just reuse this space to cache our block. - // This must be synchronized so the release and re-acquire can happen atomically. + // + // Note: the StaticMemoryManager counts unroll memory as storage memory. Here, the + // synchronization on `accountingLock` guarantees that the release of unroll memory and + // acquisition of storage memory happens atomically. However, if storage memory is acquired + // outside of MemoryStore or if unroll memory is counted as execution memory, then we will + // have to revisit this assumption. See SPARK-10983 for more context. releasePendingUnrollMemoryForThisTask() val numBytesAcquired = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks) val enoughMemory = numBytesAcquired == size