Skip to content

Commit 76e019d

Browse files
10110346cloud-fan
authored andcommitted
[SPARK-21860][CORE] Improve memory reuse for heap memory in HeapMemoryAllocator
## What changes were proposed in this pull request? In `HeapMemoryAllocator`, when allocating memory from pool, and the key of pool is memory size. Actually some size of memory ,such as 1025bytes,1026bytes,......1032bytes, we can think they are the same,because we allocate memory in multiples of 8 bytes. In this case, we can improve memory reuse. ## How was this patch tested? Existing tests and added unit tests Author: liuxian <[email protected]> Closes #19077 from 10110346/headmemoptimize.
1 parent a75f927 commit 76e019d

File tree

2 files changed

+33
-7
lines changed

2 files changed

+33
-7
lines changed

common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,12 @@ private boolean shouldPool(long size) {
4646

4747
@Override
4848
public MemoryBlock allocate(long size) throws OutOfMemoryError {
49-
if (shouldPool(size)) {
49+
int numWords = (int) ((size + 7) / 8);
50+
long alignedSize = numWords * 8L;
51+
assert (alignedSize >= size);
52+
if (shouldPool(alignedSize)) {
5053
synchronized (this) {
51-
final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
54+
final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
5255
if (pool != null) {
5356
while (!pool.isEmpty()) {
5457
final WeakReference<long[]> arrayReference = pool.pop();
@@ -62,11 +65,11 @@ public MemoryBlock allocate(long size) throws OutOfMemoryError {
6265
return memory;
6366
}
6467
}
65-
bufferPoolsBySize.remove(size);
68+
bufferPoolsBySize.remove(alignedSize);
6669
}
6770
}
6871
}
69-
long[] array = new long[(int) ((size + 7) / 8)];
72+
long[] array = new long[numWords];
7073
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
7174
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
7275
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
@@ -98,12 +101,13 @@ public void free(MemoryBlock memory) {
98101
long[] array = (long[]) memory.obj;
99102
memory.setObjAndOffset(null, 0);
100103

101-
if (shouldPool(size)) {
104+
long alignedSize = ((size + 7) / 8) * 8;
105+
if (shouldPool(alignedSize)) {
102106
synchronized (this) {
103-
LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
107+
LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
104108
if (pool == null) {
105109
pool = new LinkedList<>();
106-
bufferPoolsBySize.put(size, pool);
110+
bufferPoolsBySize.put(alignedSize, pool);
107111
}
108112
pool.add(new WeakReference<>(array));
109113
}

common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.unsafe;
1919

20+
import org.apache.spark.unsafe.memory.HeapMemoryAllocator;
2021
import org.apache.spark.unsafe.memory.MemoryAllocator;
2122
import org.apache.spark.unsafe.memory.MemoryBlock;
2223

@@ -134,4 +135,25 @@ public void memoryDebugFillEnabledInTest() {
134135
MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
135136
MemoryAllocator.UNSAFE.free(offheap);
136137
}
138+
139+
@Test
140+
public void heapMemoryReuse() {
141+
MemoryAllocator heapMem = new HeapMemoryAllocator();
142+
// The size is less than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,allocate new memory every time.
143+
MemoryBlock onheap1 = heapMem.allocate(513);
144+
Object obj1 = onheap1.getBaseObject();
145+
heapMem.free(onheap1);
146+
MemoryBlock onheap2 = heapMem.allocate(514);
147+
Assert.assertNotEquals(obj1, onheap2.getBaseObject());
148+
149+
// The size is greater than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,
150+
// reuse the previous memory which has released.
151+
MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
152+
Assert.assertEquals(onheap3.size(), 1024 * 1024 + 1);
153+
Object obj3 = onheap3.getBaseObject();
154+
heapMem.free(onheap3);
155+
MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
156+
Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
157+
Assert.assertEquals(obj3, onheap4.getBaseObject());
158+
}
137159
}

0 commit comments

Comments
 (0)