Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ class ExternalAppendOnlyMap[K, V, C](

// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
private var deserializeStream = nextBatchStream()
private var deserializeStream: DeserializationStream = null
private var nextItem: (K, C) = null
private var objectsRead = 0

Expand Down Expand Up @@ -528,27 +528,30 @@ class ExternalAppendOnlyMap[K, V, C](
override def hasNext: Boolean = {
if (nextItem == null) {
if (deserializeStream == null) {
return false
// In case of deserializeStream has not been initialized
deserializeStream = nextBatchStream()
if (deserializeStream == null) {
return false
}
}
nextItem = readNextItem()
}
nextItem != null
}

override def next(): (K, C) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

if (!hasNext) {
  throw new NoSuchElementException
}
val item = nextItem
nextItem = null
item

val item = if (nextItem == null) readNextItem() else nextItem
if (item == null) {
if (!hasNext) {
throw new NoSuchElementException
}
val item = nextItem
nextItem = null
item
}

private def cleanup() {
batchIndex = batchOffsets.length // Prevent reading any other batch
val ds = deserializeStream
if (ds != null) {
ds.close()
if (deserializeStream != null) {
deserializeStream.close()
deserializeStream = null
}
if (fileStream != null) {
Expand Down