Skip to content

Commit cda4b2a

Browse files
author
Davies Liu
committed
address more comments
1 parent afc8c7c commit cda4b2a

File tree

4 files changed

+30
-32
lines changed

4 files changed

+30
-32
lines changed

core/src/main/java/org/apache/spark/memory/MemoryConsumer.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ public abstract class MemoryConsumer {
3434

3535
protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize) {
3636
this.taskMemoryManager = taskMemoryManager;
37-
if (pageSize == 0) {
38-
pageSize = taskMemoryManager.pageSizeBytes();
39-
}
4037
this.pageSize = pageSize;
4138
this.used = 0;
4239
}
@@ -95,8 +92,8 @@ protected void acquireMemory(long size) {
9592
* Release `size` bytes memory.
9693
*/
9794
protected void releaseMemory(long size) {
98-
taskMemoryManager.releaseExecutionMemory(size, this);
9995
used -= size;
96+
taskMemoryManager.releaseExecutionMemory(size, this);
10097
}
10198

10299
/**
@@ -125,7 +122,7 @@ protected MemoryBlock allocatePage(long required) {
125122
* Free a memory block.
126123
*/
127124
protected void freePage(MemoryBlock page) {
128-
taskMemoryManager.freePage(page, this);
129125
used -= page.size();
126+
taskMemoryManager.freePage(page, this);
130127
}
131128
}

core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.memory;
1919

20+
import javax.annotation.concurrent.GuardedBy;
2021
import java.io.IOException;
2122
import java.util.Arrays;
2223
import java.util.BitSet;
@@ -107,6 +108,7 @@ public class TaskMemoryManager {
107108
/**
108109
* The size of memory granted to each consumer.
109110
*/
111+
@GuardedBy("this")
110112
private final HashSet<MemoryConsumer> consumers;
111113

112114
/**

core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@
2323
import java.util.Iterator;
2424
import java.util.LinkedList;
2525

26-
import scala.runtime.AbstractFunction0;
27-
import scala.runtime.BoxedUnit;
28-
2926
import com.google.common.annotations.VisibleForTesting;
3027
import org.slf4j.Logger;
3128
import org.slf4j.LoggerFactory;
@@ -43,6 +40,7 @@
4340
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
4441
import org.apache.spark.unsafe.memory.MemoryBlock;
4542
import org.apache.spark.unsafe.memory.MemoryLocation;
43+
import org.apache.spark.util.TaskCompletionListener;
4644
import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader;
4745
import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter;
4846

@@ -349,33 +347,35 @@ public long spill(long numBytes) throws IOException {
349347
offset += 4;
350348
final UnsafeSorterSpillWriter writer =
351349
new UnsafeSorterSpillWriter(blockManager, 32 * 1024, writeMetrics, numRecords);
352-
while (numRecords-- > 0) {
350+
while (numRecords > 0) {
353351
int length = Platform.getInt(base, offset);
354352
writer.write(base, offset + 4, length, 0);
355353
offset += 4 + length;
354+
numRecords--;
356355
}
357356
writer.close();
358357
spillWriters.add(writer);
359358
if (TaskContext.get() != null) {
360-
TaskContext.get().addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
361-
@Override
362-
public BoxedUnit apply() {
363-
File file = writer.getFile();
364-
if (file != null && file.exists()) {
365-
if (!file.delete()) {
366-
logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
359+
TaskContext.get().addTaskCompletionListener(
360+
new TaskCompletionListener() {
361+
@Override
362+
public void onTaskCompletion(TaskContext context) {
363+
File file = writer.getFile();
364+
if (file != null && file.exists()) {
365+
if (!file.delete()) {
366+
logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
367+
}
367368
}
368369
}
369-
return null;
370370
}
371-
});
371+
);
372372
}
373373

374374
dataPages.removeLast();
375-
freePage(block);
376375
released += block.size();
376+
freePage(block);
377377

378-
if (released > numBytes) {
378+
if (released >= numBytes) {
379379
break;
380380
}
381381
}
@@ -545,8 +545,8 @@ private Location with(MemoryBlock page, long offsetInPage) {
545545
}
546546

547547
/**
548-
+ * This is only used for spilling
549-
+ */
548+
* This is only used for spilling
549+
*/
550550
private Location with(Object base, long offset, int length) {
551551
this.isDefined = true;
552552
this.memoryPage = null;

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,6 @@
2222
import java.io.IOException;
2323
import java.util.LinkedList;
2424

25-
import scala.runtime.AbstractFunction0;
26-
import scala.runtime.BoxedUnit;
27-
2825
import com.google.common.annotations.VisibleForTesting;
2926
import org.slf4j.Logger;
3027
import org.slf4j.LoggerFactory;
@@ -36,6 +33,7 @@
3633
import org.apache.spark.storage.BlockManager;
3734
import org.apache.spark.unsafe.Platform;
3835
import org.apache.spark.unsafe.memory.MemoryBlock;
36+
import org.apache.spark.util.TaskCompletionListener;
3937
import org.apache.spark.util.Utils;
4038

4139
/**
@@ -133,13 +131,14 @@ private UnsafeExternalSorter(
133131
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
134132
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
135133
// does not fully consume the sorter's output (e.g. sort followed by limit).
136-
taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
137-
@Override
138-
public BoxedUnit apply() {
139-
cleanupResources();
140-
return null;
134+
TaskContext.get().addTaskCompletionListener(
135+
new TaskCompletionListener() {
136+
@Override
137+
public void onTaskCompletion(TaskContext context) {
138+
cleanupResources();
139+
}
141140
}
142-
});
141+
);
143142
}
144143

145144
/**
@@ -244,8 +243,8 @@ private long freeMemory() {
244243
updatePeakMemoryUsed();
245244
long memoryFreed = 0;
246245
for (MemoryBlock block : allocatedPages) {
247-
freePage(block);
248246
memoryFreed += block.size();
247+
freePage(block);
249248
}
250249
allocatedPages.clear();
251250
currentPage = null;

0 commit comments

Comments
 (0)