From 2fd3ab6b6f66d50d832fe7d33a9187764a64fd72 Mon Sep 17 00:00:00 2001 From: Carson Wang Date: Thu, 7 Jan 2016 14:57:49 +0800 Subject: [PATCH 1/5] Fix spill size metric in unsafe external sorter --- .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 7 +++++-- .../apache/spark/sql/execution/UnsafeKVExternalSorter.java | 3 ++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 3387f9a4177c..a29297fdc911 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -844,18 +844,21 @@ public LongArray getArray() { } /** - * Reset this map to initialized state. + * Resets this map to initialized state. Returns the memory freed. */ - public void reset() { + public long reset() { + long memoryFreed = 0L; numElements = 0; longArray.zeroOut(); while (dataPages.size() > 0) { MemoryBlock dataPage = dataPages.removeLast(); + memoryFreed += dataPage.size(); freePage(dataPage); } currentPage = null; pageCursor = 0; + return memoryFreed; } /** diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index 0da26bf376a6..22bc529ceefb 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -125,7 +125,8 @@ public UnsafeKVExternalSorter( // reset the map, so we can re-use it to insert new records. the inMemSorter will not used // anymore, so the underline array could be used by map again. 
- map.reset(); + final long spillSize = map.reset(); + taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); } } From 81e3227ff9474c3c92333a7f2bcd730a20c8a975 Mon Sep 17 00:00:00 2001 From: Carson Wang Date: Thu, 7 Jan 2016 15:23:06 +0800 Subject: [PATCH 2/5] Update the comment --- .../main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index a29297fdc911..cc44448c437e 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -844,7 +844,7 @@ public LongArray getArray() { } /** - * Resets this map to initialized state. Returns the memory freed. + * Resets this map to initialized state. Returns the number of bytes freed. */ public long reset() { long memoryFreed = 0L; From be6748810119bf1c04763c3008f0c02bfbc7017a Mon Sep 17 00:00:00 2001 From: Carson Wang Date: Thu, 7 Jan 2016 16:33:27 +0800 Subject: [PATCH 3/5] Fix mima error --- project/MimaExcludes.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 43ca4690dc2b..02a97933d0d8 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -119,6 +119,10 @@ object MimaExcludes { ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.Vector"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.Vector$Multiplier"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.Vector$") + ) ++ + Seq( + // SPARK-12688 Fix spill size metric in unsafe external sorter + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.unsafe.map.BytesToBytesMap.reset") ) case v if v.startsWith("1.6") => Seq( From d1e9f7da12f68bb9c779aae74b2a2f799fe53d70 Mon Sep 17 00:00:00 2001 From: Carson 
Wang Date: Mon, 25 Jan 2016 11:20:11 +0800 Subject: [PATCH 4/5] Update the diskBytesSpilled metric --- .../unsafe/sort/UnsafeExternalSorter.java | 14 ++++++++++---- .../sql/execution/UnsafeKVExternalSorter.java | 5 ++++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 68dc0c6d415f..57213d7a445c 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -52,7 +52,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer { private final TaskMemoryManager taskMemoryManager; private final BlockManager blockManager; private final TaskContext taskContext; - private ShuffleWriteMetrics writeMetrics; /** The buffer size to use when writing spills using DiskBlockObjectWriter */ private final int fileBufferSizeBytes; @@ -122,9 +121,6 @@ private UnsafeExternalSorter( // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units // this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; this.fileBufferSizeBytes = 32 * 1024; - // TODO: metrics tracking + integration with shuffle write metrics - // need to connect the write metrics to task metrics so we count the spill IO somewhere. - this.writeMetrics = new ShuffleWriteMetrics(); if (existingInMemorySorter == null) { this.inMemSorter = new UnsafeInMemorySorter( @@ -180,6 +176,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException { spillWriters.size(), spillWriters.size() > 1 ? " times" : " time"); + final ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics(); // We only write out contents of the inMemSorter if it is not empty. 
if (inMemSorter.numRecords() > 0) { final UnsafeSorterSpillWriter spillWriter = @@ -204,6 +201,7 @@ public long spill(long size, MemoryConsumer trigger) throws IOException { // pages will currently be counted as memory spilled even though that space isn't actually // written to disk. This also counts the space needed to store the sorter's pointer array. taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); + taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.shuffleBytesWritten()); return spillSize; } @@ -450,6 +448,7 @@ public long spill() throws IOException { UnsafeInMemorySorter.SortedIterator inMemIterator = ((UnsafeInMemorySorter.SortedIterator) upstream).clone(); + final ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics(); // Iterate over the records that have not been returned and spill them. final UnsafeSorterSpillWriter spillWriter = new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords); @@ -480,6 +479,13 @@ public long spill() throws IOException { allocatedPages.clear(); } + long spillSize = 0L; + if (lastPage != null) { + spillSize += lastPage.size(); + } + taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); + taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.shuffleBytesWritten()); + // in-memory sorter will not be used after spilling assert(inMemSorter != null); released += inMemSorter.getMemoryUsage(); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index 22bc529ceefb..584d26ee7997 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -124,7 +124,10 @@ public UnsafeKVExternalSorter( inMemSorter); // reset the map, so we can re-use it to insert new records. 
the inMemSorter will not used - // anymore, so the underline array could be used by map again. + // anymore, so the underlying array could be used by map again. When the sorter spills, it + // adds zero to the number of in-memory bytes spilled because the records are stored + // in the map instead of the sorter. So we need to update the metric after resetting the map + // here. final long spillSize = map.reset(); taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); } From d689873d70efb820e640e7c17b0225d9fa830c2c Mon Sep 17 00:00:00 2001 From: Carson Wang Date: Mon, 25 Jan 2016 15:26:11 +0800 Subject: [PATCH 5/5] Fix spillsize value --- .../spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 57213d7a445c..77f95139e400 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -479,7 +479,7 @@ public long spill() throws IOException { allocatedPages.clear(); } - long spillSize = 0L; + long spillSize = released; if (lastPage != null) { spillSize += lastPage.size(); }