Skip to content

Commit a5f602e

Browse files
SlavikBaranov authored and srowen committed
[SPARK-8309] [CORE] Support for more than 12M items in OpenHashMap
The problem occurs because the position mask `0xEFFFFFF` is incorrect. It has zero 25th bit, so when capacity grows beyond 2^24, `OpenHashMap` calculates incorrect index of value in `_values` array. I've also added a size check in `rehash()`, so that it fails instead of reporting invalid item indices. Author: Vyacheslav Baranov <[email protected]> Closes #6763 from SlavikBaranov/SPARK-8309 and squashes the following commits: 8557445 [Vyacheslav Baranov] Resolved review comments 4d5b954 [Vyacheslav Baranov] Resolved review comments eaf1e68 [Vyacheslav Baranov] Fixed failing test f9284fd [Vyacheslav Baranov] Resolved review comments 3920656 [Vyacheslav Baranov] SPARK-8309: Support for more than 12M items in OpenHashMap (cherry picked from commit c13da20) Signed-off-by: Sean Owen <[email protected]>
1 parent 877deb0 commit a5f602e

File tree

3 files changed

+19
-5
lines changed

3 files changed

+19
-5
lines changed

core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
4343
loadFactor: Double)
4444
extends Serializable {
4545

46-
require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
46+
require(initialCapacity <= OpenHashSet.MAX_CAPACITY,
47+
s"Can't make capacity bigger than ${OpenHashSet.MAX_CAPACITY} elements")
4748
require(initialCapacity >= 1, "Invalid initial capacity")
4849
require(loadFactor < 1.0, "Load factor must be less than 1.0")
4950
require(loadFactor > 0.0, "Load factor must be greater than 0.0")
@@ -213,6 +214,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
213214
*/
214215
private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
215216
val newCapacity = _capacity * 2
217+
require(newCapacity > 0 && newCapacity <= OpenHashSet.MAX_CAPACITY,
218+
s"Can't contain more than ${(loadFactor * OpenHashSet.MAX_CAPACITY).toInt} elements")
216219
allocateFunc(newCapacity)
217220
val newBitset = new BitSet(newCapacity)
218221
val newData = new Array[T](newCapacity)
@@ -266,9 +269,10 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
266269
private[spark]
267270
object OpenHashSet {
268271

272+
val MAX_CAPACITY = 1 << 30
269273
val INVALID_POS = -1
270-
val NONEXISTENCE_MASK = 0x80000000
271-
val POSITION_MASK = 0xEFFFFFF
274+
val NONEXISTENCE_MASK = 1 << 31
275+
val POSITION_MASK = (1 << 31) - 1
272276

273277
/**
274278
* A set of specialized hash function implementation to avoid boxing hash code computation

core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class OpenHashMapSuite extends SparkFunSuite with Matchers {
4444
val goodMap3 = new OpenHashMap[String, String](256)
4545
assert(goodMap3.size === 0)
4646
intercept[IllegalArgumentException] {
47-
new OpenHashMap[String, Int](1 << 30) // Invalid map size: bigger than 2^29
47+
new OpenHashMap[String, Int](1 << 30 + 1) // Invalid map size: bigger than 2^30
4848
}
4949
intercept[IllegalArgumentException] {
5050
new OpenHashMap[String, Int](-1)
@@ -186,4 +186,14 @@ class OpenHashMapSuite extends SparkFunSuite with Matchers {
186186
map(null) = 0
187187
assert(map.contains(null))
188188
}
189+
190+
test("support for more than 12M items") {
191+
val cnt = 12000000 // 12M
192+
val map = new OpenHashMap[Int, Int](cnt)
193+
for (i <- 0 until cnt) {
194+
map(i) = 1
195+
}
196+
val numInvalidValues = map.iterator.count(_._2 == 0)
197+
assertResult(0)(numInvalidValues)
198+
}
189199
}

core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMapSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class PrimitiveKeyOpenHashMapSuite extends SparkFunSuite with Matchers {
4444
val goodMap3 = new PrimitiveKeyOpenHashMap[Int, Int](256)
4545
assert(goodMap3.size === 0)
4646
intercept[IllegalArgumentException] {
47-
new PrimitiveKeyOpenHashMap[Int, Int](1 << 30) // Invalid map size: bigger than 2^29
47+
new PrimitiveKeyOpenHashMap[Int, Int](1 << 30 + 1) // Invalid map size: bigger than 2^30
4848
}
4949
intercept[IllegalArgumentException] {
5050
new PrimitiveKeyOpenHashMap[Int, Int](-1)

0 commit comments

Comments
 (0)