@@ -463,21 +463,21 @@ class ExternalAppendOnlyMap[K, V, C](
463463
464464 // An intermediate stream that reads from exactly one batch
465465 // This guards against pre-fetching and other arbitrary behavior of higher level streams
466- private var deserializeStream : Option [ DeserializationStream ] = None
466+ private var deserializeStream : DeserializationStream = null
467467 private var nextItem : (K , C ) = null
468468 private var objectsRead = 0
469469
470470 /**
471471 * Construct a stream that reads only from the next batch.
472472 */
473- private def nextBatchStream (): Option [ DeserializationStream ] = {
473+ private def nextBatchStream (): DeserializationStream = {
474474 // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
475475 // we're still in a valid batch.
476476 if (batchIndex < batchOffsets.length - 1 ) {
477- if (deserializeStream.isDefined ) {
478- deserializeStream.get. close()
477+ if (deserializeStream != null ) {
478+ deserializeStream.close()
479479 fileStream.close()
480- deserializeStream = None
480+ deserializeStream = null
481481 fileStream = null
482482 }
483483
@@ -493,11 +493,11 @@ class ExternalAppendOnlyMap[K, V, C](
493493
494494 val bufferedStream = new BufferedInputStream (ByteStreams .limit(fileStream, end - start))
495495 val wrappedStream = serializerManager.wrapStream(blockId, bufferedStream)
496- Some ( ser.deserializeStream(wrappedStream) )
496+ ser.deserializeStream(wrappedStream)
497497 } else {
498498 // No more batches left
499499 cleanup()
500- None
500+ null
501501 }
502502 }
503503
@@ -509,8 +509,8 @@ class ExternalAppendOnlyMap[K, V, C](
509509 */
510510 private def readNextItem (): (K , C ) = {
511511 try {
512- val k = deserializeStream.get. readKey().asInstanceOf [K ]
513- val c = deserializeStream.get. readValue().asInstanceOf [C ]
512+ val k = deserializeStream.readKey().asInstanceOf [K ]
513+ val c = deserializeStream.readValue().asInstanceOf [C ]
514514 val item = (k, c)
515515 objectsRead += 1
516516 if (objectsRead == serializerBatchSize) {
@@ -527,10 +527,10 @@ class ExternalAppendOnlyMap[K, V, C](
527527
528528 override def hasNext : Boolean = {
529529 if (nextItem == null ) {
530- if (deserializeStream.isEmpty ) {
531- // In case that deserializeStream has not been initialized
530+ if (deserializeStream == null ) {
531+ // In case of deserializeStream has not been initialized
532532 deserializeStream = nextBatchStream()
533- if (deserializeStream.isEmpty ) {
533+ if (deserializeStream == null ) {
534534 return false
535535 }
536536 }
@@ -540,8 +540,8 @@ class ExternalAppendOnlyMap[K, V, C](
540540 }
541541
542542 override def next (): (K , C ) = {
543- if (deserializeStream.isEmpty ) {
544- // In case that deserializeStream has not been initialized when call next() directly
543+ if (deserializeStream == null ) {
544+ // In case of deserializeStream has not been initialized when call next() directly
545545 deserializeStream = nextBatchStream()
546546 }
547547 val item = if (nextItem == null ) readNextItem() else nextItem
@@ -554,10 +554,9 @@ class ExternalAppendOnlyMap[K, V, C](
554554
555555 private def cleanup () {
556556 batchIndex = batchOffsets.length // Prevent reading any other batch
557- val ds = deserializeStream
558- if (ds.isDefined) {
559- ds.get.close()
560- deserializeStream = None
557+ if (deserializeStream != null ) {
558+ deserializeStream.close()
559+ deserializeStream = null
561560 }
562561 if (fileStream != null ) {
563562 fileStream.close()
0 commit comments