From 61bceffdd352b6c03ae5efe7386363be213ad018 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 7 Feb 2017 14:02:33 -0800
Subject: [PATCH 1/4] Fix off-by-one bug in BytesToBytesMap

---
 .../spark/unsafe/map/BytesToBytesMap.java     |  4 +-
 .../UnsafeFixedWidthAggregationMapSuite.scala | 40 +++++++++++++++++++
 2 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 44120e591f2fb..0351a234fa3d9 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -698,7 +698,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
       if (numKeys == MAX_CAPACITY
         // The map could be reused from last spill (because of no enough memory to grow),
         // then we don't try to grow again if hit the `growthThreshold`.
-        || !canGrowArray && numKeys > growthThreshold) {
+        || !canGrowArray && numKeys >= growthThreshold) {
         return false;
       }
 
@@ -742,7 +742,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
         longArray.set(pos * 2 + 1, keyHashcode);
         isDefined = true;
 
-        if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
+        if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
           try {
             growAndRehash();
           } catch (OutOfMemoryError oom) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index c1555114e8b3e..6cf18de0cc768 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -342,4 +342,44 @@ class UnsafeFixedWidthAggregationMapSuite
     }
   }
 
+  testWithMemoryLeakDetection("convert to external sorter after fail to grow (SPARK-19500)") {
+    val pageSize = 4096000
+    val map = new UnsafeFixedWidthAggregationMap(
+      emptyAggregationBuffer,
+      aggBufferSchema,
+      groupKeySchema,
+      taskMemoryManager,
+      128, // initial capacity
+      pageSize,
+      false // disable perf metrics
+    )
+
+    val rand = new Random(42)
+    for (i <- 1 to 63) {
+      val str = rand.nextString(1024)
+      val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+      buf.setInt(0, str.length)
+    }
+    // Simulate running out of space
+    memoryManager.limit(0)
+    var str = rand.nextString(1024)
+    var buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+    assert(buf != null)
+    str = rand.nextString(1024)
+    buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+    assert(buf == null)
+
+    // Convert the map into a sorter. This used to fail before the fix for SPARK-10474
+    // because we would try to acquire space for the in-memory sorter pointer array before
+    // actually releasing the pages despite having spilled all of them.
+    var sorter: UnsafeKVExternalSorter = null
+    try {
+      sorter = map.destructAndCreateExternalSorter()
+      map.free()
+    } finally {
+      if (sorter != null) {
+        sorter.cleanupResources()
+      }
+    }
+  }
 }

From d9aa2081c514577399ba77cfe2145a00ed477ef8 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 8 Feb 2017 10:01:46 -0800
Subject: [PATCH 2/4] address comment, also fix another issue that the array
 will never grow if it fails to grow once (it stays at the initial capacity).

---
 .../spark/unsafe/map/BytesToBytesMap.java | 28 ++++++++++------------------
 1 file changed, 10 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 0351a234fa3d9..9003375218e12 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -118,11 +118,6 @@ public final class BytesToBytesMap extends MemoryConsumer {
   // full base addresses in the page table for off-heap mode so that we can reconstruct the full
   // absolute memory addresses.
 
-  /**
-   * Whether or not the longArray can grow. We will not insert more elements if it's false.
-   */
-  private boolean canGrowArray = true;
-
   private final double loadFactor;
 
   /**
@@ -695,11 +690,16 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
       assert (vlen % 8 == 0);
       assert (longArray != null);
 
-      if (numKeys == MAX_CAPACITY
-        // The map could be reused from last spill (because of no enough memory to grow),
-        // then we don't try to grow again if hit the `growthThreshold`.
-        || !canGrowArray && numKeys >= growthThreshold) {
-        return false;
+      if (numKeys >= growthThreshold) {
+        if (longArray.size() / 2 == MAX_CAPACITY) {
+          // Should not grow beyond the max capacity
+          return false;
+        }
+        try {
+          growAndRehash();
+        } catch (OutOfMemoryError oom) {
+          return false;
+        }
       }
 
       // Here, we'll copy the data into our data pages. Because we only store a relative offset from
@@ -741,14 +741,6 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
         numKeys++;
         longArray.set(pos * 2 + 1, keyHashcode);
         isDefined = true;
-
-        if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
-          try {
-            growAndRehash();
-          } catch (OutOfMemoryError oom) {
-            canGrowArray = false;
-          }
-        }
       }
       return true;
     }

From dc3c95d49c61c4cf1072528e3764a128045ade7d Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 15 Feb 2017 11:24:36 -0800
Subject: [PATCH 3/4] Revert "address comment, also fix another issue that the
 array will never grow"

This reverts commit d9aa2081c514577399ba77cfe2145a00ed477ef8.

---
 .../spark/unsafe/map/BytesToBytesMap.java | 28 ++++++++++++++++++----------
 1 file changed, 18 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 9003375218e12..0351a234fa3d9 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -118,6 +118,11 @@ public final class BytesToBytesMap extends MemoryConsumer {
   // full base addresses in the page table for off-heap mode so that we can reconstruct the full
   // absolute memory addresses.
 
+  /**
+   * Whether or not the longArray can grow. We will not insert more elements if it's false.
+ */ + private boolean canGrowArray = true; + private final double loadFactor; /** @@ -690,16 +695,11 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff assert (vlen % 8 == 0); assert (longArray != null); - if (numKeys >= growthThreshold) { - if (longArray.size() / 2 == MAX_CAPACITY) { - // Should not grow beyond the max capacity - return false; - } - try { - growAndRehash(); - } catch (OutOfMemoryError oom) { - return false; - } + if (numKeys == MAX_CAPACITY + // The map could be reused from last spill (because of no enough memory to grow), + // then we don't try to grow again if hit the `growthThreshold`. + || !canGrowArray && numKeys >= growthThreshold) { + return false; } // Here, we'll copy the data into our data pages. Because we only store a relative offset from @@ -741,6 +741,14 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff numKeys++; longArray.set(pos * 2 + 1, keyHashcode); isDefined = true; + + if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) { + try { + growAndRehash(); + } catch (OutOfMemoryError oom) { + canGrowArray = false; + } + } } return true; } From 8f098aa762a711dc6b8f915ae22887b61b641f49 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 15 Feb 2017 11:27:55 -0800 Subject: [PATCH 4/4] fix growing --- .../main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 0351a234fa3d9..4bef21b6b4e4d 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -911,6 +911,7 @@ public void reset() { freePage(dataPage); } allocate(initialCapacity); + canGrowArray = true; currentPage = null; pageCursor = 0; }