diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index dcae4a34c4b0b..87aff77ddb5e4 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -145,9 +145,7 @@ private UnsafeExternalSorter(
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
     // this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024
     this.fileBufferSizeBytes = 32 * 1024;
-    // The spill metrics are stored in a new ShuffleWriteMetrics,
-    // and then discarded (this fixes SPARK-16827).
-    // TODO: Instead, separate spill metrics should be stored and reported (tracked in SPARK-3577).
+    // The spill metrics are stored in a new ShuffleWriteMetrics, and then used at the end of the spill methods
     this.writeMetrics = new ShuffleWriteMetrics();
 
     if (existingInMemorySorter == null) {
@@ -232,6 +230,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
     // pages, we might not be able to get memory for the pointer array.
 
     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
+    taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
     totalSpillBytes += spillSize;
     return spillSize;
   }
@@ -546,6 +545,7 @@ public long spill() throws IOException {
       inMemSorter.free();
       inMemSorter = null;
       taskContext.taskMetrics().incMemoryBytesSpilled(released);
+      taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
       totalSpillBytes += released;
       return released;
     }
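
To make the intent of the patch easier to follow, here is a minimal, self-contained sketch of the accumulation pattern it relies on: spill output is written through a per-sorter write-metrics object, and at the end of a spill both the memory released and the bytes written to disk are reported to the task-level counters. `SpillWriteMetrics` and `TaskMetricsStub` below are hypothetical stand-ins, not Spark's real `ShuffleWriteMetrics` / `TaskMetrics` classes.

```java
// Sketch of the spill-metrics pattern used in the diff above.
// SpillWriteMetrics and TaskMetricsStub are simplified, hypothetical classes.
public class SpillMetricsSketch {

  /** Counts bytes written to disk by this sorter's spill writers. */
  static final class SpillWriteMetrics {
    private long bytesWritten;
    void incBytesWritten(long n) { bytesWritten += n; }
    long bytesWritten() { return bytesWritten; }
  }

  /** Task-level counters, analogous to taskContext.taskMetrics(). */
  static final class TaskMetricsStub {
    private long memoryBytesSpilled;
    private long diskBytesSpilled;
    void incMemoryBytesSpilled(long n) { memoryBytesSpilled += n; }
    void incDiskBytesSpilled(long n) { diskBytesSpilled += n; }
    @Override public String toString() {
      return "memoryBytesSpilled=" + memoryBytesSpilled
          + ", diskBytesSpilled=" + diskBytesSpilled;
    }
  }

  private final SpillWriteMetrics writeMetrics = new SpillWriteMetrics();
  private final TaskMetricsStub taskMetrics = new TaskMetricsStub();
  private long inMemoryBytes = 4L * 1024 * 1024; // pretend 4 MB is buffered in memory

  /** Spills the in-memory data and reports both spill counters, as in the patch. */
  long spill() {
    long spillSize = inMemoryBytes;
    if (spillSize == 0) {
      return 0L;
    }
    // The spill writer records what it actually puts on disk; assume the
    // serialized form is half the size of the in-memory layout.
    writeMetrics.incBytesWritten(spillSize / 2);
    inMemoryBytes = 0;

    // Mirror the two calls at the end of the spill methods in the diff:
    // memory freed, plus the disk bytes recorded by the write metrics.
    taskMetrics.incMemoryBytesSpilled(spillSize);
    taskMetrics.incDiskBytesSpilled(writeMetrics.bytesWritten());
    return spillSize;
  }

  public static void main(String[] args) {
    SpillMetricsSketch sorter = new SpillMetricsSketch();
    long released = sorter.spill();
    System.out.println("released " + released + " bytes; " + sorter.taskMetrics);
  }
}
```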