Skip to content

Commit 5207fb4

Browse files
author
Adam Budde
committed
[SPARK-13122] Fix race condition in MemoryStore.unrollSafely()
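JIRA: https://issues.apache.org/jira/browse/SPARK-13122. A race condition can occur in MemoryStore's unrollSafely() method if two threads that return the same value for currentTaskAttemptId() execute this method concurrently. This change makes the operation of reading the initial amount of unroll memory used, performing the unroll, and updating the associated memory maps safe against that interleaving: instead of computing the amount to transfer as the difference between the task's current unroll memory and a snapshot taken at the start of the method (both of which are shared with the other thread), the method now counts the unroll memory it successfully reserves itself in a local variable, pendingMemoryReserved, and transfers exactly that amount to pending unroll memory.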
1 parent 0df3cfb commit 5207fb4

File tree

1 file changed

+9
-5
lines changed

1 file changed

+9
-5
lines changed

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

Lines changed: 9 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -255,8 +255,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
255255
var memoryThreshold = initialMemoryThreshold
256256
// Memory to request as a multiple of current vector size
257257
val memoryGrowthFactor = 1.5
258-
// Previous unroll memory held by this task, for releasing later (only at the very end)
259-
val previousMemoryReserved = currentUnrollMemoryForThisTask
258+
// Keep track of pending unroll memory reserved by this method.
259+
var pendingMemoryReserved = 0L
260260
// Underlying vector for unrolling the block
261261
var vector = new SizeTrackingVector[Any]
262262

@@ -266,6 +266,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
266266
if (!keepUnrolling) {
267267
logWarning(s"Failed to reserve initial memory threshold of " +
268268
s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
269+
} else {
270+
pendingMemoryReserved += initialMemoryThreshold
269271
}
270272

271273
// Unroll this block safely, checking whether we have exceeded our threshold periodically
@@ -278,6 +280,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
278280
if (currentSize >= memoryThreshold) {
279281
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
280282
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
283+
if (keepUnrolling) {
284+
pendingMemoryReserved += amountToRequest
285+
}
281286
// New threshold is currentSize * memoryGrowthFactor
282287
memoryThreshold += amountToRequest
283288
}
@@ -304,10 +309,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
304309
// release the unroll memory yet. Instead, we transfer it to pending unroll memory
305310
// so `tryToPut` can further transfer it to normal storage memory later.
306311
// TODO: we can probably express this without pending unroll memory (SPARK-10907)
307-
val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved
308-
unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
312+
unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved
309313
pendingUnrollMemoryMap(taskAttemptId) =
310-
pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending
314+
pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved
311315
}
312316
} else {
313317
// Otherwise, if we return an iterator, we can only release the unroll memory when

0 commit comments

Comments
 (0)