Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,18 +246,18 @@ private[spark] class MemoryStore(
val amountToRequest = size - unrollMemoryUsedByThisBlock
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
unrollMemoryUsedByThisBlock = size
}
} else if (size < unrollMemoryUsedByThisBlock) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#19285 miss this check before? Oops! This could waste storage memory. Thanks for adding this.
Also, cc @cloud-fan @ConeyLiu

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #19285, we first release unrollMemoryUsedByThisBlock unroll memory, and then we request entry.size storage memory. So, there is no waste of resources here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no problem before,because it releases all unroll memory in 257 line

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see, glad to hear that.

releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock - size)
unrollMemoryUsedByThisBlock = size
}

if (keepUnrolling) {
val entry = entryBuilder.build()
// Synchronize so that transfer is atomic
memoryManager.synchronized {
releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
assert(success, "transferring unroll memory to storage memory failed")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a concept difference between unroll memory and storage memory. And it seems reasonable to transfer unroll memory to storage memory from the view of their concept, though, it makes no big difference underlying storage memory. I'm not sure will this change cause some side effect, if none, I'll approve of it.

// In fact, unroll memory is also storage memory, it is unnecessary to
// release unroll memory really
releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock, false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you check other calls on this function? Are you sure all other calls' third param default to be true, except here only?

Copy link
Contributor Author

@10110346 10110346 Feb 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am sure all other calls' third param default to be true

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it seems a little awkward. How about change transferUnrollToStorage()'s(which is removed after #19285, see L242 ) behavior , rather than releaseUnrollMemoryForThisTask ?


entries.synchronized {
entries.put(blockId, entry)
Expand Down Expand Up @@ -565,7 +565,8 @@ private[spark] class MemoryStore(
* Release memory used by this task for unrolling blocks.
* If the amount is not specified, remove the current task's allocation altogether.
*/
def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit = {
def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue,
releaseMemoryReally: Boolean = true): Unit = {
val taskAttemptId = currentTaskAttemptId()
memoryManager.synchronized {
val unrollMemoryMap = memoryMode match {
Expand All @@ -576,7 +577,9 @@ private[spark] class MemoryStore(
val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
if (memoryToRelease > 0) {
unrollMemoryMap(taskAttemptId) -= memoryToRelease
memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode)
if (releaseMemoryReally) {
memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode)
}
}
if (unrollMemoryMap(taskAttemptId) == 0) {
unrollMemoryMap.remove(taskAttemptId)
Expand Down