From ed5adf954f5edbcf951a3809f0dc945c94c92d26 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 2 Jul 2014 14:42:55 -0700
Subject: [PATCH 1/2] Fix NPE for ExternalAppendOnlyMap

---
 .../collection/ExternalAppendOnlyMap.scala  | 16 ++++++-----
 .../ExternalAppendOnlyMapSuite.scala        | 27 ++++++++++++++++---
 2 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 288badd3160f..cb070cbcea13 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -252,7 +252,7 @@ class ExternalAppendOnlyMap[K, V, C](
       if (it.hasNext) {
         var kc = it.next()
         kcPairs += kc
-        val minHash = kc._1.hashCode()
+        val minHash = if (kc._1 == null) nullHashCode else kc._1.hashCode()
         while (it.hasNext && it.head._1.hashCode() == minHash) {
           kc = it.next()
           kcPairs += kc
@@ -295,7 +295,8 @@ class ExternalAppendOnlyMap[K, V, C](
       val minBuffer = mergeHeap.dequeue()
       val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
       var (minKey, minCombiner) = minPairs.remove(0)
-      assert(minKey.hashCode() == minHash)
+      val actualMinKeyHash = if (minKey == null) nullHashCode else minKey.hashCode()
+      assert(actualMinKeyHash == minHash)
 
       // For all other streams that may have this key (i.e. have the same minimum key hash),
       // merge in the corresponding value (if any) from that stream
@@ -327,7 +328,8 @@ class ExternalAppendOnlyMap[K, V, C](
    * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
    */
   private class StreamBuffer(
-      val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
+      val iterator: BufferedIterator[(K, C)],
+      val pairs: ArrayBuffer[(K, C)])
     extends Comparable[StreamBuffer] {
 
     def isEmpty = pairs.length == 0
@@ -335,7 +337,8 @@ class ExternalAppendOnlyMap[K, V, C](
     // Invalid if there are no more pairs in this stream
     def minKeyHash = {
       assert(pairs.length > 0)
-      pairs.head._1.hashCode()
+      val key = pairs.head._1
+      if (key == null) nullHashCode else key.hashCode()
     }
 
     override def compareTo(other: StreamBuffer): Int = {
@@ -422,10 +425,11 @@ class ExternalAppendOnlyMap[K, V, C](
 }
 
 private[spark] object ExternalAppendOnlyMap {
+  private val nullHashCode: Int = 0
   private class KCComparator[K, C] extends Comparator[(K, C)] {
     def compare(kc1: (K, C), kc2: (K, C)): Int = {
-      val hash1 = kc1._1.hashCode()
-      val hash2 = kc2._1.hashCode()
+      val hash1 = if (kc1._1 == null) nullHashCode else kc1._1.hashCode()
+      val hash2 = if (kc2._1 == null) nullHashCode else kc2._1.hashCode()
       if (hash1 < hash2) -1 else if (hash1 == hash2) 0 else 1
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index deb780953579..428822949c08 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -334,8 +334,8 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     conf.set("spark.shuffle.memoryFraction", "0.001")
     sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
 
-    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](createCombiner,
-      mergeValue, mergeCombiners)
+    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
+      createCombiner, mergeValue, mergeCombiners)
 
     (1 to 100000).foreach { i => map.insert(i, i) }
     map.insert(Int.MaxValue, Int.MaxValue)
@@ -346,11 +346,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       it.next()
     }
   }
+
+  test("spilling with null keys and values") {
+    val conf = new SparkConf(true)
+    conf.set("spark.shuffle.memoryFraction", "0.001")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+
+    val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
+      createCombiner, mergeValue, mergeCombiners)
+
+    (1 to 100000).foreach { i => map.insert(i, i) }
+    map.insert(null.asInstanceOf[Int], 1)
+    map.insert(1, null.asInstanceOf[Int])
+    map.insert(null.asInstanceOf[Int], null.asInstanceOf[Int])
+
+    val it = map.iterator
+    while (it.hasNext) {
+      // Should not throw NullPointerException
+      it.next()
+    }
+  }
+
 }
 
 /**
  * A dummy class that always returns the same hash code, to easily test hash collisions
  */
-case class FixedHashObject(val v: Int, val h: Int) extends Serializable {
+case class FixedHashObject(v: Int, h: Int) extends Serializable {
   override def hashCode(): Int = h
 }

From 312b8d805097127ba763a201fd480b7fef089519 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 2 Jul 2014 16:11:16 -0700
Subject: [PATCH 2/2] Abstract key hash code

---
 .../collection/ExternalAppendOnlyMap.scala  | 32 +++++++++++++--------
 1 file changed, 21 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index cb070cbcea13..292d0962f4fd 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -252,7 +252,7 @@ class ExternalAppendOnlyMap[K, V, C](
       if (it.hasNext) {
         var kc = it.next()
         kcPairs += kc
-        val minHash = if (kc._1 == null) nullHashCode else kc._1.hashCode()
-        while (it.hasNext && it.head._1.hashCode() == minHash) {
+        val minHash = getKeyHashCode(kc)
+        while (it.hasNext && getKeyHashCode(it.head) == minHash) {
           kc = it.next()
           kcPairs += kc
@@ -294,9 +294,9 @@ class ExternalAppendOnlyMap[K, V, C](
       // Select a key from the StreamBuffer that holds the lowest key hash
       val minBuffer = mergeHeap.dequeue()
       val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
-      var (minKey, minCombiner) = minPairs.remove(0)
-      val actualMinKeyHash = if (minKey == null) nullHashCode else minKey.hashCode()
-      assert(actualMinKeyHash == minHash)
+      val minPair = minPairs.remove(0)
+      var (minKey, minCombiner) = minPair
+      assert(getKeyHashCode(minPair) == minHash)
 
       // For all other streams that may have this key (i.e. have the same minimum key hash),
       // merge in the corresponding value (if any) from that stream
@@ -335,10 +335,9 @@ class ExternalAppendOnlyMap[K, V, C](
     def isEmpty = pairs.length == 0
 
     // Invalid if there are no more pairs in this stream
-    def minKeyHash = {
+    def minKeyHash: Int = {
       assert(pairs.length > 0)
-      val key = pairs.head._1
-      if (key == null) nullHashCode else key.hashCode()
+      getKeyHashCode(pairs.head)
     }
 
     override def compareTo(other: StreamBuffer): Int = {
@@ -425,11 +424,22 @@ class ExternalAppendOnlyMap[K, V, C](
 }
 
 private[spark] object ExternalAppendOnlyMap {
-  private val nullHashCode: Int = 0
+
+  /**
+   * Return the key hash code of the given (key, combiner) pair.
+   * If the key is null, return a special hash code.
+   */
+  private def getKeyHashCode[K, C](kc: (K, C)): Int = {
+    if (kc._1 == null) 0 else kc._1.hashCode()
+  }
+
+  /**
+   * A comparator for (key, combiner) pairs based on their key hash codes.
+   */
   private class KCComparator[K, C] extends Comparator[(K, C)] {
     def compare(kc1: (K, C), kc2: (K, C)): Int = {
-      val hash1 = if (kc1._1 == null) nullHashCode else kc1._1.hashCode()
-      val hash2 = if (kc2._1 == null) nullHashCode else kc2._1.hashCode()
+      val hash1 = getKeyHashCode(kc1)
+      val hash2 = getKeyHashCode(kc2)
       if (hash1 < hash2) -1 else if (hash1 == hash2) 0 else 1
     }
   }