Skip to content

Commit 45b4bbf

Browse files
caneGuy and cloud-fan
authored and committed
[SPARK-23129][CORE] Make deserializeStream of DiskMapIterator init lazily
## What changes were proposed in this pull request? Currently, the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator is initialized when the DiskMapIterator instance is created. This causes memory overhead when ExternalAppendOnlyMap spills too many times. We can avoid this by initializing deserializeStream only when it is used for the first time. This patch makes deserializeStream initialize lazily. ## How was this patch tested? Existing tests. Author: zhoukang <[email protected]> Closes #20292 from caneGuy/zhoukang/lay-diskmapiterator.
1 parent 6f0ba84 commit 45b4bbf

File tree

1 file changed

+10
-7
lines changed

1 file changed

+10
-7
lines changed

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ class ExternalAppendOnlyMap[K, V, C](
463463

464464
// An intermediate stream that reads from exactly one batch
465465
// This guards against pre-fetching and other arbitrary behavior of higher level streams
466-
private var deserializeStream = nextBatchStream()
466+
private var deserializeStream: DeserializationStream = null
467467
private var nextItem: (K, C) = null
468468
private var objectsRead = 0
469469

@@ -528,27 +528,30 @@ class ExternalAppendOnlyMap[K, V, C](
528528
override def hasNext: Boolean = {
529529
if (nextItem == null) {
530530
if (deserializeStream == null) {
531-
return false
531+
// In case deserializeStream has not been initialized yet
532+
deserializeStream = nextBatchStream()
533+
if (deserializeStream == null) {
534+
return false
535+
}
532536
}
533537
nextItem = readNextItem()
534538
}
535539
nextItem != null
536540
}
537541

538542
override def next(): (K, C) = {
539-
val item = if (nextItem == null) readNextItem() else nextItem
540-
if (item == null) {
543+
if (!hasNext) {
541544
throw new NoSuchElementException
542545
}
546+
val item = nextItem
543547
nextItem = null
544548
item
545549
}
546550

547551
private def cleanup() {
548552
batchIndex = batchOffsets.length // Prevent reading any other batch
549-
val ds = deserializeStream
550-
if (ds != null) {
551-
ds.close()
553+
if (deserializeStream != null) {
554+
deserializeStream.close()
552555
deserializeStream = null
553556
}
554557
if (fileStream != null) {

0 commit comments

Comments (0)