
Commit 3d0c3af

Davies Liu authored and committed
[SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMap
## What changes were proposed in this pull request?

Radix sort requires that half of the array be kept free (as temporary space), so we use 0.5 as the scale factor to make sure that BytesToBytesMap never holds more items than 1/2 of its capacity. It turned out this was not guaranteed: the current implementation of append() could leave one more item than the threshold (1/2 of capacity) in the array, which breaks the requirement of radix sort (failing the assert in 2.2, or failing to insert into InMemorySorter in 2.1). This PR fixes the off-by-one bug in BytesToBytesMap.

This PR also fixes a bug, introduced by #15722, where the array would never grow again after failing to grow once (it stayed at the initial capacity).

## How was this patch tested?

Added a regression test.

Author: Davies Liu <[email protected]>

Closes #16844 from davies/off_by_one.
1 parent 021062a commit 3d0c3af
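To make the off-by-one concrete, here is a minimal standalone sketch (not Spark code; the class and method names are made up for illustration) of the admission check in append() when the map can no longer grow. With the old strict comparison, one extra entry slips past the 1/2-capacity threshold that radix sort relies on; with the inclusive comparison it cannot:

```java
// Minimal standalone sketch (not Spark code): how the admission check in append()
// behaves once the map cannot grow. All names here are illustrative stand-ins.
public class OffByOneSketch {

  // Returns how many entries fit before append() starts rejecting, for a given
  // capacity, under either the old strict check (>) or the fixed inclusive check (>=).
  static int maxEntriesWithoutGrowth(int capacity, boolean inclusiveCheck) {
    int growthThreshold = (int) (capacity * 0.5);  // same 0.5 scale factor as above
    int numKeys = 0;
    while (true) {
      boolean reject = inclusiveCheck ? numKeys >= growthThreshold
                                      : numKeys > growthThreshold;
      if (reject) {
        return numKeys;
      }
      numKeys++;  // the append succeeds and the map holds one more entry
    }
  }

  public static void main(String[] args) {
    int capacity = 64;
    System.out.println("old check (>):  " + maxEntriesWithoutGrowth(capacity, false)); // prints 33
    System.out.println("new check (>=): " + maxEntriesWithoutGrowth(capacity, true));  // prints 32
  }
}
```

With a capacity of 64, the threshold is 32: the old check admits a 33rd entry before rejecting, leaving less than half of the array free, while the fixed check stops at exactly 32.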

File tree

2 files changed: +43 -2 lines changed


core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 3 additions & 2 deletions
@@ -698,7 +698,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
       if (numKeys == MAX_CAPACITY
         // The map could be reused from last spill (because of no enough memory to grow),
         // then we don't try to grow again if hit the `growthThreshold`.
-        || !canGrowArray && numKeys > growthThreshold) {
+        || !canGrowArray && numKeys >= growthThreshold) {
         return false;
       }

@@ -742,7 +742,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
       longArray.set(pos * 2 + 1, keyHashcode);
       isDefined = true;

-      if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
+      if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
         try {
           growAndRehash();
         } catch (OutOfMemoryError oom) {

@@ -911,6 +911,7 @@ public void reset() {
       freePage(dataPage);
     }
     allocate(initialCapacity);
+    canGrowArray = true;
     currentPage = null;
     pageCursor = 0;
   }
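The one-line addition to reset() matters because canGrowArray is cleared when a grow attempt fails; without re-enabling it when the map is reset for reuse after a spill, the array would stay at its initial capacity forever. A rough sketch of that interaction, using hypothetical simplified names (GrowableMapSketch, failToGrow, tryGrow) rather than the real BytesToBytesMap internals:

```java
// Rough sketch (hypothetical names, not the real BytesToBytesMap) of why reset()
// must re-enable growth after a failed grow attempt.
class GrowableMapSketch {
  private int capacity = 64;          // stands in for the initial longArray capacity
  private boolean canGrowArray = true;

  // Corresponds to growAndRehash() hitting an OutOfMemoryError: growth is disabled.
  void failToGrow() {
    canGrowArray = false;
  }

  // Corresponds to reset(): without flipping the flag back, every later tryGrow()
  // would be a no-op and the map would stay at its initial capacity across reuses.
  void reset() {
    capacity = 64;                    // allocate(initialCapacity)
    canGrowArray = true;              // the line added by this commit
  }

  // Corresponds to the growth attempt guarded by canGrowArray.
  boolean tryGrow() {
    if (!canGrowArray) {
      return false;
    }
    capacity *= 2;
    return true;
  }
}
```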

sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala

Lines changed: 40 additions & 0 deletions
@@ -342,4 +342,44 @@ class UnsafeFixedWidthAggregationMapSuite
     }
   }

+  testWithMemoryLeakDetection("convert to external sorter after fail to grow (SPARK-19500)") {
+    val pageSize = 4096000
+    val map = new UnsafeFixedWidthAggregationMap(
+      emptyAggregationBuffer,
+      aggBufferSchema,
+      groupKeySchema,
+      taskMemoryManager,
+      128, // initial capacity
+      pageSize,
+      false // disable perf metrics
+    )
+
+    val rand = new Random(42)
+    for (i <- 1 to 63) {
+      val str = rand.nextString(1024)
+      val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+      buf.setInt(0, str.length)
+    }
+    // Simulate running out of space
+    memoryManager.limit(0)
+    var str = rand.nextString(1024)
+    var buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+    assert(buf != null)
+    str = rand.nextString(1024)
+    buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+    assert(buf == null)
+
+    // Convert the map into a sorter. This used to fail before the fix for SPARK-10474
+    // because we would try to acquire space for the in-memory sorter pointer array before
+    // actually releasing the pages despite having spilled all of them.
+    var sorter: UnsafeKVExternalSorter = null
+    try {
+      sorter = map.destructAndCreateExternalSorter()
+      map.free()
+    } finally {
+      if (sorter != null) {
+        sorter.cleanupResources()
+      }
+    }
+  }
 }
