@@ -252,7 +252,7 @@ class ExternalAppendOnlyMap[K, V, C](
252252 if (it.hasNext) {
253253 var kc = it.next()
254254 kcPairs += kc
255- val minHash = kc._1.hashCode()
255+ val minHash = if (kc._1 == null ) nullHashCode else kc._1.hashCode()
256256 while (it.hasNext && it.head._1.hashCode() == minHash) {
257257 kc = it.next()
258258 kcPairs += kc
@@ -295,7 +295,8 @@ class ExternalAppendOnlyMap[K, V, C](
295295 val minBuffer = mergeHeap.dequeue()
296296 val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
297297 var (minKey, minCombiner) = minPairs.remove(0 )
298- assert(minKey.hashCode() == minHash)
298+ val actualMinKeyHash = if (minKey == null ) nullHashCode else minKey.hashCode()
299+ assert(actualMinKeyHash == minHash)
299300
300301 // For all other streams that may have this key (i.e. have the same minimum key hash),
301302 // merge in the corresponding value (if any) from that stream
@@ -327,15 +328,17 @@ class ExternalAppendOnlyMap[K, V, C](
327328 * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
328329 */
329330 private class StreamBuffer (
330- val iterator : BufferedIterator [(K , C )], val pairs : ArrayBuffer [(K , C )])
331+ val iterator : BufferedIterator [(K , C )],
332+ val pairs : ArrayBuffer [(K , C )])
331333 extends Comparable [StreamBuffer ] {
332334
333335 def isEmpty = pairs.length == 0
334336
335337 // Invalid if there are no more pairs in this stream
336338 def minKeyHash = {
337339 assert(pairs.length > 0 )
338- pairs.head._1.hashCode()
340+ val key = pairs.head._1
341+ if (key == null ) nullHashCode else key.hashCode()
339342 }
340343
341344 override def compareTo (other : StreamBuffer ): Int = {
@@ -422,10 +425,11 @@ class ExternalAppendOnlyMap[K, V, C](
422425}
423426
424427private [spark] object ExternalAppendOnlyMap {
428+ private val nullHashCode : Int = 0
425429 private class KCComparator [K , C ] extends Comparator [(K , C )] {
426430 def compare (kc1 : (K , C ), kc2 : (K , C )): Int = {
427- val hash1 = kc1._1.hashCode()
428- val hash2 = kc2._1.hashCode()
431+ val hash1 = if (kc1._1 == null ) nullHashCode else kc1._1.hashCode()
432+ val hash2 = if (kc2._1 == null ) nullHashCode else kc2._1.hashCode()
429433 if (hash1 < hash2) - 1 else if (hash1 == hash2) 0 else 1
430434 }
431435 }
0 commit comments