@@ -252,7 +252,7 @@ class ExternalAppendOnlyMap[K, V, C](
252252 if (it.hasNext) {
253253 var kc = it.next()
254254 kcPairs += kc
255- val minHash = if (kc._1 == null ) nullHashCode else kc._1.hashCode( )
255+ val minHash = getKeyHashCode (kc)
256256 while (it.hasNext && it.head._1.hashCode() == minHash) {
257257 kc = it.next()
258258 kcPairs += kc
@@ -294,9 +294,9 @@ class ExternalAppendOnlyMap[K, V, C](
294294 // Select a key from the StreamBuffer that holds the lowest key hash
295295 val minBuffer = mergeHeap.dequeue()
296296 val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
297- var (minKey, minCombiner) = minPairs.remove(0 )
298- val actualMinKeyHash = if (minKey == null ) nullHashCode else minKey.hashCode()
299- assert(actualMinKeyHash == minHash)
297+ val minPair = minPairs.remove(0 )
298+ var (minKey, minCombiner) = minPair
299+ assert(getKeyHashCode(minPair) == minHash)
300300
301301 // For all other streams that may have this key (i.e. have the same minimum key hash),
302302 // merge in the corresponding value (if any) from that stream
@@ -335,10 +335,9 @@ class ExternalAppendOnlyMap[K, V, C](
335335 def isEmpty = pairs.length == 0
336336
337337 // Invalid if there are no more pairs in this stream
338- def minKeyHash = {
338+ def minKeyHash : Int = {
339339 assert(pairs.length > 0 )
340- val key = pairs.head._1
341- if (key == null ) nullHashCode else key.hashCode()
340+ getKeyHashCode(pairs.head)
342341 }
343342
344343 override def compareTo (other : StreamBuffer ): Int = {
@@ -425,11 +424,22 @@ class ExternalAppendOnlyMap[K, V, C](
425424}
426425
427426private [spark] object ExternalAppendOnlyMap {
428- private val nullHashCode : Int = 0
427+
428+ /**
429+ * Return the key hash code of the given (key, combiner) pair.
430+ * If the key is null, return a special hash code.
431+ */
432+ private def getKeyHashCode [K , C ](kc : (K , C )): Int = {
433+ if (kc._1 == null ) 0 else kc._1.hashCode()
434+ }
435+
436+ /**
437+ * A comparator for (key, combiner) pairs based on their key hash codes.
438+ */
429439 private class KCComparator [K , C ] extends Comparator [(K , C )] {
430440 def compare (kc1 : (K , C ), kc2 : (K , C )): Int = {
431- val hash1 = if (kc1._1 == null ) nullHashCode else kc1._1.hashCode( )
432- val hash2 = if (kc2._1 == null ) nullHashCode else kc2._1.hashCode( )
441+ val hash1 = getKeyHashCode (kc1)
442+ val hash2 = getKeyHashCode (kc2)
433443 if (hash1 < hash2) - 1 else if (hash1 == hash2) 0 else 1
434444 }
435445 }
0 commit comments