@@ -844,18 +844,21 @@ public LongArray getArray() {
   }
 
   /**
-   * Reset this map to initialized state.
+   * Resets this map to initialized state. Returns the number of bytes freed.
    */
-  public void reset() {
+  public long reset() {
+    long memoryFreed = 0L;
     numElements = 0;
     longArray.zeroOut();
 
     while (dataPages.size() > 0) {
       MemoryBlock dataPage = dataPages.removeLast();
+      memoryFreed += dataPage.size();
       freePage(dataPage);
     }
     currentPage = null;
     pageCursor = 0;
+    return memoryFreed;
   }
 
   /**
@@ -52,7 +52,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   private final TaskMemoryManager taskMemoryManager;
   private final BlockManager blockManager;
   private final TaskContext taskContext;
-  private ShuffleWriteMetrics writeMetrics;
 
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
   private final int fileBufferSizeBytes;
@@ -122,7 +121,6 @@ private UnsafeExternalSorter(
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
     // this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
     this.fileBufferSizeBytes = 32 * 1024;
-    this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
 
     if (existingInMemorySorter == null) {
       this.inMemSorter = new UnsafeInMemorySorter(
@@ -178,6 +176,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
       spillWriters.size(),
       spillWriters.size() > 1 ? " times" : " time");
 
+    final ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
     // We only write out contents of the inMemSorter if it is not empty.
     if (inMemSorter.numRecords() > 0) {
       final UnsafeSorterSpillWriter spillWriter =
@@ -202,6 +201,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
     // pages will currently be counted as memory spilled even though that space isn't actually
     // written to disk. This also counts the space needed to store the sorter's pointer array.
     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
+    taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.shuffleBytesWritten());
Contributor Author commented:

For SQL aggregation, the spillSize here is 0 because the data is stored in a map instead of this sorter, so incMemoryBytesSpilled(spillSize) actually adds 0. We need to update memoryBytesSpilled after freeing the memory held by the map.


     return spillSize;
   }
@@ -448,6 +448,7 @@ public long spill() throws IOException {
       UnsafeInMemorySorter.SortedIterator inMemIterator =
         ((UnsafeInMemorySorter.SortedIterator) upstream).clone();
 
+      final ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
       // Iterate over the records that have not been returned and spill them.
       final UnsafeSorterSpillWriter spillWriter =
         new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
@@ -478,6 +479,13 @@ public long spill() throws IOException {
         allocatedPages.clear();
       }
 
+      long spillSize = released;
+      if (lastPage != null) {
+        spillSize += lastPage.size();
+      }
+      taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
+      taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.shuffleBytesWritten());
+
       // in-memory sorter will not be used after spilling
       assert(inMemSorter != null);
       released += inMemSorter.getMemoryUsage();
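The pattern in the spill paths above is worth spelling out: instead of the removed task-level writeMetrics field, each spill creates its own local ShuffleWriteMetrics, and the task's spill counters are then updated from it, so the in-memory size of the spilled data and the bytes actually written to disk are reported together. A minimal sketch, assuming only the Spark classes and calls that appear in this diff (the class name SpillAccountingSketch and method spillAndRecord are hypothetical placeholders for the real spill code):

import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;

final class SpillAccountingSketch {
  // Reports one spill: 'inMemorySizeOfSpilledData' is the size the records occupied
  // in memory; 'writeMetrics' would be the instance handed to the spill writer, so
  // shuffleBytesWritten() reflects what was actually serialized to disk.
  static void spillAndRecord(TaskContext taskContext,
                             long inMemorySizeOfSpilledData,
                             ShuffleWriteMetrics writeMetrics) {
    // ... records are written through an UnsafeSorterSpillWriter created with writeMetrics ...
    taskContext.taskMetrics().incMemoryBytesSpilled(inMemorySizeOfSpilledData);
    taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.shuffleBytesWritten());
  }
}

As the author's comment above notes, for SQL aggregation the in-memory portion reported here is 0, because the records live in a BytesToBytesMap rather than in this sorter; the map's share is accounted for separately when the map is reset (see the UnsafeKVExternalSorter change below).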
project/MimaExcludes.scala: 3 additions, 0 deletions
@@ -179,6 +179,9 @@ object MimaExcludes {
     ) ++ Seq(
       // SPARK-12510 Refactor ActorReceiver to support Java
       ProblemFilters.exclude[AbstractClassProblem]("org.apache.spark.streaming.receiver.ActorReceiver")
+    ) ++ Seq(
+      // SPARK-12688 Fix spill size metric in unsafe external sorter
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.unsafe.map.BytesToBytesMap.reset")
     ) ++ Seq(
       // SPARK-12895 Implement TaskMetrics using accumulators
       ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.TaskContext.internalMetricsToAccumulators"),
@@ -124,8 +124,12 @@ public UnsafeKVExternalSorter(
       inMemSorter);
 
     // reset the map, so we can re-use it to insert new records. the inMemSorter will not used
-    // anymore, so the underline array could be used by map again.
-    map.reset();
+    // anymore, so the underline array could be used by map again. When the sorter spills, it
+    // increases zero to the number of in-memory bytes spilled because the records are stored
+    // in the map instead of the sorter. So we need update the metric after resetting the map
+    // here.
+    final long spillSize = map.reset();
+    taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
Contributor commented:

According to my understanding of spill metrics, a spill needs to update both memoryBytesSpilled and diskBytesSpilled. The memory spill is the in-memory size of the data being spilled, while the disk spill records the size of that data after it has been serialized and written to disk. As a result, I think there must be a corresponding incDiskBytesSpilled call somewhere. This memory spill metric should probably be updated closer to the site where we increment the disk bytes spilled rather than here, since doing it that way would make it easier to reason about whether we're double-counting.

If this does turn out to be the right place for this spill, it would be great to add a code comment explaining the rationale for why this call must be here.

Contributor commented:

Ping @carsonwang, do you plan to update this PR to address my comment?

Contributor Author commented:

Sorry for the delay, @JoshRosen. I will update this soon.

     }
   }
 
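To make the reviewer's suggestion concrete: since the new BytesToBytesMap.reset() returns the number of page bytes it freed, that value could be recorded at the same site where the spill writer's disk bytes are recorded, keeping the two counters side by side. A minimal sketch under that assumption; the class and method names are hypothetical, and the ShuffleWriteMetrics instance is assumed to be the one that was passed to the spill writer:

import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.unsafe.map.BytesToBytesMap;

final class KVSorterSpillAccounting {
  // Hypothetical consolidation sketch, not the PR's final code: free the map's pages and
  // record both spill metrics at one site so double-counting is easier to rule out.
  static void spillMapAndRecordMetrics(TaskContext taskContext,
                                       BytesToBytesMap map,
                                       ShuffleWriteMetrics writeMetrics) {
    // reset() now returns the page bytes it freed (see the BytesToBytesMap diff above),
    // i.e. the in-memory size of the records that were just spilled out of the map.
    final long inMemoryBytesFreed = map.reset();
    taskContext.taskMetrics().incMemoryBytesSpilled(inMemoryBytesFreed);
    // The bytes actually written to disk come from the spill writer's metrics.
    taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.shuffleBytesWritten());
  }
}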