Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,18 @@ import org.apache.spark.internal.Logging
* tasks was performed by the ShuffleMemoryManager.
*
* @param lock a [[MemoryManager]] instance to synchronize on
* @param poolName a human-readable name for this pool, for use in log messages
* @param memoryMode the type of memory tracked by this pool (on- or off-heap)
*/
private[memory] class ExecutionMemoryPool(
lock: Object,
poolName: String
memoryMode: MemoryMode
) extends MemoryPool(lock) with Logging {

private[this] val poolName: String = memoryMode match {
case MemoryMode.ON_HEAP => "on-heap execution"
case MemoryMode.OFF_HEAP => "off-heap execution"
}

/**
* Map from taskAttemptId -> memory consumption in bytes
*/
Expand Down
46 changes: 30 additions & 16 deletions core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,42 +36,52 @@ import org.apache.spark.unsafe.memory.MemoryAllocator
private[spark] abstract class MemoryManager(
conf: SparkConf,
numCores: Int,
storageMemory: Long,
onHeapStorageMemory: Long,
onHeapExecutionMemory: Long) extends Logging {

// -- Methods related to memory allocation policies and bookkeeping ------------------------------

@GuardedBy("this")
protected val storageMemoryPool = new StorageMemoryPool(this)
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "on-heap execution")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "off-heap execution")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)

storageMemoryPool.incrementPoolSize(storageMemory)
onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeap.size", 0))

protected[this] val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
protected[this] val offHeapStorageMemory =
(maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)

/**
* Total available memory for storage, in bytes. This amount can vary over time, depending on
* the MemoryManager implementation.
* In this model, this is equivalent to the amount of memory not occupied by execution.
*/
def maxStorageMemory: Long
def maxOnHeapStorageMemory: Long

/**
* Set the [[MemoryStore]] used by this manager to evict cached blocks.
* This must be set after construction due to initialization ordering constraints.
*/
final def setMemoryStore(store: MemoryStore): Unit = synchronized {
storageMemoryPool.setMemoryStore(store)
onHeapStorageMemoryPool.setMemoryStore(store)
offHeapStorageMemoryPool.setMemoryStore(store)
}

/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
*
* @return whether all N bytes were successfully granted.
*/
def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean

/**
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
Expand All @@ -82,7 +92,7 @@ private[spark] abstract class MemoryManager(
*
* @return whether all N bytes were successfully granted.
*/
def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean

/**
* Try to acquire up to `numBytes` of execution memory for the current task and return the
Expand Down Expand Up @@ -126,22 +136,26 @@ private[spark] abstract class MemoryManager(
/**
* Release N bytes of storage memory.
*/
def releaseStorageMemory(numBytes: Long): Unit = synchronized {
storageMemoryPool.releaseMemory(numBytes)
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
memoryMode match {
case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
}
}

/**
* Release all storage memory acquired.
*/
final def releaseAllStorageMemory(): Unit = synchronized {
storageMemoryPool.releaseAllMemory()
onHeapStorageMemoryPool.releaseAllMemory()
offHeapStorageMemoryPool.releaseAllMemory()
}

/**
* Release N bytes of unroll memory.
*/
final def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
releaseStorageMemory(numBytes)
final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
releaseStorageMemory(numBytes, memoryMode)
}

/**
Expand All @@ -155,7 +169,7 @@ private[spark] abstract class MemoryManager(
* Storage memory currently in use, in bytes.
*/
final def storageMemoryUsed: Long = synchronized {
storageMemoryPool.memoryUsed
onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ import org.apache.spark.storage.BlockId
private[spark] class StaticMemoryManager(
conf: SparkConf,
maxOnHeapExecutionMemory: Long,
override val maxStorageMemory: Long,
override val maxOnHeapStorageMemory: Long,
numCores: Int)
extends MemoryManager(
conf,
numCores,
maxStorageMemory,
maxOnHeapStorageMemory,
maxOnHeapExecutionMemory) {

def this(conf: SparkConf, numCores: Int) {
Expand All @@ -46,33 +46,47 @@ private[spark] class StaticMemoryManager(
numCores)
}

// The StaticMemoryManager does not support off-heap storage memory:
offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize)
offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize)

// Max number of bytes worth of blocks to evict when unrolling
private val maxUnrollMemory: Long = {
(maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
(maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}

override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
if (numBytes > maxStorageMemory) {
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
require(memoryMode != MemoryMode.OFF_HEAP,
"StaticMemoryManager does not support off-heap storage memory")
if (numBytes > maxOnHeapStorageMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxStorageMemory bytes)")
s"memory limit ($maxOnHeapStorageMemory bytes)")
false
} else {
storageMemoryPool.acquireMemory(blockId, numBytes)
onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
}
}

override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized {
val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
val freeMemory = storageMemoryPool.memoryFree
override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
require(memoryMode != MemoryMode.OFF_HEAP,
"StaticMemoryManager does not support off-heap unroll memory")
val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
val freeMemory = onHeapStorageMemoryPool.memoryFree
// When unrolling, we will use all of the existing free memory, and, if necessary,
// some extra space freed from evicting cached blocks. We must place a cap on the
// amount of memory to be evicted by unrolling, however, otherwise unrolling one
// big block can blow away the entire cache.
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
// Keep it within the range 0 <= X <= maxNumBytesToFree
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
}

private[memory]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ import org.apache.spark.storage.memory.MemoryStore
* (caching).
*
* @param lock a [[MemoryManager]] instance to synchronize on
* @param memoryMode the type of memory tracked by this pool (on- or off-heap)
*/
private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
private[memory] class StorageMemoryPool(
lock: Object,
memoryMode: MemoryMode
) extends MemoryPool(lock) with Logging {

@GuardedBy("lock")
private[this] var _memoryUsed: Long = 0L
Expand Down Expand Up @@ -79,7 +83,8 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
if (numBytesToFree > 0) {
// Once we support off-heap caching, this will need to change:
if (numBytesToFree > 0 && memoryMode == MemoryMode.ON_HEAP) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree)
}
// NOTE: If the memory store evicts blocks, then those evictions will synchronously call
Expand Down Expand Up @@ -117,7 +122,14 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
val spaceFreedByEviction = {
// Once we support off-heap caching, this will need to change:
if (memoryMode == MemoryMode.ON_HEAP) {
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
} else {
0
}
}
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
decrementPoolSize(spaceFreedByEviction)
Expand Down
Loading