Skip to content

Commit 69e22fe

Browse files
committed
Add separate on- and off-heap storage memory pools to MemoryManager
1 parent 86c8a49 commit 69e22fe

File tree

12 files changed

+273
-185
lines changed

12 files changed

+273
-185
lines changed

core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,18 @@ import org.apache.spark.internal.Logging
3737
* tasks was performed by the ShuffleMemoryManager.
3838
*
3939
* @param lock a [[MemoryManager]] instance to synchronize on
40-
* @param poolName a human-readable name for this pool, for use in log messages
40+
* @param memoryMode the type of memory tracked by this pool (on- or off-heap)
4141
*/
4242
private[memory] class ExecutionMemoryPool(
4343
lock: Object,
44-
poolName: String
44+
memoryMode: MemoryMode
4545
) extends MemoryPool(lock) with Logging {
4646

47+
private[this] val poolName: String = memoryMode match {
48+
case MemoryMode.ON_HEAP => "on-heap execution"
49+
case MemoryMode.OFF_HEAP => "off-heap execution"
50+
}
51+
4752
/**
4853
* Map from taskAttemptId -> memory consumption in bytes
4954
*/

core/src/main/scala/org/apache/spark/memory/MemoryManager.scala

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,42 +36,52 @@ import org.apache.spark.unsafe.memory.MemoryAllocator
3636
private[spark] abstract class MemoryManager(
3737
conf: SparkConf,
3838
numCores: Int,
39-
storageMemory: Long,
39+
onHeapStorageMemory: Long,
4040
onHeapExecutionMemory: Long) extends Logging {
4141

4242
// -- Methods related to memory allocation policies and bookkeeping ------------------------------
4343

4444
@GuardedBy("this")
45-
protected val storageMemoryPool = new StorageMemoryPool(this)
45+
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
4646
@GuardedBy("this")
47-
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "on-heap execution")
47+
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
4848
@GuardedBy("this")
49-
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "off-heap execution")
49+
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
50+
@GuardedBy("this")
51+
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
5052

51-
storageMemoryPool.incrementPoolSize(storageMemory)
53+
onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
5254
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
53-
offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeap.size", 0))
55+
56+
protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
57+
protected[this] val offHeapStorageMemory =
58+
(maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
59+
60+
offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
61+
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
5462

5563
/**
5664
* Total available memory for storage, in bytes. This amount can vary over time, depending on
5765
* the MemoryManager implementation.
5866
* In this model, this is equivalent to the amount of memory not occupied by execution.
5967
*/
60-
def maxStorageMemory: Long
68+
def maxOnHeapStorageMemory: Long
6169

6270
/**
6371
* Set the [[MemoryStore]] used by this manager to evict cached blocks.
6472
* This must be set after construction due to initialization ordering constraints.
6573
*/
6674
final def setMemoryStore(store: MemoryStore): Unit = synchronized {
67-
storageMemoryPool.setMemoryStore(store)
75+
onHeapStorageMemoryPool.setMemoryStore(store)
76+
offHeapStorageMemoryPool.setMemoryStore(store)
6877
}
6978

7079
/**
7180
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
81+
*
7282
* @return whether all N bytes were successfully granted.
7383
*/
74-
def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean
84+
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
7585

7686
/**
7787
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
@@ -82,7 +92,7 @@ private[spark] abstract class MemoryManager(
8292
*
8393
* @return whether all N bytes were successfully granted.
8494
*/
85-
def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean
95+
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
8696

8797
/**
8898
* Try to acquire up to `numBytes` of execution memory for the current task and return the
@@ -126,22 +136,26 @@ private[spark] abstract class MemoryManager(
126136
/**
127137
* Release N bytes of storage memory.
128138
*/
129-
def releaseStorageMemory(numBytes: Long): Unit = synchronized {
130-
storageMemoryPool.releaseMemory(numBytes)
139+
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
140+
memoryMode match {
141+
case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
142+
case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
143+
}
131144
}
132145

133146
/**
134147
* Release all storage memory acquired.
135148
*/
136149
final def releaseAllStorageMemory(): Unit = synchronized {
137-
storageMemoryPool.releaseAllMemory()
150+
onHeapStorageMemoryPool.releaseAllMemory()
151+
offHeapStorageMemoryPool.releaseAllMemory()
138152
}
139153

140154
/**
141155
* Release N bytes of unroll memory.
142156
*/
143-
final def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
144-
releaseStorageMemory(numBytes)
157+
final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
158+
releaseStorageMemory(numBytes, memoryMode)
145159
}
146160

