Skip to content

Commit 4ee1f42

Browse files
author
Davies Liu
committed
improve OOM handling
1 parent cda4b2a commit 4ee1f42

File tree

4 files changed

+44
-29
lines changed

4 files changed

+44
-29
lines changed

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -320,22 +320,26 @@ public void cleanupResources() {
320320
private void growPointerArrayIfNecessary() throws IOException {
321321
assert(inMemSorter != null);
322322
if (!inMemSorter.hasSpaceForAnotherRecord()) {
323-
logger.debug("Attempting to expand sort pointer array");
324323
long used = inMemSorter.getMemoryUsage();
325-
long needed = inMemSorter.getMemoryToExpand();
324+
long needed = used + inMemSorter.getMemoryToExpand();
326325
try {
327-
acquireMemory(used + needed); // could trigger spilling
328-
if (inMemSorter.hasSpaceForAnotherRecord()) {
329-
releaseMemory(used + needed);
330-
} else {
331-
logger.debug("Expand sort pointer array");
326+
acquireMemory(needed); // could trigger spilling
327+
} catch (OutOfMemoryError e) {
328+
// spilling should have been triggered
329+
assert(inMemSorter.hasSpaceForAnotherRecord());
330+
return;
331+
}
332+
// check whether spilling was triggered
333+
if (inMemSorter.hasSpaceForAnotherRecord()) {
334+
releaseMemory(needed);
335+
} else {
336+
try {
332337
inMemSorter.expandPointerArray();
333338
releaseMemory(used);
334-
}
335-
} catch (OutOfMemoryError oom) {
336-
// spilling should be triggered
337-
if (!inMemSorter.hasSpaceForAnotherRecord()) {
338-
spill(); // Just in case that JVM had run out of memory
339+
} catch (OutOfMemoryError oom) {
340+
// Just in case the JVM itself has run out of memory
341+
releaseMemory(needed);
342+
spill();
339343
}
340344
}
341345
}

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,7 @@ public void reset() {
6060

6161
private int newLength() {
6262
// Guard against overflow:
63-
return array.length <= Integer.MAX_VALUE / 2 ?
64-
(array.length * 2) : Integer.MAX_VALUE;
63+
return array.length <= Integer.MAX_VALUE / 2 ? (array.length * 2) : Integer.MAX_VALUE;
6564
}
6665

6766
/**

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -710,13 +710,13 @@ public boolean putNewKey(Object keyBase, long keyOffset, int keyLength,
710710
private boolean acquireNewPage(long required) {
711711
try {
712712
currentPage = allocatePage(required);
713-
dataPages.add(currentPage);
714-
Platform.putInt(currentPage.getBaseObject(), currentPage.getBaseOffset(), 0);
715-
pageCursor = 4;
716-
return true;
717713
} catch (OutOfMemoryError e) {
718714
return false;
719715
}
716+
dataPages.add(currentPage);
717+
Platform.putInt(currentPage.getBaseObject(), currentPage.getBaseOffset(), 0);
718+
pageCursor = 4;
719+
return true;
720720
}
721721

722722
@Override
@@ -862,7 +862,13 @@ void growAndRehash() {
862862
final int oldCapacity = (int) oldBitSet.capacity();
863863

864864
// Allocate the new data structures
865-
allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));
865+
try {
866+
allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));
867+
} catch (OutOfMemoryError oom) {
868+
longArray = oldLongArray;
869+
bitset = oldBitSet;
870+
throw oom;
871+
}
866872

867873
// Re-mask (we don't recompute the hashcode because we stored all 32 bits of it)
868874
for (int pos = oldBitSet.nextSetBit(0); pos >= 0; pos = oldBitSet.nextSetBit(pos + 1)) {

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -290,19 +290,25 @@ private void growPointerArrayIfNecessary() throws IOException {
290290
assert(inMemSorter != null);
291291
if (!inMemSorter.hasSpaceForAnotherRecord()) {
292292
long used = inMemSorter.getMemoryUsage();
293-
long needed = inMemSorter.getMemoryToExpand();
293+
long needed = used + inMemSorter.getMemoryToExpand();
294294
try {
295-
acquireMemory(used + needed); // could trigger spilling
296-
if (inMemSorter.hasSpaceForAnotherRecord()) {
297-
releaseMemory(used + needed);
298-
} else {
295+
acquireMemory(needed); // could trigger spilling
296+
} catch (OutOfMemoryError e) {
297+
// spilling should have been triggered
298+
assert(inMemSorter.hasSpaceForAnotherRecord());
299+
return;
300+
}
301+
// check whether spilling was triggered
302+
if (inMemSorter.hasSpaceForAnotherRecord()) {
303+
releaseMemory(needed);
304+
} else {
305+
try {
299306
inMemSorter.expandPointerArray();
300307
releaseMemory(used);
301-
}
302-
} catch (OutOfMemoryError oom) {
303-
// spilling should be triggered
304-
if (!inMemSorter.hasSpaceForAnotherRecord()) {
305-
spill(); // Just in case that JVM had run out of memory
308+
} catch (OutOfMemoryError oom) {
309+
// Just in case the JVM itself has run out of memory
310+
releaseMemory(needed);
311+
spill();
306312
}
307313
}
308314
}

0 commit comments

Comments
 (0)