@@ -434,6 +434,15 @@ class ExternalAppendOnlyMap[K, V, C](
434434 private var batchIndex = 0 // Which batch we're in
435435 private var fileStream : FileInputStream = null
436436
437+ @ volatile private var closed = false
438+
439+ // A volatile variable that remembers which DeserializationStream is currently in use. It must
440+ // be set whenever we open a DeserializationStream. However, we should use `deserializeStream`
441+ // rather than `deserializeStreamToBeClosed` to read the content, because touching a volatile
442+ // variable reduces performance. It must be volatile so that we can see its correct value in
443+ // the `finalize` method, which could run in any thread.
444+ @ volatile private var deserializeStreamToBeClosed : DeserializationStream = null
445+
437446 // An intermediate stream that reads from exactly one batch
438447 // This guards against pre-fetching and other arbitrary behavior of higher level streams
439448 private var deserializeStream = nextBatchStream()
@@ -448,6 +457,7 @@ class ExternalAppendOnlyMap[K, V, C](
448457 // we're still in a valid batch.
449458 if (batchIndex < batchOffsets.length - 1 ) {
450459 if (deserializeStream != null ) {
460+ deserializeStreamToBeClosed = null
451461 deserializeStream.close()
452462 fileStream.close()
453463 deserializeStream = null
@@ -466,7 +476,11 @@ class ExternalAppendOnlyMap[K, V, C](
466476
467477 val bufferedStream = new BufferedInputStream (ByteStreams .limit(fileStream, end - start))
468478 val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
469- ser.deserializeStream(compressedStream)
479+ // Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can
480+ // close it in `finalize`, while still avoiding touching the volatile
481+ // `deserializeStreamToBeClosed` when reading the (K, C) pairs.
482+ deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
483+ deserializeStreamToBeClosed
470484 } else {
471485 // No more batches left
472486 cleanup()
@@ -515,14 +529,34 @@ class ExternalAppendOnlyMap[K, V, C](
515529 item
516530 }
517531
518- // TODO: Ensure this gets called even if the iterator isn't drained.
519- private def cleanup () {
520- batchIndex = batchOffsets.length // Prevent reading any other batch
521- val ds = deserializeStream
522- deserializeStream = null
523- fileStream = null
524- ds.close()
525- file.delete()
532+ // TODO: Currently we rely solely on `finalize` to ensure `close` gets called to clean up the
533+ // resources. In the future, we need a mechanism that ensures this gets called as soon as the
534+ // resources are no longer used.
/**
 * Releases the resources held by this iterator: closes the current deserialization stream,
 * closes the underlying file stream, and deletes the spill file if it still exists.
 *
 * The method is guarded by the `closed` flag, so only the first call does any work; later
 * calls (for example from `finalize`) return immediately.
 *
 * Each step runs in a `finally` of the previous one, so a failure while closing one stream
 * does not prevent the remaining steps from being attempted. An exception thrown by a close
 * still propagates to the caller after the remaining steps have run.
 */
private def cleanup(): Unit = {
  if (!closed) {
    closed = true
    batchIndex = batchOffsets.length // Prevent reading any other batch

    // Capture the streams before clearing the fields so they can still be closed below.
    // The volatile `deserializeStreamToBeClosed` is read (not `deserializeStream`) because
    // this method may be invoked from `finalize`.
    val ds = deserializeStreamToBeClosed
    val fs = fileStream
    deserializeStreamToBeClosed = null
    deserializeStream = null
    fileStream = null

    try {
      if (ds != null) {
        ds.close()
      }
    } finally {
      try {
        // Close the file stream explicitly: when `ds` is null (e.g. opening or closing a
        // batch stream failed part-way) nothing else would release the file handle.
        // If `ds.close()` already closed it, closing again has no effect.
        if (fs != null) {
          fs.close()
        }
      } finally {
        if (file.exists()) {
          file.delete()
        }
      }
    }
  }
}
553+
/**
 * Last-resort hook: runs `cleanup()` when this object is finalized, and always delegates to
 * `super.finalize()` afterwards, even if `cleanup()` throws.
 */
override def finalize(): Unit =
  try cleanup()
  finally super.finalize()
527561 }
528562
0 commit comments