@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
2020import java .io .{InputStream , BufferedInputStream , FileInputStream , File , Serializable , EOFException }
2121import java .util .Comparator
2222
23+ import scala .collection .BufferedIterator
2324import scala .collection .mutable
2425import scala .collection .mutable .ArrayBuffer
2526
@@ -231,7 +232,7 @@ class ExternalAppendOnlyMap[K, V, C](
231232 // Input streams are derived both from the in-memory map and spilled maps on disk
232233 // The in-memory map is sorted in place, while the spilled maps are already in sorted order
233234 private val sortedMap = currentMap.destructiveSortedIterator(comparator)
234- private val inputStreams = Seq (sortedMap) ++ spilledMaps
235+ private val inputStreams = ( Seq (sortedMap) ++ spilledMaps).map(it => it.buffered)
235236
236237 inputStreams.foreach { it =>
237238 val kcPairs = getMorePairs(it)
@@ -246,13 +247,13 @@ class ExternalAppendOnlyMap[K, V, C](
246247 * In the event of key hash collisions, this ensures no pairs are hidden from being merged.
247248 * Assume the given iterator is in sorted order.
248249 */
249- private def getMorePairs (it : Iterator [(K , C )]): ArrayBuffer [(K , C )] = {
250+ private def getMorePairs (it : BufferedIterator [(K , C )]): ArrayBuffer [(K , C )] = {
250251 val kcPairs = new ArrayBuffer [(K , C )]
251252 if (it.hasNext) {
252253 var kc = it.next()
253254 kcPairs += kc
254255 val minHash = kc._1.hashCode()
255- while (it.hasNext && kc ._1.hashCode() == minHash) {
256+ while (it.hasNext && it.head ._1.hashCode() == minHash) {
256257 kc = it.next()
257258 kcPairs += kc
258259 }
@@ -325,7 +326,8 @@ class ExternalAppendOnlyMap[K, V, C](
325326 *
326327 * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
327328 */
328- private case class StreamBuffer (iterator : Iterator [(K , C )], pairs : ArrayBuffer [(K , C )])
329+ private class StreamBuffer (
330+ val iterator : BufferedIterator [(K , C )], val pairs : ArrayBuffer [(K , C )])
329331 extends Comparable [StreamBuffer ] {
330332
331333 def isEmpty = pairs.length == 0
0 commit comments