@@ -698,7 +698,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
     if (numKeys == MAX_CAPACITY
         // The map could be reused from the last spill (because there was not enough memory to grow),
         // then we don't try to grow again if we hit the `growthThreshold`.
-        || !canGrowArray && numKeys > growthThreshold) {
+        || !canGrowArray && numKeys >= growthThreshold) {
Contributor:

This change makes sense to me because `growthThreshold`'s Javadoc says "The map will be expanded once the number of keys exceeds this threshold", and here we're considering the impact of adding an additional key (so this could also have been written as `(numKeys + 1) > growthThreshold`).
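A minimal sketch of the boundary being discussed (the capacity and 0.5 load factor are illustrative assumptions, not values taken from this patch):

    // Illustrative only: `>` admits one key too many into a map that can no
    // longer grow, while `>=` rejects the insertion up front.
    int capacity = 64;
    int growthThreshold = (int) (capacity * 0.5); // 32

    int numKeys = 32;  // keys already stored; assume canGrowArray == false
    boolean oldCheck = numKeys > growthThreshold;  // false -> append proceeds anyway
    boolean newCheck = numKeys >= growthThreshold; // true  -> append returns false, caller spills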

       return false;
     }

@@ -742,7 +742,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
         longArray.set(pos * 2 + 1, keyHashcode);
         isDefined = true;

-        if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
+        if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
Member:

The re-allocated space might not be used if there is no further insertion. Shall we do `growAndRehash` at the beginning of `append` when `numKeys == growthThreshold && !isDefined`?

Contributor Author:

Unfortunately, we can't grow at the beginning; otherwise the `pos` will be wrong.
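A self-contained illustration of the ordering problem (not Spark code; the hash value and capacities are arbitrary assumptions). Growing doubles the array and changes the probe mask, so a slot index computed before `growAndRehash()` no longer points at the key's slot:

    public class StalePosDemo {
      public static void main(String[] args) {
        int hash = 95;                   // arbitrary example hash
        int posBefore = hash & (64 - 1); // slot probed in the old array: 31
        // ... growAndRehash() doubles the array here ...
        int posAfter = hash & (128 - 1); // slot the key belongs in afterwards: 95
        System.out.println(posBefore + " != " + posAfter); // 31 != 95
      }
    }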

Member:

OK. LGTM.

           try {
             growAndRehash();
           } catch (OutOfMemoryError oom) {
@@ -911,6 +911,7 @@ public void reset() {
       freePage(dataPage);
     }
     allocate(initialCapacity);
+    canGrowArray = true;
     currentPage = null;
     pageCursor = 0;
   }
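This one-line change is the core of the fix. A hedged sketch of the reuse cycle it repairs (illustrative, pieced together from the surrounding diff):

    // 1. append(...) cannot grow the array -> canGrowArray becomes false and
    //    the caller spills the map's contents to disk.
    // 2. reset() frees the data pages and re-allocates the initial array.
    // 3. Without restoring canGrowArray here, every append(...) on the reused
    //    map would keep refusing to grow and bail out as soon as numKeys
    //    reached growthThreshold, even though memory may be available again.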
@@ -342,4 +342,44 @@ class UnsafeFixedWidthAggregationMapSuite
     }
   }

+  testWithMemoryLeakDetection("convert to external sorter after fail to grow (SPARK-19500)") {
+    val pageSize = 4096000
+    val map = new UnsafeFixedWidthAggregationMap(
+      emptyAggregationBuffer,
+      aggBufferSchema,
+      groupKeySchema,
+      taskMemoryManager,
+      128, // initial capacity
+      pageSize,
+      false // disable perf metrics
+    )
+
+    val rand = new Random(42)
+    for (i <- 1 to 63) {
+      val str = rand.nextString(1024)
+      val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+      buf.setInt(0, str.length)
+    }
+    // Simulate running out of space
+    memoryManager.limit(0)
+    var str = rand.nextString(1024)
+    var buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+    assert(buf != null)
+    str = rand.nextString(1024)
+    buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+    assert(buf == null)
+
+    // Convert the map into a sorter. This used to fail before the fix for SPARK-10474
+    // because we would try to acquire space for the in-memory sorter pointer array before
+    // actually releasing the pages despite having spilled all of them.
+    var sorter: UnsafeKVExternalSorter = null
+    try {
+      sorter = map.destructAndCreateExternalSorter()
+      map.free()
+    } finally {
+      if (sorter != null) {
+        sorter.cleanupResources()
+      }
+    }
+  }
 }