@@ -44,6 +44,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
4444 // dropping blocks from the memory store.
4545 private val putLock = new Object ()
4646
47+ /**
48+ * Mapping from thread ID to memory used for unrolling blocks.
49+ *
50+ * To avoid potential deadlocks, all accesses of this map in MemoryStore are assumed to
51+ * first synchronize on `putLock` and then on `unrollMemoryMap`, in that particular order.
52+ * This is lazy because SparkEnv does not exist when we mock this class in tests.
53+ */
54+ private lazy val unrollMemoryMap = SparkEnv .get.unrollMemoryMap
55+
4756 /**
4857 * The amount of space ensured for unrolling values in memory, shared across all cores.
4958 * This space is not reserved in advance, but allocated dynamically by dropping existing blocks.
@@ -220,19 +229,20 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
220229 val memoryGrowthFactor = 1.5
221230
222231 val threadId = Thread .currentThread().getId
223- val unrollMemoryMap = SparkEnv .get.unrollMemoryMap
224232 var vector = new SizeTrackingVector [Any ]
225233
226- // Request memory for our vector and return whether request is granted.
227- // This involves synchronizing across all threads and can be expensive if called frequently.
234+ // Request memory for our vector and return whether the request is granted. This involves
235+ // synchronizing on putLock and unrollMemoryMap (in that order), which could be expensive.
228236 def requestMemory (memoryToRequest : Long ): Boolean = {
229- unrollMemoryMap.synchronized {
230- val previouslyOccupiedMemory = unrollMemoryMap.get(threadId).getOrElse(0L )
231- val otherThreadsMemory = unrollMemoryMap.values.sum - previouslyOccupiedMemory
232- val availableMemory = freeMemory - otherThreadsMemory
233- val granted = availableMemory > memoryToRequest
234- if (granted) { unrollMemoryMap(threadId) = memoryToRequest }
235- granted
237+ putLock.synchronized {
238+ unrollMemoryMap.synchronized {
239+ val previouslyOccupiedMemory = unrollMemoryMap.get(threadId).getOrElse(0L )
240+ val otherThreadsMemory = unrollMemoryMap.values.sum - previouslyOccupiedMemory
241+ val availableMemory = freeMemory - otherThreadsMemory
242+ val granted = availableMemory > memoryToRequest
243+ if (granted) { unrollMemoryMap(threadId) = memoryToRequest }
244+ granted
245+ }
236246 }
237247 }
238248
@@ -248,18 +258,20 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
248258 val currentSize = vector.estimateSize()
249259 if (currentSize >= memoryThreshold) {
250260 val amountToRequest = (currentSize * memoryGrowthFactor).toLong
251- // Hold the put lock, in case another thread concurrently puts a block that
252- // takes up the free space we just ensured for unrolling here
261+ // Hold the put lock, in case another thread concurrently puts a block that takes
262+ // up the unrolling space we just ensured here
253263 putLock.synchronized {
254- if (! requestMemory(amountToRequest)) {
255- // If the first request is not granted, try again after ensuring free space
256- // If there is still not enough space, give up and drop the partition
257- val extraSpaceNeeded = globalUnrollMemory - unrollMemoryMap.values.sum
258- val result = ensureFreeSpace(blockId, extraSpaceNeeded)
259- droppedBlocks ++= result.droppedBlocks
260- keepUnrolling = requestMemory(amountToRequest)
264+ unrollMemoryMap.synchronized {
265+ if (! requestMemory(amountToRequest)) {
266+ // If the first request is not granted, try again after ensuring free space.
267+ // If there is still not enough space, give up and drop the partition.
268+ val extraSpaceNeeded = globalUnrollMemory - unrollMemoryMap.values.sum
269+ val result = ensureFreeSpace(blockId, extraSpaceNeeded)
270+ droppedBlocks ++= result.droppedBlocks
271+ keepUnrolling = requestMemory(amountToRequest)
272+ }
273+ memoryThreshold = amountToRequest
261274 }
262- memoryThreshold = amountToRequest
263275 }
264276 }
265277 }
@@ -357,7 +369,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
357369 * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
358370 * don't fit into memory that we want to avoid).
359371 *
360- * Assume that a lock is held by the caller to ensure only one thread is dropping blocks.
372+ * Assume that `putLock` is held by the caller to ensure only one thread is dropping blocks.
361373 * Otherwise, the freed space may fill up before the caller puts in their new value.
362374 *
363375 * Return whether there is enough free space, along with the blocks dropped in the process.
@@ -375,7 +387,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
375387 }
376388
377389 // Take into account the amount of memory currently occupied by unrolling blocks
378- val unrollMemoryMap = SparkEnv .get.unrollMemoryMap
379390 val freeSpace = unrollMemoryMap.synchronized { freeMemory - unrollMemoryMap.values.sum }
380391
381392 if (freeSpace < space) {
0 commit comments