147161
/**
@@ -155,7 +169,7 @@ private[spark] abstract class MemoryManager(
155169
* Storage memory currently in use, in bytes.
156170
*/
157171
final def storageMemoryUsed: Long = synchronized {
158-
storageMemoryPool.memoryUsed
172+
onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
159173
}
160174

161175
/**

core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ import org.apache.spark.storage.BlockId
3030
private[spark] class StaticMemoryManager(
3131
conf: SparkConf,
3232
maxOnHeapExecutionMemory: Long,
33-
override val maxStorageMemory: Long,
33+
override val maxOnHeapStorageMemory: Long,
3434
numCores: Int)
3535
extends MemoryManager(
3636
conf,
3737
numCores,
38-
maxStorageMemory,
38+
maxOnHeapStorageMemory,
3939
maxOnHeapExecutionMemory) {
4040

4141
def this(conf: SparkConf, numCores: Int) {
@@ -46,33 +46,47 @@ private[spark] class StaticMemoryManager(
4646
numCores)
4747
}
4848

49+
// The StaticMemoryManager does not support off-heap storage memory:
50+
offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize)
51+
offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize)
52+
4953
// Max number of bytes worth of blocks to evict when unrolling
5054
private val maxUnrollMemory: Long = {
51-
(maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
55+
(maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
5256
}
5357

54-
override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
55-
if (numBytes > maxStorageMemory) {
58+
override def acquireStorageMemory(
59+
blockId: BlockId,
60+
numBytes: Long,
61+
memoryMode: MemoryMode): Boolean = synchronized {
62+
require(memoryMode != MemoryMode.OFF_HEAP,
63+
"StaticMemoryManager does not support off-heap storage memory")
64+
if (numBytes > maxOnHeapStorageMemory) {
5665
// Fail fast if the block simply won't fit
5766
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
58-
s"memory limit ($maxStorageMemory bytes)")
67+
s"memory limit ($maxOnHeapStorageMemory bytes)")
5968
false
6069
} else {
61-
storageMemoryPool.acquireMemory(blockId, numBytes)
70+
onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
6271
}
6372
}
6473

65-
override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
66-
val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
67-
val freeMemory = storageMemoryPool.memoryFree
74+
override def acquireUnrollMemory(
75+
blockId: BlockId,
76+
numBytes: Long,
77+
memoryMode: MemoryMode): Boolean = synchronized {
78+
require(memoryMode != MemoryMode.OFF_HEAP,
79+
"StaticMemoryManager does not support off-heap unroll memory")
80+
val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
81+
val freeMemory = onHeapStorageMemoryPool.memoryFree
6882
// When unrolling, we will use all of the existing free memory, and, if necessary,
6983
// some extra space freed from evicting cached blocks. We must place a cap on the
7084
// amount of memory to be evicted by unrolling, however, otherwise unrolling one
7185
// big block can blow away the entire cache.
7286
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
7387
// Keep it within the range 0 <= X <= maxNumBytesToFree
7488
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
75-
storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
89+
onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
7690
}
7791

7892
private[memory]

core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@ import org.apache.spark.storage.memory.MemoryStore
2828
* (caching).
2929
*
3030
* @param lock a [[MemoryManager]] instance to synchronize on
31+
* @param memoryMode the type of memory tracked by this pool (on- or off-heap)
3132
*/
32-
private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
33+
private[memory] class StorageMemoryPool(
34+
lock: Object,
35+
memoryMode: MemoryMode
36+
) extends MemoryPool(lock) with Logging {
3337

3438
@GuardedBy("lock")
3539
private[this] var _memoryUsed: Long = 0L
@@ -79,7 +83,8 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
7983
assert(numBytesToAcquire >= 0)
8084
assert(numBytesToFree >= 0)
8185
assert(memoryUsed <= poolSize)
82-
if (numBytesToFree > 0) {
86+
// Once we support off-heap caching, this will need to change:
87+
if (numBytesToFree > 0 && memoryMode == MemoryMode.ON_HEAP) {
8388
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree)
8489
}
8590
// NOTE: If the memory store evicts blocks, then those evictions will synchronously call
@@ -117,7 +122,14 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
117122
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
118123
if (remainingSpaceToFree > 0) {
119124
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
120-
val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
125+
val spaceFreedByEviction = {
126+
// Once we support off-heap caching, this will need to change:
127+
if (memoryMode == MemoryMode.ON_HEAP) {
128+
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
129+
} else {
130+
0
131+
}
132+
}
121133
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
122134
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
123135
decrementPoolSize(spaceFreedByEviction)

0 commit comments

Comments
 (0)