@@ -387,6 +387,15 @@ class ExternalAppendOnlyMap[K, V, C](
387387 private var batchIndex = 0 // Which batch we're in
388388 private var fileStream : FileInputStream = null
389389
390+ @ volatile private var closed = false
391+
392+ // A volatile reference to the DeserializationStream currently in use. It must be set whenever
393+ // we open a DeserializationStream. However, we should read the content through
394+ // `deserializeStream` rather than `deserializeStreamToBeClosed`, because touching a volatile
395+ // variable reduces performance. It must be volatile so that we can see its correct value in the
396+ // `finalize` method, which could run in any thread.
397+ @ volatile private var deserializeStreamToBeClosed : DeserializationStream = null
398+
390399 // An intermediate stream that reads from exactly one batch
391400 // This guards against pre-fetching and other arbitrary behavior of higher level streams
392401 private var deserializeStream = nextBatchStream()
@@ -401,6 +410,7 @@ class ExternalAppendOnlyMap[K, V, C](
401410 // we're still in a valid batch.
402411 if (batchIndex < batchOffsets.length - 1 ) {
403412 if (deserializeStream != null ) {
413+ deserializeStreamToBeClosed = null
404414 deserializeStream.close()
405415 fileStream.close()
406416 deserializeStream = null
@@ -419,7 +429,11 @@ class ExternalAppendOnlyMap[K, V, C](
419429
420430 val bufferedStream = new BufferedInputStream (ByteStreams .limit(fileStream, end - start))
421431 val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
422- ser.deserializeStream(compressedStream)
432+ // Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can
433+ // close it in `finalize` and also avoid touching the volatile `deserializeStreamToBeClosed`
434+ // while reading the (K, C) pairs.
435+ deserializeStreamToBeClosed = ser.deserializeStream(compressedStream)
436+ deserializeStreamToBeClosed
423437 } else {
424438 // No more batches left
425439 cleanup()
@@ -468,14 +482,34 @@ class ExternalAppendOnlyMap[K, V, C](
468482 item
469483 }
470484
471- // TODO: Ensure this gets called even if the iterator isn't drained.
472- private def cleanup () {
473- batchIndex = batchOffsets.length // Prevent reading any other batch
474- val ds = deserializeStream
475- deserializeStream = null
476- fileStream = null
477- ds.close()
478- file.delete()
485+ // TODO: Currently only `finalize` ensures that `close` gets called to clean up the resources.
486+ // In the future, we need some mechanism to ensure this gets called as soon as the resources
486+ // are no longer used.
// Releases this iterator's resources: closes the current deserialization stream (if any) and
// deletes the backing file. The `closed` flag turns repeated calls into no-ops; note that the
// check-then-set on `closed` is not atomic.
487+ private def cleanup (): Unit = {
488+ if (! closed) {
489+ closed = true
490+ batchIndex = batchOffsets.length // Prevent reading any other batch
// NOTE(review): `fileStream` is dropped here without an explicit close(); presumably closing
// the deserialization stream also closes the underlying file stream -- confirm.
491+ fileStream = null
492+ try {
// Read the stream through the volatile field so that the reference to close is the one
// visible to whichever thread runs this method.
493+ val ds = deserializeStreamToBeClosed
494+ deserializeStreamToBeClosed = null
495+ deserializeStream = null
496+ if (ds != null ) {
497+ ds.close()
498+ }
499+ } finally {
// Runs even if ds.close() throws; the result of file.delete() is not checked.
500+ if (file.exists()) {
501+ file.delete()
502+ }
503+ }
504+ }
505+ }
506+
// Last-resort hook: invokes cleanup() when this object is finalized, so the stream and the file
// are released even if cleanup() was not reached through normal iteration.
// super.finalize() is always invoked, even if cleanup() throws.
507+ override def finalize (): Unit = {
508+ try {
509+ cleanup()
510+ } finally {
511+ super .finalize()
512+ }
479513 }
480514 }
481515
0 commit comments