Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -385,4 +385,19 @@ package object config {
.checkValue(v => v > 0 && v <= Int.MaxValue,
s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
.createWithDefault(1024 * 1024)

// Number of elements to unroll between successive checks for whether more unroll memory
// must be reserved. The value is used as the divisor of a modulo on the running element
// count, so it must be strictly positive: zero would raise an ArithmeticException while
// unrolling instead of failing fast when the configuration is read.
private[spark] val UNROLL_MEMORY_CHECK_PERIOD =
  ConfigBuilder("spark.storage.unrollMemoryCheckPeriod")
    .internal()
    .doc("The memory check period is used to determine how often we should check whether "
      + "there is a need to request more memory when we try to unroll the given block in memory.")
    .longConf
    .checkValue(_ > 0, "The memory check period must be a positive number.")
    .createWithDefault(16)

// Multiplier applied to the size currently used for unrolling a block to decide how much
// memory to hold in total; the amount requested is (size * factor - memory already
// reserved). A factor below 1.0 could make that difference negative even though the
// current size exceeds the reservation, so such values are rejected when the
// configuration is read.
private[spark] val UNROLL_MEMORY_GROWTH_FACTOR =
  ConfigBuilder("spark.storage.unrollMemoryGrowthFactor")
    .internal()
    .doc("Memory to request as a multiple of the size that used to unroll the block.")
    .doubleConf
    .checkValue(_ >= 1.0, "The memory growth factor must be greater than or equal to 1.")
    .createWithDefault(1.5)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.google.common.io.ByteStreams

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{UNROLL_MEMORY_CHECK_PERIOD, UNROLL_MEMORY_GROWTH_FACTOR}
import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
Expand Down Expand Up @@ -190,11 +191,11 @@ private[spark] class MemoryStore(
// Initial per-task memory to request for unrolling blocks (bytes).
val initialMemoryThreshold = unrollMemoryThreshold
// How often to check whether we need to request more memory
val memoryCheckPeriod = 16
val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
// Memory currently reserved by this task for this particular unrolling operation
var memoryThreshold = initialMemoryThreshold
// Memory to request as a multiple of current vector size
val memoryGrowthFactor = 1.5
val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
// Keep track of unroll memory used by this particular block / putIterator() operation
var unrollMemoryUsedByThisBlock = 0L
// Underlying vector for unrolling the block
Expand Down Expand Up @@ -325,6 +326,12 @@ private[spark] class MemoryStore(

// Whether there is still enough memory for us to continue unrolling this block
var keepUnrolling = true
// Number of elements unrolled so far
var elementsUnrolled = 0L
// How often to check whether we need to request more memory
val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
// Memory to request as a multiple of current bbos size
val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
// Initial per-task memory to request for unrolling blocks (bytes).
val initialMemoryThreshold = unrollMemoryThreshold
// Keep track of unroll memory used by this particular block / putIterator() operation
Expand Down Expand Up @@ -359,7 +366,7 @@ private[spark] class MemoryStore(

def reserveAdditionalMemoryIfNecessary(): Unit = {
if (bbos.size > unrollMemoryUsedByThisBlock) {
val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
Expand All @@ -370,7 +377,10 @@ private[spark] class MemoryStore(
// Unroll this block safely, checking whether we have exceeded our threshold
while (values.hasNext && keepUnrolling) {
serializationStream.writeObject(values.next())(classTag)
reserveAdditionalMemoryIfNecessary()
elementsUnrolled += 1
if (elementsUnrolled % memoryCheckPeriod == 0) {
reserveAdditionalMemoryIfNecessary()
}
}

// Make sure that we have enough memory to store the block. By this point, it is possible that
Expand Down