diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 3387f9a4177ce..cc44448c437e6 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -844,18 +844,21 @@ public LongArray getArray() {
   }
 
   /**
-   * Reset this map to initialized state.
+   * Resets this map to its initialized state and returns the number of bytes freed.
    */
-  public void reset() {
+  public long reset() {
+    long memoryFreed = 0L;
     numElements = 0;
     longArray.zeroOut();
 
     while (dataPages.size() > 0) {
       MemoryBlock dataPage = dataPages.removeLast();
+      memoryFreed += dataPage.size();
       freePage(dataPage);
     }
     currentPage = null;
     pageCursor = 0;
+    return memoryFreed;
   }
 
   /**
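With this hunk, `reset()` reports how much page memory it released, so the caller can credit that amount to a spill metric instead of losing it. A minimal sketch of the new contract follows, using a hypothetical `TrackedMap` stand-in; the real `BytesToBytesMap` works against a `TaskMemoryManager` and `MemoryBlock` pages, which are elided here:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for BytesToBytesMap, showing only the reset() contract
// from the hunk above: free every data page, clear state, and return the number
// of bytes released so the caller can feed the value into a spill metric.
class TrackedMap {
  private final Deque<long[]> dataPages = new ArrayDeque<>();
  private int numElements = 0;

  void allocatePage(int words) {
    dataPages.addLast(new long[words]);
  }

  /** Resets the map and returns the number of bytes freed, as in the patch. */
  long reset() {
    long memoryFreed = 0L;
    numElements = 0;
    while (!dataPages.isEmpty()) {
      long[] page = dataPages.removeLast();
      memoryFreed += page.length * 8L;  // a long "word" is 8 bytes
    }
    return memoryFreed;
  }

  public static void main(String[] args) {
    TrackedMap map = new TrackedMap();
    map.allocatePage(1024);
    map.allocatePage(2048);
    // The caller, not reset() itself, decides which metric to bump:
    long spillSize = map.reset();
    System.out.println("memoryBytesSpilled += " + spillSize);  // 24576
  }
}
```

Returning the freed size, rather than having `reset()` update metrics itself, keeps `BytesToBytesMap` metrics-agnostic; only the caller knows whether a given reset should count as a spill.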
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index a6edc1ad3f665..77f95139e400e 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -52,7 +52,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   private final TaskMemoryManager taskMemoryManager;
   private final BlockManager blockManager;
   private final TaskContext taskContext;
-  private ShuffleWriteMetrics writeMetrics;
 
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
   private final int fileBufferSizeBytes;
@@ -122,7 +121,6 @@ private UnsafeExternalSorter(
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
     // this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
     this.fileBufferSizeBytes = 32 * 1024;
-    this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
 
     if (existingInMemorySorter == null) {
       this.inMemSorter = new UnsafeInMemorySorter(
@@ -178,6 +176,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
       spillWriters.size(),
       spillWriters.size() > 1 ? " times" : " time");
 
+    final ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
     // We only write out contents of the inMemSorter if it is not empty.
     if (inMemSorter.numRecords() > 0) {
       final UnsafeSorterSpillWriter spillWriter =
@@ -202,6 +201,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
     // pages will currently be counted as memory spilled even though that space isn't actually
     // written to disk. This also counts the space needed to store the sorter's pointer array.
     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
+    taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.shuffleBytesWritten());
 
     return spillSize;
   }
@@ -448,6 +448,7 @@ public long spill() throws IOException {
         UnsafeInMemorySorter.SortedIterator inMemIterator =
           ((UnsafeInMemorySorter.SortedIterator) upstream).clone();
 
+        final ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
         // Iterate over the records that have not been returned and spill them.
         final UnsafeSorterSpillWriter spillWriter =
           new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
@@ -478,6 +479,13 @@ public long spill() throws IOException {
           allocatedPages.clear();
         }
 
+        long spillSize = released;
+        if (lastPage != null) {
+          spillSize += lastPage.size();
+        }
+        taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
+        taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.shuffleBytesWritten());
+
        // in-memory sorter will not be used after spilling
        assert(inMemSorter != null);
        released += inMemSorter.getMemoryUsage();
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 8b1a7303fc5b2..966ac6482d4e3 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -179,6 +179,9 @@ object MimaExcludes {
     ) ++ Seq(
       // SPARK-12510 Refactor ActorReceiver to support Java
       ProblemFilters.exclude[AbstractClassProblem]("org.apache.spark.streaming.receiver.ActorReceiver")
+    ) ++ Seq(
+      // SPARK-12688 Fix spill size metric in unsafe external sorter
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.unsafe.map.BytesToBytesMap.reset")
     ) ++ Seq(
       // SPARK-12895 Implement TaskMetrics using accumulators
       ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.TaskContext.internalMetricsToAccumulators"),
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 0da26bf376a6a..584d26ee79974 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -124,8 +124,12 @@ public UnsafeKVExternalSorter(
         inMemSorter);
 
       // reset the map, so we can re-use it to insert new records. the inMemSorter will not used
-      // anymore, so the underline array could be used by map again.
-      map.reset();
+      // anymore, so the underlying array can be used by the map again. When the sorter spills,
+      // it adds zero to the in-memory bytes spilled because the records are stored in the map
+      // rather than in the sorter, so we need to update the metric here, after resetting the
+      // map.
+      final long spillSize = map.reset();
+      taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
     }
   }
 
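Both patched `spill()` paths in UnsafeExternalSorter follow the same shape: allocate a fresh `ShuffleWriteMetrics` scoped to the spill, write through it, then add its byte count to `diskBytesSpilled` alongside the freed-page total going into `memoryBytesSpilled`. The following is a hedged sketch of that accounting pattern, with hypothetical `WriteMetrics`, `SpillWriter`, and `TaskMetricsSketch` stand-ins for the real `ShuffleWriteMetrics`, `UnsafeSorterSpillWriter`, and `TaskMetrics`:

```java
// Hypothetical stand-ins for the accounting pattern in the patched spill()
// methods: a fresh metrics object per spill isolates that spill's bytes so the
// exact delta can be added to the task's diskBytesSpilled.
class WriteMetrics {
  private long bytesWritten = 0L;
  void incBytesWritten(long n) { bytesWritten += n; }
  long bytesWritten() { return bytesWritten; }
}

class SpillWriter {
  private final WriteMetrics metrics;
  SpillWriter(WriteMetrics metrics) { this.metrics = metrics; }
  void write(byte[] record) {
    // Disk I/O elided; only the metrics accounting is shown.
    metrics.incBytesWritten(record.length);
  }
}

class TaskMetricsSketch {
  long memoryBytesSpilled = 0L;
  long diskBytesSpilled = 0L;
}

public class SpillAccounting {
  public static void main(String[] args) {
    TaskMetricsSketch taskMetrics = new TaskMetricsSketch();

    // One spill: create a fresh WriteMetrics, as the patch does with
    // `new ShuffleWriteMetrics()` at the top of each spill() path.
    WriteMetrics writeMetrics = new WriteMetrics();
    SpillWriter writer = new SpillWriter(writeMetrics);
    writer.write(new byte[128]);
    writer.write(new byte[256]);

    long freedPageBytes = 4096;  // stands in for the freed data pages' size
    taskMetrics.memoryBytesSpilled += freedPageBytes;
    taskMetrics.diskBytesSpilled += writeMetrics.bytesWritten();

    System.out.println("memoryBytesSpilled = " + taskMetrics.memoryBytesSpilled);  // 4096
    System.out.println("diskBytesSpilled   = " + taskMetrics.diskBytesSpilled);    // 384
  }
}
```

A per-spill instance also keeps spill writes out of the task's shuffle-write metrics (note the removed `registerShuffleWriteMetrics()` field), whereas the old task-wide `writeMetrics` field conflated the two.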