diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 6a9677834de9b..964ab27a524c4 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -136,6 +136,12 @@ private[spark] class AppStatusStore(
     store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
   }
 
+  // SPARK-26119: we only want to consider successful tasks when calculating the metrics summary,
+  // but currently this is very expensive when using a disk store. So we only trigger the slower
+  // code path when we know we have all data in memory. The following method checks whether all
+  // the data will be in memory.
+  private def isInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] || listener.isDefined
+
   /**
    * Calculates a summary of the task metrics for the given stage attempt, returning the
    * requested quantiles for the recorded metrics.
@@ -156,7 +162,8 @@ private[spark] class AppStatusStore(
     // cheaper for disk stores (avoids deserialization).
     val count = {
       Utils.tryWithResource(
-        if (store.isInstanceOf[InMemoryStore]) {
+        if (isInMemoryStore) {
+          // For Live UI, we should count the tasks with status "SUCCESS" only.
           store.view(classOf[TaskDataWrapper])
             .parent(stageKey)
             .index(TaskIndexNames.STATUS)
@@ -245,7 +252,7 @@ private[spark] class AppStatusStore(
     // and failed tasks differently (would be tricky). Also would require changing the disk store
     // version (to invalidate old stores).
     def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
-      if (store.isInstanceOf[InMemoryStore]) {
+      if (isInMemoryStore) {
         val quantileTasks = store.view(classOf[TaskDataWrapper])
           .parent(stageKey)
           .index(index)
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
index 75a658161d3ff..165fdb71cc78b 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.status
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.status.api.v1.TaskMetricDistributions
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.util.Distribution
 import org.apache.spark.util.kvstore._
 
@@ -77,14 +76,23 @@ class AppStatusStoreSuite extends SparkFunSuite {
     assert(store.count(classOf[CachedQuantile]) === 2)
   }
 
-  test("only successfull task have taskSummary") {
+  private def createLiveStore(inMemoryStore: InMemoryStore): AppStatusStore = {
+    val conf = new SparkConf()
+    val store = new ElementTrackingStore(inMemoryStore, conf)
+    val listener = new AppStatusListener(store, conf, true, None)
+    new AppStatusStore(store, listener = Some(listener))
+  }
+
+  test("SPARK-28638: only successful tasks have taskSummary when with in memory kvstore") {
     val store = new InMemoryStore()
     (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) }
-    val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles)
-    assert(appStore.size === 0)
+    Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore =>
+      val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles)
+      assert(summary.size === 0)
+    }
   }
 
-  test("summary should contain task metrics of only successfull tasks") {
+  test("SPARK-28638: summary should contain successful tasks only when with in memory kvstore") {
     val store = new InMemoryStore()
 
     for (i <- 0 to 5) {
@@ -95,13 +103,15 @@ class AppStatusStoreSuite extends SparkFunSuite {
       }
     }
 
-    val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles).get
+    Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore =>
+      val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get
 
-    val values = Array(0.0, 2.0, 4.0)
+      val values = Array(0.0, 2.0, 4.0)
 
-    val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
-    dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
-      assert(expected === actual)
+      val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
+      dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
+        assert(expected === actual)
+      }
     }
   }
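
Note (not part of the patch): the count path above relies on the kvstore view's ability to
pin an index to a single value, so the iterator only ever visits matching entries. The hunk
context is truncated at .index(TaskIndexNames.STATUS), but the idiom is to follow it with
first/last bounds on the index. Below is a minimal, self-contained sketch of that idiom
against the public org.apache.spark.util.kvstore API; TaskLike, SuccessCountSketch, and the
field names are made up for illustration and do not appear in the patch.

import org.apache.spark.util.kvstore.{InMemoryStore, KVIndex}

// Hypothetical stand-in for TaskDataWrapper; only the indexed "status"
// accessor matters here. Annotating defs (rather than constructor vals)
// sidesteps Scala's annotation-target issue with Java annotations.
class TaskLike(val id: Int, val taskStatus: String) {
  @KVIndex
  def naturalKey: Int = id

  @KVIndex("status")
  def status: String = taskStatus
}

object SuccessCountSketch {
  def main(args: Array[String]): Unit = {
    val store = new InMemoryStore()
    Seq("SUCCESS", "FAILED", "SUCCESS").zipWithIndex.foreach { case (s, i) =>
      store.write(new TaskLike(i, s))
    }
    // Restrict the indexed view to entries whose status equals "SUCCESS"
    // by bounding the index with first/last, then count what remains --
    // the same shape as the patched count path in AppStatusStore.
    val it = store.view(classOf[TaskLike])
      .index("status")
      .first("SUCCESS")
      .last("SUCCESS")
      .closeableIterator()
    try {
      var count = 0
      while (it.hasNext) { it.next(); count += 1 }
      println(s"successful tasks: $count")  // prints 2
    } finally {
      it.close()
    }
  }
}

This also illustrates why the patch gates the filter on isInMemoryStore: bounding an index is
cheap when entries live in memory, whereas a disk store must deserialize entries to walk the
index, which is the expense the SPARK-26119 comment refers to.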