Skip to content

Commit 0155693

Browse files
Davies LiuJoshRosen
authored andcommitted
[SPARK-11805] free the array in UnsafeExternalSorter during spilling
After calling spill() on SortedIterator, the array inside InMemorySorter is not needed, it should be freed during spilling, this could help to join multiple tables with limited memory. Author: Davies Liu <[email protected]> Closes #9793 from davies/free_array. (cherry picked from commit 58d9b26) Signed-off-by: Josh Rosen <[email protected]>
1 parent 3f40af5 commit 0155693

File tree

2 files changed

+19
-22
lines changed

2 files changed

+19
-22
lines changed

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,12 @@ public long spill() throws IOException {
468468
}
469469
allocatedPages.clear();
470470
}
471+
472+
// in-memory sorter will not be used after spilling
473+
assert(inMemSorter != null);
474+
released += inMemSorter.getMemoryUsage();
475+
inMemSorter.free();
476+
inMemSorter = null;
471477
return released;
472478
}
473479
}
@@ -489,10 +495,6 @@ public void loadNext() throws IOException {
489495
}
490496
upstream = nextUpstream;
491497
nextUpstream = null;
492-
493-
assert(inMemSorter != null);
494-
inMemSorter.free();
495-
inMemSorter = null;
496498
}
497499
numRecords--;
498500
upstream.loadNext();

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ public UnsafeInMemorySorter(
108108
*/
109109
public void free() {
110110
consumer.freeArray(array);
111+
array = null;
111112
}
112113

113114
public void reset() {
@@ -160,28 +161,22 @@ public void insertRecord(long recordPointer, long keyPrefix) {
160161
pos++;
161162
}
162163

163-
public static final class SortedIterator extends UnsafeSorterIterator {
164+
public final class SortedIterator extends UnsafeSorterIterator {
164165

165-
private final TaskMemoryManager memoryManager;
166-
private final int sortBufferInsertPosition;
167-
private final LongArray sortBuffer;
168-
private int position = 0;
166+
private final int numRecords;
167+
private int position;
169168
private Object baseObject;
170169
private long baseOffset;
171170
private long keyPrefix;
172171
private int recordLength;
173172

174-
private SortedIterator(
175-
TaskMemoryManager memoryManager,
176-
int sortBufferInsertPosition,
177-
LongArray sortBuffer) {
178-
this.memoryManager = memoryManager;
179-
this.sortBufferInsertPosition = sortBufferInsertPosition;
180-
this.sortBuffer = sortBuffer;
173+
private SortedIterator(int numRecords) {
174+
this.numRecords = numRecords;
175+
this.position = 0;
181176
}
182177

183178
public SortedIterator clone () {
184-
SortedIterator iter = new SortedIterator(memoryManager, sortBufferInsertPosition, sortBuffer);
179+
SortedIterator iter = new SortedIterator(numRecords);
185180
iter.position = position;
186181
iter.baseObject = baseObject;
187182
iter.baseOffset = baseOffset;
@@ -192,21 +187,21 @@ public SortedIterator clone () {
192187

193188
@Override
194189
public boolean hasNext() {
195-
return position < sortBufferInsertPosition;
190+
return position / 2 < numRecords;
196191
}
197192

198193
public int numRecordsLeft() {
199-
return (sortBufferInsertPosition - position) / 2;
194+
return numRecords - position / 2;
200195
}
201196

202197
@Override
203198
public void loadNext() {
204199
// This pointer points to a 4-byte record length, followed by the record's bytes
205-
final long recordPointer = sortBuffer.get(position);
200+
final long recordPointer = array.get(position);
206201
baseObject = memoryManager.getPage(recordPointer);
207202
baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4; // Skip over record length
208203
recordLength = Platform.getInt(baseObject, baseOffset - 4);
209-
keyPrefix = sortBuffer.get(position + 1);
204+
keyPrefix = array.get(position + 1);
210205
position += 2;
211206
}
212207

@@ -229,6 +224,6 @@ public void loadNext() {
229224
*/
230225
public SortedIterator getSortedIterator() {
231226
sorter.sort(array, 0, pos / 2, sortComparator);
232-
return new SortedIterator(memoryManager, pos, array);
227+
return new SortedIterator(pos / 2);
233228
}
234229
}

0 commit comments

Comments
 (0)