Skip to content

Commit a447cd8

Browse files
Xing SHIJoshRosen
authored andcommitted
[SPARK-17465][SPARK CORE] Inappropriate memory management in org.apache.spark.storage.MemoryStore may lead to memory leak
## What changes were proposed in this pull request? The expression like `if (memoryMap(taskAttemptId) == 0) memoryMap.remove(taskAttemptId)` in method `releaseUnrollMemoryForThisTask` and `releasePendingUnrollMemoryForThisTask` should be called after release memory operation, whatever `memoryToRelease` is > 0 or not. If the memory of a task has been set to 0 when calling a `releaseUnrollMemoryForThisTask` or a `releasePendingUnrollMemoryForThisTask` method, the key in the memory map corresponding to that task will never be removed from the hash map. See the details in [SPARK-17465](https://issues.apache.org/jira/browse/SPARK-17465). Author: Xing SHI <[email protected]> Closes #15022 from saturday-shi/SPARK-17465.
1 parent bf3f6d2 commit a447cd8

File tree

2 files changed

+7
-6
lines changed

2 files changed

+7
-6
lines changed

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ private[spark] abstract class Task[T](
104104
Utils.tryLogNonFatalError {
105105
// Release memory used by this thread for unrolling blocks
106106
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
107+
SparkEnv.get.blockManager.memoryStore.releasePendingUnrollMemoryForThisTask()
107108
// Notify any tasks waiting for execution memory to be freed to wake up and try to
108109
// acquire memory again. This makes impossible the scenario where a task sleeps forever
109110
// because there are no other tasks left to notify it. Since this is safe to do but may

core/src/main/scala/org/apache/spark/storage/MemoryStore.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -511,11 +511,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
511511
val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
512512
if (memoryToRelease > 0) {
513513
unrollMemoryMap(taskAttemptId) -= memoryToRelease
514-
if (unrollMemoryMap(taskAttemptId) == 0) {
515-
unrollMemoryMap.remove(taskAttemptId)
516-
}
517514
memoryManager.releaseUnrollMemory(memoryToRelease)
518515
}
516+
if (unrollMemoryMap(taskAttemptId) == 0) {
517+
unrollMemoryMap.remove(taskAttemptId)
518+
}
519519
}
520520
}
521521
}
@@ -530,11 +530,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
530530
val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId))
531531
if (memoryToRelease > 0) {
532532
pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease
533-
if (pendingUnrollMemoryMap(taskAttemptId) == 0) {
534-
pendingUnrollMemoryMap.remove(taskAttemptId)
535-
}
536533
memoryManager.releaseUnrollMemory(memoryToRelease)
537534
}
535+
if (pendingUnrollMemoryMap(taskAttemptId) == 0) {
536+
pendingUnrollMemoryMap.remove(taskAttemptId)
537+
}
538538
}
539539
}
540540
}

0 commit comments

Comments
 (0)