Skip to content

Commit c7c8832

Browse files
committed
Simplify logic + update a few comments
1 parent 269d07b commit c7c8832

File tree

2 files changed

+27
-30
lines changed

2 files changed

+27
-30
lines changed

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -222,23 +222,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
222222
var memoryThreshold = initialMemoryThreshold
223223
// Memory to request as a multiple of current vector size
224224
val memoryGrowthFactor = 1.5
225-
// Previous unroll memory held by this thread, for releasing later
225+
// Previous unroll memory held by this thread, for releasing later (only at the very end)
226226
val previousMemoryReserved = currentUnrollMemoryForThisThread
227227
// Underlying vector for unrolling the block
228228
var vector = new SizeTrackingVector[Any]
229229

230-
// Request additional memory for our vector and return whether the request is granted
231-
// This involves synchronizing across all threads, which is expensive if called frequently
232-
def requestMemory(memoryToRequest: Long): Boolean = {
233-
accountingLock.synchronized {
234-
val granted = freeMemory > currentUnrollMemory + memoryToRequest
235-
if (granted) { reserveUnrollMemoryForThisThread(memoryToRequest) }
236-
granted
237-
}
238-
}
239-
240230
// Request enough memory to begin unrolling
241-
keepUnrolling = requestMemory(initialMemoryThreshold)
231+
keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold)
242232

243233
// Unroll this block safely, checking whether we have exceeded our threshold periodically
244234
try {
@@ -249,18 +239,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
249239
val currentSize = vector.estimateSize()
250240
if (currentSize >= memoryThreshold) {
251241
val amountToRequest = (currentSize * (memoryGrowthFactor - 1)).toLong
252-
// Hold the put lock, in case another thread concurrently puts a block that takes
253-
// up the unrolling space we just ensured here
242+
// Hold the accounting lock, in case another thread concurrently puts a block that
243+
// takes up the unrolling space we just ensured here
254244
accountingLock.synchronized {
255-
if (!requestMemory(amountToRequest)) {
245+
if (!reserveUnrollMemoryForThisThread(amountToRequest)) {
256246
// If the first request is not granted, try again after ensuring free space
257247
// If there is still not enough space, give up and drop the partition
258248
val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
259249
if (spaceToEnsure > 0) {
260250
val result = ensureFreeSpace(blockId, spaceToEnsure)
261251
droppedBlocks ++= result.droppedBlocks
262252
}
263-
keepUnrolling = requestMemory(amountToRequest)
253+
keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
264254
}
265255
}
266256
// New threshold is currentSize * memoryGrowthFactor
@@ -283,7 +273,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
283273
// we can immediately free up space for other threads. Otherwise, if we return an iterator,
284274
// we release the memory claimed by this thread later on when the task finishes.
285275
if (keepUnrolling) {
286-
vector = null
287276
val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
288277
releaseUnrollMemoryForThisThread(amountToRelease)
289278
}
@@ -433,11 +422,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
433422

434423
/**
435424
* Reserve additional memory for unrolling blocks used by this thread.
425+
* Return whether the request is granted.
436426
*/
437-
private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Unit = {
438-
val threadId = Thread.currentThread().getId
427+
private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
439428
accountingLock.synchronized {
440-
unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory
429+
val granted = freeMemory > currentUnrollMemory + memory
430+
if (granted) {
431+
val threadId = Thread.currentThread().getId
432+
unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory
433+
}
434+
granted
441435
}
442436
}
443437

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,21 +1005,24 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
10051005
assert(memoryStore.currentUnrollMemoryForThisThread === 0)
10061006

10071007
// Reserve
1008-
memoryStore.reserveUnrollMemoryForThisThread(1480)
1009-
assert(memoryStore.currentUnrollMemoryForThisThread === 1480)
1008+
memoryStore.reserveUnrollMemoryForThisThread(100)
1009+
assert(memoryStore.currentUnrollMemoryForThisThread === 100)
1010+
memoryStore.reserveUnrollMemoryForThisThread(200)
1011+
assert(memoryStore.currentUnrollMemoryForThisThread === 300)
1012+
memoryStore.reserveUnrollMemoryForThisThread(500)
1013+
assert(memoryStore.currentUnrollMemoryForThisThread === 800)
10101014
memoryStore.reserveUnrollMemoryForThisThread(1000000)
1011-
assert(memoryStore.currentUnrollMemoryForThisThread === 1001480)
1012-
1015+
assert(memoryStore.currentUnrollMemoryForThisThread === 800) // not granted
10131016
// Release
1014-
memoryStore.releaseUnrollMemoryForThisThread(1000000)
1015-
assert(memoryStore.currentUnrollMemoryForThisThread === 1480)
10161017
memoryStore.releaseUnrollMemoryForThisThread(100)
1017-
assert(memoryStore.currentUnrollMemoryForThisThread === 1380)
1018-
1018+
assert(memoryStore.currentUnrollMemoryForThisThread === 700)
1019+
memoryStore.releaseUnrollMemoryForThisThread(100)
1020+
assert(memoryStore.currentUnrollMemoryForThisThread === 600)
10191021
// Reserve again
1020-
memoryStore.reserveUnrollMemoryForThisThread(3620)
1022+
memoryStore.reserveUnrollMemoryForThisThread(4400)
10211023
assert(memoryStore.currentUnrollMemoryForThisThread === 5000)
1022-
1024+
memoryStore.reserveUnrollMemoryForThisThread(20000)
1025+
assert(memoryStore.currentUnrollMemoryForThisThread === 5000) // not granted
10231026
// Release again
10241027
memoryStore.releaseUnrollMemoryForThisThread(1000)
10251028
assert(memoryStore.currentUnrollMemoryForThisThread === 4000)

0 commit comments

Comments
 (0)