From d0dbded1eba651923b9b197de546c267691c3051 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Tue, 6 Aug 2019 23:40:34 +0800
Subject: [PATCH 1/5] fix

---
 .../main/scala/org/apache/spark/status/AppStatusStore.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 6a9677834de9b..5a3a3da335234 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -156,7 +156,8 @@ private[spark] class AppStatusStore(
     // cheaper for disk stores (avoids deserialization).
     val count = {
       Utils.tryWithResource(
-        if (store.isInstanceOf[InMemoryStore]) {
+        if (store.isInstanceOf[ElementTrackingStore]) {
+          // For Live UI, we should count the tasks with status "SUCCESS" only.
           store.view(classOf[TaskDataWrapper])
             .parent(stageKey)
             .index(TaskIndexNames.STATUS)
@@ -245,7 +246,7 @@ private[spark] class AppStatusStore(
     // and failed tasks differently (would be tricky). Also would require changing the disk store
     // version (to invalidate old stores).
     def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
-      if (store.isInstanceOf[InMemoryStore]) {
+      if (store.isInstanceOf[ElementTrackingStore]) {
         val quantileTasks = store.view(classOf[TaskDataWrapper])
           .parent(stageKey)
           .index(index)
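
Note (editorial, not part of the patches): Patch 1 makes the live UI take the indexed fast path by testing for ElementTrackingStore, the live UI's write-through store, instead of InMemoryStore, and narrows the count to successful tasks. The STATUS index keeps this cheap because only entries whose status falls in the ["SUCCESS", "SUCCESS"] range are visited. A minimal sketch of the counting pattern, assuming a store already populated with TaskDataWrapper entries and the same `stageKey` (Array(stageId, stageAttemptId)) used elsewhere in AppStatusStore:

    // Sketch only: count successful tasks for one stage attempt via the
    // STATUS index, without deserializing every task in the stage.
    val successCount: Long = Utils.tryWithResource(
      store.view(classOf[TaskDataWrapper])
        .parent(stageKey)                 // restrict to this stage attempt
        .index(TaskIndexNames.STATUS)     // iterate in status order
        .first("SUCCESS")                 // start of the index range
        .last("SUCCESS")                  // end of the index range
        .closeableIterator()
    ) { it =>
      var n = 0L
      while (it.hasNext()) { it.next(); n += 1 }
      n
    }
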
From 18bde1c295a137355ddc59fe5e54ce69625274c7 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Wed, 7 Aug 2019 15:03:53 +0800
Subject: [PATCH 2/5] address comment

---
 .../main/scala/org/apache/spark/status/AppStatusStore.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 5a3a3da335234..192f598dde685 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -136,6 +136,8 @@ private[spark] class AppStatusStore(
     store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
   }
 
+  private def isLiveUI: Boolean = listener.isDefined
+
   /**
    * Calculates a summary of the task metrics for the given stage attempt, returning the
    * requested quantiles for the recorded metrics.
@@ -156,7 +158,7 @@ private[spark] class AppStatusStore(
     // cheaper for disk stores (avoids deserialization).
     val count = {
       Utils.tryWithResource(
-        if (store.isInstanceOf[ElementTrackingStore]) {
+        if (isLiveUI) {
           // For Live UI, we should count the tasks with status "SUCCESS" only.
           store.view(classOf[TaskDataWrapper])
             .parent(stageKey)
             .index(TaskIndexNames.STATUS)
@@ -246,7 +248,7 @@ private[spark] class AppStatusStore(
     // and failed tasks differently (would be tricky). Also would require changing the disk store
     // version (to invalidate old stores).
     def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
-      if (store.isInstanceOf[ElementTrackingStore]) {
+      if (isLiveUI) {
         val quantileTasks = store.view(classOf[TaskDataWrapper])
           .parent(stageKey)
           .index(index)
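
Note (editorial, not part of the patches): Patch 2 lifts the check into `isLiveUI`. Only a live UI constructs AppStatusStore with an AppStatusListener attached, so `listener.isDefined` identifies the live store without naming a store class. A sketch of the two construction paths, borrowed from the test helper that patch 5 adds later:

    // Sketch only: live UI vs. history-server construction.
    val conf = new SparkConf()
    val kvStore = new ElementTrackingStore(new InMemoryStore(), conf)
    val listener = new AppStatusListener(kvStore, conf, true, None)
    val liveStore = new AppStatusStore(kvStore, listener = Some(listener)) // isLiveUI == true

    // History server: a replayed store, no listener attached.
    val historyStore = new AppStatusStore(new InMemoryStore())             // isLiveUI == false

Per the next subject line, this alone broke tests: a history server replaying into an InMemoryStore also holds all data in memory but was now excluded from the filtering path, which patch 3 fixes.
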
From d19c232255f254a88e8cfcd83da53949d018f878 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Wed, 7 Aug 2019 16:35:23 +0800
Subject: [PATCH 3/5] fix test failure

---
 .../scala/org/apache/spark/status/AppStatusStore.scala | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 192f598dde685..b58684f12f10d 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -136,7 +136,11 @@ private[spark] class AppStatusStore(
     store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
   }
 
-  private def isLiveUI: Boolean = listener.isDefined
+  private def isInMemoryStore: Boolean = store match {
+    case _: InMemoryStore => true
+    case _: ElementTrackingStore => true // Live UI
+    case _ => false
+  }
 
   /**
    * Calculates a summary of the task metrics for the given stage attempt, returning the
@@ -158,7 +162,7 @@ private[spark] class AppStatusStore(
     // cheaper for disk stores (avoids deserialization).
     val count = {
       Utils.tryWithResource(
-        if (isLiveUI) {
+        if (isInMemoryStore) {
           // For Live UI, we should count the tasks with status "SUCCESS" only.
           store.view(classOf[TaskDataWrapper])
             .parent(stageKey)
             .index(TaskIndexNames.STATUS)
@@ -248,7 +252,7 @@ private[spark] class AppStatusStore(
     // and failed tasks differently (would be tricky). Also would require changing the disk store
     // version (to invalidate old stores).
     def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
-      if (isLiveUI) {
+      if (isInMemoryStore) {
         val quantileTasks = store.view(classOf[TaskDataWrapper])
           .parent(stageKey)
           .index(index)

From a23f1f55175df2fd48eb4f94ac1b031ac8f2d2d9 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 8 Aug 2019 01:29:59 +0800
Subject: [PATCH 4/5] address comment

---
 .../main/scala/org/apache/spark/status/AppStatusStore.scala | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index b58684f12f10d..04be1a1e68871 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -136,11 +136,7 @@ private[spark] class AppStatusStore(
     store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
   }
 
-  private def isInMemoryStore: Boolean = store match {
-    case _: InMemoryStore => true
-    case _: ElementTrackingStore => true // Live UI
-    case _ => false
-  }
+  private def isInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] || listener.isDefined
 
   /**
    * Calculates a summary of the task metrics for the given stage attempt, returning the
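
Note (editorial, not part of the patches): Patches 3 and 4 converge on the final predicate. The pattern match in patch 3 counts every ElementTrackingStore as in-memory, but ElementTrackingStore is only a tracking wrapper: the history server can wrap a disk-backed store in it as well, so the class check alone is not a reliable signal. Patch 4 therefore keeps exactly the two cases known to hold all data in memory. The same one-liner, annotated with editorial comments:

    private def isInMemoryStore: Boolean =
      store.isInstanceOf[InMemoryStore] || // history server replaying into memory
        listener.isDefined                 // live UI; its store is always in memory
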
From 08eca6e7909098ae2cabac6d593c41cdf5a7aee0 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Mon, 12 Aug 2019 01:03:23 +0800
Subject: [PATCH 5/5] address comments

---
 .../apache/spark/status/AppStatusStore.scala  |  4 +++
 .../spark/status/AppStatusStoreSuite.scala    | 32 ++++++++++++-------
 2 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 04be1a1e68871..964ab27a524c4 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -136,6 +136,10 @@ private[spark] class AppStatusStore(
     store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
   }
 
+  // SPARK-26119: we only want to consider successful tasks when calculating the metrics summary,
+  // but currently this is very expensive when using a disk store. So we only trigger the slower
+  // code path when we know we have all data in memory. The following method checks whether all
+  // the data will be in memory.
   private def isInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] || listener.isDefined
 
   /**

diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
index 75a658161d3ff..165fdb71cc78b 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.status
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.status.api.v1.TaskMetricDistributions
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.util.Distribution
 import org.apache.spark.util.kvstore._
 
@@ -77,14 +76,23 @@ class AppStatusStoreSuite extends SparkFunSuite {
     assert(store.count(classOf[CachedQuantile]) === 2)
   }
 
-  test("only successfull task have taskSummary") {
+  private def createLiveStore(inMemoryStore: InMemoryStore): AppStatusStore = {
+    val conf = new SparkConf()
+    val store = new ElementTrackingStore(inMemoryStore, conf)
+    val listener = new AppStatusListener(store, conf, true, None)
+    new AppStatusStore(store, listener = Some(listener))
+  }
+
+  test("SPARK-28638: only successful tasks have taskSummary with in-memory kvstore") {
     val store = new InMemoryStore()
     (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) }
-    val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles)
-    assert(appStore.size === 0)
+    Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore =>
+      val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles)
+      assert(summary.size === 0)
+    }
   }
 
-  test("summary should contain task metrics of only successfull tasks") {
+  test("SPARK-28638: summary should contain successful tasks only with in-memory kvstore") {
     val store = new InMemoryStore()
 
     for (i <- 0 to 5) {
@@ -95,13 +103,15 @@ class AppStatusStoreSuite extends SparkFunSuite {
       }
     }
 
-    val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles).get
+    Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore =>
+      val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get
 
-    val values = Array(0.0, 2.0, 4.0)
+      val values = Array(0.0, 2.0, 4.0)
 
-    val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
-    dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
-      assert(expected === actual)
+      val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
+      dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
+        assert(expected === actual)
+      }
     }
   }
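
Note (editorial, not part of the patches): Patch 5 documents the predicate (tying it back to SPARK-26119) and widens both tests to run against a plain InMemoryStore and a live store built by the new createLiveStore helper, so the successful-tasks-only path is exercised in both modes. For reference, a sketch of how the second test's expectation is formed; the uiQuantiles values below are assumed for illustration (the suite defines its own), while Distribution and its getQuantiles call are taken verbatim from the test:

    // Sketch only: expected quantiles over the successful tasks' runtimes (0, 2, 4).
    import org.apache.spark.util.Distribution

    val successfulRunTimes = Array(0.0, 2.0, 4.0)
    val uiQuantiles = Array(0.05, 0.25, 0.5, 0.75, 0.95) // assumed values
    val expected = new Distribution(successfulRunTimes, 0, successfulRunTimes.length)
      .getQuantiles(uiQuantiles.sorted)
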