From 8fa8fa4bca355ac8f803132156fd3926d68190f8 Mon Sep 17 00:00:00 2001 From: Jie Xiong Date: Tue, 1 Nov 2016 13:43:19 -0700 Subject: [PATCH 1/4] Fix the OOM failure from this operator. --- .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index d2fcdea4f2cee..785650f371c7d 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -170,6 +170,8 @@ public final class BytesToBytesMap extends MemoryConsumer { private long peakMemoryUsedBytes = 0L; + private int initialCapacity; + private final BlockManager blockManager; private final SerializerManager serializerManager; private volatile MapIterator destructiveIterator = null; @@ -202,6 +204,7 @@ public BytesToBytesMap( throw new IllegalArgumentException("Page size " + pageSizeBytes + " cannot exceed " + TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES); } + this.initialCapacity = initialCapacity; allocate(initialCapacity); } @@ -903,11 +906,12 @@ public void reset() { numKeys = 0; numValues = 0; longArray.zeroOut(); - + freeArray(longArray); while (dataPages.size() > 0) { MemoryBlock dataPage = dataPages.removeLast(); freePage(dataPage); } + allocate(initialCapacity); currentPage = null; pageCursor = 0; } From 7a956978189953590fcfd482ff96083d2cf91383 Mon Sep 17 00:00:00 2001 From: Jie Xiong Date: Tue, 1 Nov 2016 17:02:22 -0700 Subject: [PATCH 2/4] address comments --- .../main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 785650f371c7d..375f43f0867f4 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -905,7 +905,6 @@ public LongArray getArray() { public void reset() { numKeys = 0; numValues = 0; - longArray.zeroOut(); freeArray(longArray); while (dataPages.size() > 0) { MemoryBlock dataPage = dataPages.removeLast(); From 073d58b4ba1ba2a72a57437716c54ab3566bdae6 Mon Sep 17 00:00:00 2001 From: jiexiong Date: Wed, 30 Nov 2016 18:03:38 -0800 Subject: [PATCH 3/4] Update BytesToBytesMap.java --- .../main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 375f43f0867f4..0b07766317ac3 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -170,7 +170,7 @@ public final class BytesToBytesMap extends MemoryConsumer { private long peakMemoryUsedBytes = 0L; - private int initialCapacity; + final private int initialCapacity; private final BlockManager blockManager; private final SerializerManager serializerManager; From e7ec5bf4fa647c6cdd92fa3adb44cc06ea46de75 Mon Sep 17 00:00:00 2001 From: jiexiong Date: Wed, 30 Nov 2016 18:07:43 -0800 Subject: [PATCH 4/4] Update BytesToBytesMap.java --- .../main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 0b07766317ac3..44120e591f2fb 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -170,7 +170,7 @@ public final class BytesToBytesMap extends MemoryConsumer { private long peakMemoryUsedBytes = 0L; - final private int initialCapacity; + private final int initialCapacity; private final BlockManager blockManager; private final SerializerManager serializerManager;