From db8647e268097358c5bf184a31e35e3404751f8d Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Tue, 24 Feb 2015 17:11:59 -0800 Subject: [PATCH 1/4] Added update for shuffleWriteTime around spilled file cleanup in ExternalSorter --- .../org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 27496c5a289cb..9144a1c54c016 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -88,7 +88,12 @@ private[spark] class SortShuffleWriter[K, V, C]( } finally { // Clean up our sorter, which may have its own intermediate files if (sorter != null) { + val startTime = System.nanoTime() sorter.stop() + context.taskMetrics().shuffleWriteMetrics.getOrElse { + case Some(metrics : ShuffleWriteMetrics) => + metrics.incShuffleWriteTime(System.nanoTime()-startTime) + } sorter = null } } From b946d085c50e5e5072d7429fe8a634e5b401986c Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Tue, 10 Mar 2015 17:05:28 -0700 Subject: [PATCH 2/4] Fixed error with option --- .../org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 9144a1c54c016..347d1a4e08179 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -90,9 +90,10 @@ private[spark] class SortShuffleWriter[K, V, C]( if (sorter != null) { val startTime = System.nanoTime() sorter.stop() - context.taskMetrics().shuffleWriteMetrics.getOrElse { + context.taskMetrics().shuffleWriteMetrics match { case Some(metrics : ShuffleWriteMetrics) => metrics.incShuffleWriteTime(System.nanoTime()-startTime) + case None => Nil } sorter = null } From 3e059b002e18da3241a8358ee5ccda723e25b1cc Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Wed, 11 Mar 2015 09:53:49 -0700 Subject: [PATCH 3/4] Switched to using getorelse --- .../org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 347d1a4e08179..6d98d07d8c4a1 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -90,11 +90,10 @@ private[spark] class SortShuffleWriter[K, V, C]( if (sorter != null) { val startTime = System.nanoTime() sorter.stop() - context.taskMetrics().shuffleWriteMetrics match { - case Some(metrics : ShuffleWriteMetrics) => + context.taskMetrics().shuffleWriteMetrics.getOrElse({ + metrics : ShuffleWriteMetrics => metrics.incShuffleWriteTime(System.nanoTime()-startTime) - case None => Nil - } + },Nil) sorter = null } } From bfabf88935658246cd2bb30eb317c6992282098d Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Thu, 12 Mar 2015 16:02:17 -0700 Subject: [PATCH 4/4] Changed to using a foreach vs. getorelse --- .../org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 6d98d07d8c4a1..fa2e617762f55 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -90,10 +90,8 @@ private[spark] class SortShuffleWriter[K, V, C]( if (sorter != null) { val startTime = System.nanoTime() sorter.stop() - context.taskMetrics().shuffleWriteMetrics.getOrElse({ - metrics : ShuffleWriteMetrics => - metrics.incShuffleWriteTime(System.nanoTime()-startTime) - },Nil) + context.taskMetrics.shuffleWriteMetrics.foreach( + _.incShuffleWriteTime(System.nanoTime - startTime)) sorter = null } }