
Commit 2592111

andrewor14 authored and mateiz committed
[SPARK-2609] Log thread ID when spilling ExternalAppendOnlyMap
It's useful to know whether one thread is constantly spilling or multiple threads are spilling relatively infrequently. Right now everything looks a little jumbled and we can't tell which lines belong to the same thread. For instance:

```
06:14:37 ExternalAppendOnlyMap: Spilling in-memory map of 4 MB to disk (194 times so far)
06:14:37 ExternalAppendOnlyMap: Spilling in-memory map of 4 MB to disk (198 times so far)
06:14:37 ExternalAppendOnlyMap: Spilling in-memory map of 4 MB to disk (198 times so far)
06:14:37 ExternalAppendOnlyMap: Spilling in-memory map of 10 MB to disk (197 times so far)
06:14:38 ExternalAppendOnlyMap: Spilling in-memory map of 9 MB to disk (45 times so far)
06:14:38 ExternalAppendOnlyMap: Spilling in-memory map of 23 MB to disk (198 times so far)
06:14:38 ExternalAppendOnlyMap: Spilling in-memory map of 38 MB to disk (25 times so far)
06:14:38 ExternalAppendOnlyMap: Spilling in-memory map of 161 MB to disk (25 times so far)
06:14:39 ExternalAppendOnlyMap: Spilling in-memory map of 0 MB to disk (199 times so far)
06:14:39 ExternalAppendOnlyMap: Spilling in-memory map of 4 MB to disk (166 times so far)
06:14:39 ExternalAppendOnlyMap: Spilling in-memory map of 4 MB to disk (199 times so far)
06:14:39 ExternalAppendOnlyMap: Spilling in-memory map of 4 MB to disk (200 times so far)
```

Author: Andrew Or <[email protected]>

Closes #1517 from andrewor14/external-log and squashes the following commits:

90e48bb [Andrew Or] Log thread ID when spilling
1 parent 4c7243e commit 2592111

File tree

1 file changed: +3 −3 lines


core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -106,6 +106,7 @@ class ExternalAppendOnlyMap[K, V, C](
   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
   private val keyComparator = new HashComparator[K]
   private val ser = serializer.newInstance()
+  private val threadId = Thread.currentThread().getId

   /**
    * Insert the given key and value into the map.
@@ -128,7 +129,6 @@ class ExternalAppendOnlyMap[K, V, C](
       // Atomically check whether there is sufficient memory in the global pool for
       // this map to grow and, if possible, allocate the required amount
       shuffleMemoryMap.synchronized {
-        val threadId = Thread.currentThread().getId
         val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
         val availableMemory = maxMemoryThreshold -
           (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
@@ -153,8 +153,8 @@ class ExternalAppendOnlyMap[K, V, C](
    */
   private def spill(mapSize: Long) {
     spillCount += 1
-    logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)"
-      .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
+    logWarning("Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
+      .format(threadId, mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
     var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
     var objectsWritten = 0
```
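The pattern above can be sketched in isolation: capture the owning thread's ID once at construction time (rather than on every call) and include it in each spill message, so interleaved log lines from concurrent tasks can be attributed to their threads. This is a minimal standalone sketch, not Spark code; the `SpillLogger` class and `spillMessage` method are hypothetical names, and it assumes (as the commit does) that the object is constructed on the same thread that later spills.

```scala
// Hypothetical sketch of the commit's logging pattern (not Spark API).
class SpillLogger {
  // Cache the thread ID at construction; assumes the instance is created
  // and used by a single task thread, as ExternalAppendOnlyMap is.
  private val threadId = Thread.currentThread().getId
  private var spillCount = 0

  // Build the log line the same way the patched logWarning call does:
  // thread ID, map size in MB, and a running spill count with plural "s".
  def spillMessage(mapSize: Long): String = {
    spillCount += 1
    "Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
      .format(threadId, mapSize / (1024 * 1024), spillCount,
        if (spillCount > 1) "s" else "")
  }
}

object SpillLoggerDemo extends App {
  val logger = new SpillLogger
  println(logger.spillMessage(4L * 1024 * 1024))   // "... (1 time so far)"
  println(logger.spillMessage(10L * 1024 * 1024))  // "... (2 times so far)"
}
```

Caching the ID as a field also lets the `spill` path log it without re-entering `Thread.currentThread()` inside the synchronized block, which is why the commit could delete the local `val threadId` there.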

0 commit comments
