From 39206563dc84e0d200c605717b69ff7485ea9c54 Mon Sep 17 00:00:00 2001
From: Vyacheslav Baranov
Date: Thu, 11 Jun 2015 18:30:47 +0300
Subject: [PATCH 1/5] SPARK-8309: Support for more than 12M items in OpenHashMap

---
 .../org/apache/spark/util/collection/OpenHashSet.scala   |  3 ++-
 .../spark/util/collection/OpenHashMapSuite.scala         | 10 ++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 64e7102e3654c..f2d78990f555e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -223,6 +223,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
    */
   private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
     val newCapacity = _capacity * 2
+    require(newCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
     allocateFunc(newCapacity)
     val newBitset = new BitSet(newCapacity)
     val newData = new Array[T](newCapacity)
@@ -278,7 +279,7 @@ object OpenHashSet {
 
   val INVALID_POS = -1
   val NONEXISTENCE_MASK = 0x80000000
-  val POSITION_MASK = 0xEFFFFFF
+  val POSITION_MASK = 0x1FFFFFFF
 
   /**
    * A set of specialized hash function implementation to avoid boxing hash code computation
diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
index 94e011799921b..40c499da91c8c 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
@@ -186,4 +186,14 @@ class OpenHashMapSuite extends SparkFunSuite with Matchers {
     map(null) = 0
     assert(map.contains(null))
   }
+
+  test("support for more than 12M items") {
+    val cnt = 12000000 // 12M
+    val map = new OpenHashMap[Int, Int](cnt)
+    for (i <- 0 until cnt) {
+      map(i) = 1
+    }
+    val numInvalidValues = map.iterator.count(_._2 == 0)
+    assertResult(0)(numInvalidValues)
+  }
 }
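The root cause fixed in patch 1/5 is visible in the mask constants: the old `POSITION_MASK = 0xEFFFFFF` is only 28 bits wide and, crucially, is missing bit 24 (its top nibble is `E`, binary `1110`), so any slot position with that bit set was silently corrupted when masked. Assuming the class's default load factor of 0.7, a table of 2^24 slots holds roughly 11.7M entries, which matches the "more than 12M items" threshold in the ticket. A minimal sketch of the arithmetic; the two mask constants are copied from the patch, everything else (including the object name) is illustrative:

```scala
// Illustrative only -- not Spark code.
object PositionMaskDemo {
  val OLD_POSITION_MASK = 0xEFFFFFF   // top nibble 1110: bit 24 is missing
  val NEW_POSITION_MASK = 0x1FFFFFFF  // contiguous low 29 bits, matching the 2^29 cap

  def main(args: Array[String]): Unit = {
    val pos = 1 << 24                  // first slot index affected by the bug
    println(pos & OLD_POSITION_MASK)   // 0: masking drops bit 24, corrupting the position
    println(pos & NEW_POSITION_MASK)   // 16777216: the position survives intact
  }
}
```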
From f9284fdc171821f1c552f324711378cc6c168d12 Mon Sep 17 00:00:00 2001
From: Vyacheslav Baranov
Date: Fri, 12 Jun 2015 19:06:32 +0300
Subject: [PATCH 2/5] Resolved review comments

---
 .../org/apache/spark/util/collection/OpenHashSet.scala        | 6 +++---
 .../util/collection/PrimitiveKeyOpenHashMapSuite.scala        | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index f2d78990f555e..b7ac44909a07b 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -45,7 +45,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
     loadFactor: Double)
   extends Serializable {
 
-  require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+  require(initialCapacity <= (1 << 30), "Can't make capacity bigger than 2^30 elements")
   require(initialCapacity >= 1, "Invalid initial capacity")
   require(loadFactor < 1.0, "Load factor must be less than 1.0")
   require(loadFactor > 0.0, "Load factor must be greater than 0.0")
@@ -223,7 +223,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
    */
   private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
     val newCapacity = _capacity * 2
-    require(newCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+    require(newCapacity <= (1 << 30), "Can't make capacity bigger than 2^30 elements")
     allocateFunc(newCapacity)
     val newBitset = new BitSet(newCapacity)
     val newData = new Array[T](newCapacity)
@@ -279,7 +279,7 @@ object OpenHashSet {
 
   val INVALID_POS = -1
   val NONEXISTENCE_MASK = 0x80000000
-  val POSITION_MASK = 0x1FFFFFFF
+  val POSITION_MASK = 0x7FFFFFFF
 
   /**
    * A set of specialized hash function implementation to avoid boxing hash code computation
diff --git a/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMapSuite.scala
index 462bc2f29f9f8..508e737b725bc 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMapSuite.scala
@@ -44,7 +44,7 @@ class PrimitiveKeyOpenHashMapSuite extends SparkFunSuite with Matchers {
     val goodMap3 = new PrimitiveKeyOpenHashMap[Int, Int](256)
     assert(goodMap3.size === 0)
     intercept[IllegalArgumentException] {
-      new PrimitiveKeyOpenHashMap[Int, Int](1 << 30) // Invalid map size: bigger than 2^29
+      new PrimitiveKeyOpenHashMap[Int, Int]((1 << 30) + 1) // Invalid map size: bigger than 2^30
     }
     intercept[IllegalArgumentException] {
       new PrimitiveKeyOpenHashMap[Int, Int](-1)

From eaf1e686dc8c7bf6ad1f71c94bef6f33856ffea9 Mon Sep 17 00:00:00 2001
From: Vyacheslav Baranov
Date: Fri, 12 Jun 2015 21:19:33 +0300
Subject: [PATCH 3/5] Fixed failing test

---
 .../org/apache/spark/util/collection/OpenHashMapSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
index 40c499da91c8c..3066e9996abda 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
@@ -44,7 +44,7 @@ class OpenHashMapSuite extends SparkFunSuite with Matchers {
     val goodMap3 = new OpenHashMap[String, String](256)
     assert(goodMap3.size === 0)
     intercept[IllegalArgumentException] {
-      new OpenHashMap[String, Int](1 << 30) // Invalid map size: bigger than 2^29
+      new OpenHashMap[String, Int]((1 << 30) + 1) // Invalid map size: bigger than 2^30
    }
     intercept[IllegalArgumentException] {
       new OpenHashMap[String, Int](-1)
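A note on the capacity-bound tests in patches 2/5 and 3/5: in Scala, `+` binds tighter than `<<`, so an unparenthesized `1 << 30 + 1` parses as `1 << (30 + 1)`, a negative number that fails the `initialCapacity >= 1` check rather than the capacity cap, so the test would pass for the wrong reason. The added test lines above therefore parenthesize the shift so the 2^30 bound itself is exercised. A quick REPL-style check (illustrative, not part of the patch):

```scala
// Scala operator precedence: `+` binds tighter than `<<`.
println(1 << 30 + 1)    // parsed as 1 << (30 + 1) == Int.MinValue (-2147483648)
println((1 << 30) + 1)  // 1073741825: one past the 2^30 capacity cap, as intended
```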
initial capacity") require(loadFactor < 1.0, "Load factor must be less than 1.0") require(loadFactor > 0.0, "Load factor must be greater than 0.0") @@ -223,7 +224,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( */ private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) { val newCapacity = _capacity * 2 - require(newCapacity <= (1 << 30), "Can't make capacity bigger than 2^30 elements") + require(newCapacity <= OpenHashSet.MAX_CAPACITY, + s"Can't contain more than ${(loadFactor * OpenHashSet.MAX_CAPACITY).toInt} elements") allocateFunc(newCapacity) val newBitset = new BitSet(newCapacity) val newData = new Array[T](newCapacity) @@ -277,9 +279,10 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( private[spark] object OpenHashSet { + val MAX_CAPACITY = 1 << 30 val INVALID_POS = -1 - val NONEXISTENCE_MASK = 0x80000000 - val POSITION_MASK = 0x7FFFFFFF + val NONEXISTENCE_MASK = 1 << 31 + val POSITION_MASK = (1 << 31) - 1 /** * A set of specialized hash function implementation to avoid boxing hash code computation From 8557445e79c42460890ca76c0894dc090f7bf862 Mon Sep 17 00:00:00 2001 From: Vyacheslav Baranov Date: Tue, 16 Jun 2015 13:52:15 +0300 Subject: [PATCH 5/5] Resolved review comments --- .../scala/org/apache/spark/util/collection/OpenHashSet.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala index 3e3ece2da4375..60bf4dd7469f1 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala @@ -224,7 +224,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( */ private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) { val newCapacity = _capacity * 2 - require(newCapacity <= OpenHashSet.MAX_CAPACITY, + require(newCapacity > 0 && newCapacity <= OpenHashSet.MAX_CAPACITY, s"Can't contain more than ${(loadFactor * OpenHashSet.MAX_CAPACITY).toInt} elements") allocateFunc(newCapacity) val newBitset = new BitSet(newCapacity)