From df3e56b3f3f653fb8b15d8c62662a8d4e4c19501 Mon Sep 17 00:00:00 2001 From: shahid Date: Thu, 14 Nov 2019 02:06:02 +0530 Subject: [PATCH 01/14] [SPARK-26260][Core]Tasks summary table should show only successful tasks metrics for disk store --- .../apache/spark/status/AppStatusStore.scala | 136 ++++++---------- .../org/apache/spark/status/storeTypes.scala | 150 ++++++++++++++++++ .../spark/status/AppStatusStoreSuite.scala | 41 ++++- 3 files changed, 241 insertions(+), 86 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 964ab27a524c4..155e52fa00728 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -136,12 +136,6 @@ private[spark] class AppStatusStore( store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality } - // SPARK-26119: we only want to consider successful tasks when calculating the metrics summary, - // but currently this is very expensive when using a disk store. So we only trigger the slower - // code path when we know we have all data in memory. The following method checks whether all - // the data will be in memory. - private def isInMemoryStore: Boolean = store.isInstanceOf[InMemoryStore] || listener.isDefined - /** * Calculates a summary of the task metrics for the given stage attempt, returning the * requested quantiles for the recorded metrics. @@ -162,21 +156,11 @@ private[spark] class AppStatusStore( // cheaper for disk stores (avoids deserialization). val count = { Utils.tryWithResource( - if (isInMemoryStore) { - // For Live UI, we should count the tasks with status "SUCCESS" only. - store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(TaskIndexNames.STATUS) - .first("SUCCESS") - .last("SUCCESS") - .closeableIterator() - } else { - store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(TaskIndexNames.EXEC_RUN_TIME) - .first(0L) - .closeableIterator() - } + store.view(classOf[TaskDataWrapper]) + .parent(stageKey) + .index(SuccessTaskIndexNames.EXEC_RUN_TIME) + .first(0L) + .closeableIterator() ) { it => var _count = 0L while (it.hasNext()) { @@ -245,100 +229,82 @@ private[spark] class AppStatusStore( // stabilize once the stage finishes. It's also slow, especially with disk stores. val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } - // TODO: Summary metrics needs to display all the successful tasks' metrics (SPARK-26119). - // For InMemory case, it is efficient to find using the following code. But for diskStore case - // we need an efficient solution to avoid deserialization time overhead. For that, we need to - // rework on the way indexing works, so that we can index by specific metrics for successful - // and failed tasks differently (would be tricky). Also would require changing the disk store - // version (to invalidate old stores). 
def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = { - if (isInMemoryStore) { - val quantileTasks = store.view(classOf[TaskDataWrapper]) + Utils.tryWithResource( + store.view(classOf[TaskDataWrapper]) .parent(stageKey) .index(index) .first(0L) - .asScala - .filter { _.status == "SUCCESS"} // Filter "SUCCESS" tasks - .toIndexedSeq - - indices.map { index => - fn(quantileTasks(index.toInt)).toDouble - }.toIndexedSeq - } else { - Utils.tryWithResource( - store.view(classOf[TaskDataWrapper]) - .parent(stageKey) - .index(index) - .first(0L) - .closeableIterator() - ) { it => - var last = Double.NaN - var currentIdx = -1L - indices.map { idx => - if (idx == currentIdx) { + .closeableIterator() + ) { it => + var last = Double.NaN + var currentIdx = -1L + indices.map { idx => + if (idx == currentIdx) { + last + } else { + val diff = idx - currentIdx + currentIdx = idx + if (it.skip(diff - 1)) { + last = fn(it.next()).toDouble last } else { - val diff = idx - currentIdx - currentIdx = idx - if (it.skip(diff - 1)) { - last = fn(it.next()).toDouble - last - } else { - Double.NaN - } + Double.NaN } - }.toIndexedSeq - } + } + }.toIndexedSeq } } val computedQuantiles = new v1.TaskMetricDistributions( quantiles = quantiles, - executorDeserializeTime = scanTasks(TaskIndexNames.DESER_TIME) { t => + executorDeserializeTime = scanTasks(SuccessTaskIndexNames.DESER_TIME) { t => t.executorDeserializeTime }, - executorDeserializeCpuTime = scanTasks(TaskIndexNames.DESER_CPU_TIME) { t => + executorDeserializeCpuTime = scanTasks(SuccessTaskIndexNames.DESER_CPU_TIME) { t => t.executorDeserializeCpuTime }, - executorRunTime = scanTasks(TaskIndexNames.EXEC_RUN_TIME) { t => t.executorRunTime }, - executorCpuTime = scanTasks(TaskIndexNames.EXEC_CPU_TIME) { t => t.executorCpuTime }, - resultSize = scanTasks(TaskIndexNames.RESULT_SIZE) { t => t.resultSize }, - jvmGcTime = scanTasks(TaskIndexNames.GC_TIME) { t => t.jvmGcTime }, - resultSerializationTime = scanTasks(TaskIndexNames.SER_TIME) { t => + executorRunTime = scanTasks(SuccessTaskIndexNames.EXEC_RUN_TIME) { t => t.executorRunTime }, + executorCpuTime = scanTasks(SuccessTaskIndexNames.EXEC_CPU_TIME) { t => t.executorCpuTime }, + resultSize = scanTasks(SuccessTaskIndexNames.RESULT_SIZE) { t => t.resultSize }, + jvmGcTime = scanTasks(SuccessTaskIndexNames.GC_TIME) { t => t.jvmGcTime }, + resultSerializationTime = scanTasks(SuccessTaskIndexNames.SER_TIME) { t => t.resultSerializationTime }, - gettingResultTime = scanTasks(TaskIndexNames.GETTING_RESULT_TIME) { t => + gettingResultTime = scanTasks(SuccessTaskIndexNames.GETTING_RESULT_TIME) { t => t.gettingResultTime }, - schedulerDelay = scanTasks(TaskIndexNames.SCHEDULER_DELAY) { t => t.schedulerDelay }, - peakExecutionMemory = scanTasks(TaskIndexNames.PEAK_MEM) { t => t.peakExecutionMemory }, - memoryBytesSpilled = scanTasks(TaskIndexNames.MEM_SPILL) { t => t.memoryBytesSpilled }, - diskBytesSpilled = scanTasks(TaskIndexNames.DISK_SPILL) { t => t.diskBytesSpilled }, + schedulerDelay = scanTasks(SuccessTaskIndexNames.SCHEDULER_DELAY) { t => t.schedulerDelay }, + peakExecutionMemory = scanTasks(SuccessTaskIndexNames.PEAK_MEM) { t => + t.peakExecutionMemory }, + memoryBytesSpilled = scanTasks(SuccessTaskIndexNames.MEM_SPILL) { t => t.memoryBytesSpilled }, + diskBytesSpilled = scanTasks(SuccessTaskIndexNames.DISK_SPILL) { t => t.diskBytesSpilled }, inputMetrics = new v1.InputMetricDistributions( - scanTasks(TaskIndexNames.INPUT_SIZE) { t => t.inputBytesRead }, - 
scanTasks(TaskIndexNames.INPUT_RECORDS) { t => t.inputRecordsRead }), + scanTasks(SuccessTaskIndexNames.INPUT_SIZE) { t => t.inputBytesRead }, + scanTasks(SuccessTaskIndexNames.INPUT_RECORDS) { t => t.inputRecordsRead }), outputMetrics = new v1.OutputMetricDistributions( - scanTasks(TaskIndexNames.OUTPUT_SIZE) { t => t.outputBytesWritten }, - scanTasks(TaskIndexNames.OUTPUT_RECORDS) { t => t.outputRecordsWritten }), + scanTasks(SuccessTaskIndexNames.OUTPUT_SIZE) { t => t.outputBytesWritten }, + scanTasks(SuccessTaskIndexNames.OUTPUT_RECORDS) { t => t.outputRecordsWritten }), shuffleReadMetrics = new v1.ShuffleReadMetricDistributions( - scanTasks(TaskIndexNames.SHUFFLE_TOTAL_READS) { m => + scanTasks(SuccessTaskIndexNames.SHUFFLE_TOTAL_READS) { m => m.shuffleLocalBytesRead + m.shuffleRemoteBytesRead }, - scanTasks(TaskIndexNames.SHUFFLE_READ_RECORDS) { t => t.shuffleRecordsRead }, - scanTasks(TaskIndexNames.SHUFFLE_REMOTE_BLOCKS) { t => t.shuffleRemoteBlocksFetched }, - scanTasks(TaskIndexNames.SHUFFLE_LOCAL_BLOCKS) { t => t.shuffleLocalBlocksFetched }, - scanTasks(TaskIndexNames.SHUFFLE_READ_TIME) { t => t.shuffleFetchWaitTime }, - scanTasks(TaskIndexNames.SHUFFLE_REMOTE_READS) { t => t.shuffleRemoteBytesRead }, - scanTasks(TaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK) { t => + scanTasks(SuccessTaskIndexNames.SHUFFLE_READ_RECORDS) { t => t.shuffleRecordsRead }, + scanTasks(SuccessTaskIndexNames.SHUFFLE_REMOTE_BLOCKS) { t => + t.shuffleRemoteBlocksFetched }, + scanTasks(SuccessTaskIndexNames.SHUFFLE_LOCAL_BLOCKS) { t => t.shuffleLocalBlocksFetched }, + scanTasks(SuccessTaskIndexNames.SHUFFLE_READ_TIME) { t => t.shuffleFetchWaitTime }, + scanTasks(SuccessTaskIndexNames.SHUFFLE_REMOTE_READS) { t => t.shuffleRemoteBytesRead }, + scanTasks(SuccessTaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK) { t => t.shuffleRemoteBytesReadToDisk }, - scanTasks(TaskIndexNames.SHUFFLE_TOTAL_BLOCKS) { m => + scanTasks(SuccessTaskIndexNames.SHUFFLE_TOTAL_BLOCKS) { m => m.shuffleLocalBlocksFetched + m.shuffleRemoteBlocksFetched }), shuffleWriteMetrics = new v1.ShuffleWriteMetricDistributions( - scanTasks(TaskIndexNames.SHUFFLE_WRITE_SIZE) { t => t.shuffleBytesWritten }, - scanTasks(TaskIndexNames.SHUFFLE_WRITE_RECORDS) { t => t.shuffleRecordsWritten }, - scanTasks(TaskIndexNames.SHUFFLE_WRITE_TIME) { t => t.shuffleWriteTime })) + scanTasks(SuccessTaskIndexNames.SHUFFLE_WRITE_SIZE) { t => t.shuffleBytesWritten }, + scanTasks(SuccessTaskIndexNames.SHUFFLE_WRITE_RECORDS) { t => t.shuffleRecordsWritten }, + scanTasks(SuccessTaskIndexNames.SHUFFLE_WRITE_TIME) { t => t.shuffleWriteTime })) // Go through the computed quantiles and cache the values that match the caching criteria. computedQuantiles.quantiles.zipWithIndex diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 9da5bea8bf5c4..769e6c13820a5 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -97,6 +97,41 @@ private[spark] class StageDataWrapper( private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L) } +/** + * This object map the indices names of successful tasks' metrices. Mapped to short strings + * to save space when using a disk store. 
+ */ +private[spark] object SuccessTaskIndexNames { + final val DESER_CPU_TIME = "sdct" + final val DESER_TIME = "sdes" + final val DISK_SPILL = "sdbs" + final val EXEC_CPU_TIME = "sect" + final val EXEC_RUN_TIME = "sert" + final val GC_TIME = "sgc" + final val GETTING_RESULT_TIME = "sgrt" + final val INPUT_RECORDS = "sir" + final val INPUT_SIZE = "sis" + final val MEM_SPILL = "smbs" + final val OUTPUT_RECORDS = "sor" + final val OUTPUT_SIZE = "sos" + final val PEAK_MEM = "spem" + final val RESULT_SIZE = "srs" + final val SCHEDULER_DELAY = "sdly" + final val SER_TIME = "srst" + final val SHUFFLE_LOCAL_BLOCKS = "sslbl" + final val SHUFFLE_READ_RECORDS = "ssrr" + final val SHUFFLE_READ_TIME = "ssrt" + final val SHUFFLE_REMOTE_BLOCKS = "ssrbl" + final val SHUFFLE_REMOTE_READS = "ssrby" + final val SHUFFLE_REMOTE_READS_TO_DISK = "ssrbd" + final val SHUFFLE_TOTAL_READS = "sstby" + final val SHUFFLE_TOTAL_BLOCKS = "sstbl" + final val SHUFFLE_WRITE_RECORDS = "sswr" + final val SHUFFLE_WRITE_SIZE = "ssws" + final val SHUFFLE_WRITE_TIME = "sswt" + final val STAGE = "stage" +} + /** * Tasks have a lot of indices that are used in a few different places. This object keeps logical * names for these indices, mapped to short strings to save space when using a disk store. @@ -235,6 +270,8 @@ private[spark] class TaskDataWrapper( def hasMetrics: Boolean = executorDeserializeTime >= 0 + private val isSuccess = status == "SUCCESS" + def toApi: TaskData = { val metrics = if (hasMetrics) { Some(new TaskMetrics( @@ -290,6 +327,119 @@ private[spark] class TaskDataWrapper( gettingResultTime = 0L) } + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.DESER_TIME, parent = TaskIndexNames.STAGE) + def executorDeserializeTimeIndex: Long = if (isSuccess) { + executorDeserializeTime + } else { + -1L + } + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.DESER_CPU_TIME, parent = TaskIndexNames.STAGE) + def executorDeserializeCpuTimeIndex: Long = if (isSuccess) { + executorDeserializeCpuTime + } else { + -1L + } + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.EXEC_RUN_TIME, parent = TaskIndexNames.STAGE) + def executorRunTimeIndex: Long = if (isSuccess) executorRunTime else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.EXEC_CPU_TIME, parent = TaskIndexNames.STAGE) + def executorCpuTimeIndex: Long = if (isSuccess) executorCpuTime else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.RESULT_SIZE, parent = TaskIndexNames.STAGE) + def resultSizeIndex: Long = if (isSuccess) resultSize else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.GC_TIME, parent = TaskIndexNames.STAGE) + def jvmGcTimeIndex: Long = if (isSuccess) jvmGcTime else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SER_TIME, parent = TaskIndexNames.STAGE) + def resultSerializationTimeIndex: Long = if (isSuccess) resultSerializationTime else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.MEM_SPILL, parent = TaskIndexNames.STAGE) + def memoryBytesSpilledIndex: Long = if (isSuccess) memoryBytesSpilled else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.DISK_SPILL, parent = TaskIndexNames.STAGE) + def diskBytesSpilledIndex: Long = if (isSuccess) diskBytesSpilled else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.PEAK_MEM, parent = TaskIndexNames.STAGE) + def peakExecutionMemoryIndex: Long = if (isSuccess) peakExecutionMemory else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.INPUT_SIZE, parent = TaskIndexNames.STAGE) + def inputBytesReadIndex: Long = if 
(isSuccess) inputBytesRead else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.INPUT_RECORDS, parent = TaskIndexNames.STAGE) + def inputRecordsReadIndex: Long = if (isSuccess) inputRecordsRead else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.OUTPUT_SIZE, parent = TaskIndexNames.STAGE) + def outputBytesWrittenIndex: Long = if (isSuccess) outputBytesWritten else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.OUTPUT_RECORDS, parent = TaskIndexNames.STAGE) + def outputRecordsWrittenIndex: Long = if (isSuccess) outputRecordsWritten else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_REMOTE_BLOCKS, + parent = TaskIndexNames.STAGE) + def shuffleRemoteBlocksFetchedIndex: Long = if (isSuccess) { + shuffleRemoteBlocksFetched + } else { + -1L + } + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_LOCAL_BLOCKS, + parent = TaskIndexNames.STAGE) + def shuffleLocalBlocksFetchedIndex: Long = if (isSuccess) { + shuffleLocalBlocksFetched + } else { + -1L + } + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_READ_TIME, + parent = TaskIndexNames.STAGE) + def shuffleFetchWaitTimeIndex: Long = if (isSuccess) shuffleFetchWaitTime else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_REMOTE_READS, + parent = TaskIndexNames.STAGE) + def shuffleRemoteBytesReadIndex: Long = if (isSuccess) shuffleRemoteBytesRead else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK, + parent = TaskIndexNames.STAGE) + def shuffleRemoteBytesReadToDiskIndex: Long = if (isSuccess) { + shuffleRemoteBytesReadToDisk + } else { + -1L + } + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_READ_RECORDS, + parent = TaskIndexNames.STAGE) + def shuffleRecordsReadIndex: Long = if (isSuccess) shuffleRecordsRead else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_WRITE_SIZE, + parent = TaskIndexNames.STAGE) + def shuffleBytesWrittenIndex: Long = if (isSuccess) shuffleBytesWritten else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_WRITE_TIME, + parent = TaskIndexNames.STAGE) + def shuffleWriteTimeIndex: Long = if (isSuccess) shuffleWriteTime else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_WRITE_RECORDS, + parent = TaskIndexNames.STAGE) + def shuffleRecordsWrittenIndex: Long = if (isSuccess) shuffleRecordsWritten else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SCHEDULER_DELAY, parent = TaskIndexNames.STAGE) + def schedulerDelayIndex: Long = if (isSuccess) schedulerDelay else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.GETTING_RESULT_TIME, + parent = TaskIndexNames.STAGE) + def gettingResultTimeIndex: Long = if (isSuccess) gettingResultTime else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_TOTAL_READS, + parent = TaskIndexNames.STAGE) + def shuffleTotalReadsIndex: Long = if (isSuccess) shuffleTotalReads else -1L + + @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_TOTAL_BLOCKS, + parent = TaskIndexNames.STAGE) + private def shuffleTotalBlocksIndex: Long = if (isSuccess) shuffleTotalBlocks else -1L + @JsonIgnore @KVIndex(TaskIndexNames.STAGE) private def stage: Array[Int] = Array(stageId, stageAttemptId) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index 165fdb71cc78b..4752a95441325 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.status import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.util.Distribution +import org.apache.spark.util.{Distribution, Utils} import org.apache.spark.util.kvstore._ class AppStatusStoreSuite extends SparkFunSuite { @@ -92,6 +92,17 @@ class AppStatusStoreSuite extends SparkFunSuite { } } + test("SPARK-28638: only successful tasks have taskSummary when with disk kvstore (LevelDB)") { + val testDir = Utils.createTempDir() + val store = KVUtils.open(testDir, getClass().getName()) + + (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) } + Seq(new AppStatusStore(store)).foreach { appStore => + val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles) + assert(summary.size === 0) + } + } + test("SPARK-28638: summary should contain successful tasks only when with in memory kvstore") { val store = new InMemoryStore() @@ -115,6 +126,34 @@ class AppStatusStoreSuite extends SparkFunSuite { } } + test("summary should contain successful tasks only when with disk kvstore (LevelDB)") { + + val testDir = Utils.createTempDir() + + val store = KVUtils.open(testDir, getClass().getName()) + + for (i <- 0 to 5) { + if (i % 2 == 1) { + store.write(newTaskData(i, status = "FAILED")) + } else { + store.write(newTaskData(i)) + } + } + + Seq(new AppStatusStore(store)).foreach { appStore => + val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get + + val values = Array(0.0, 2.0, 4.0) + + val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted) + dist.zip(summary.executorRunTime).foreach { case (expected, actual) => + assert(expected === actual) + } + } + store.close() + Utils.deleteRecursively(testDir) + } + private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = { val store = new InMemoryStore() val values = (0 until count).map { i => From 03304e18f825d06c8175a76876d971fa2301e9e3 Mon Sep 17 00:00:00 2001 From: shahid Date: Thu, 14 Nov 2019 04:14:30 +0530 Subject: [PATCH 02/14] minor edit --- .../spark/status/AppStatusStoreSuite.scala | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index 4752a95441325..fa7daa806adff 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -92,12 +92,12 @@ class AppStatusStoreSuite extends SparkFunSuite { } } - test("SPARK-28638: only successful tasks have taskSummary when with disk kvstore (LevelDB)") { + test("SPARK-26260: only successful tasks have taskSummary when with disk kvstore (LevelDB)") { val testDir = Utils.createTempDir() - val store = KVUtils.open(testDir, getClass().getName()) + val diskStore = KVUtils.open(testDir, getClass().getName()) - (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) } - Seq(new AppStatusStore(store)).foreach { appStore => + (0 until 5).foreach { i => diskStore.write(newTaskData(i, status = "FAILED")) } + Seq(new AppStatusStore(diskStore)).foreach { appStore => val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles) assert(summary.size === 0) } @@ -126,21 +126,19 @@ class AppStatusStoreSuite extends SparkFunSuite { } } - test("summary should contain successful tasks only when with disk kvstore 
(LevelDB)") { - + test("SPARK-26260: summary should contain successful tasks only when with LevelDB store") { val testDir = Utils.createTempDir() - - val store = KVUtils.open(testDir, getClass().getName()) + val diskStore = KVUtils.open(testDir, getClass().getName()) for (i <- 0 to 5) { if (i % 2 == 1) { - store.write(newTaskData(i, status = "FAILED")) + diskStore.write(newTaskData(i, status = "FAILED")) } else { - store.write(newTaskData(i)) + diskStore.write(newTaskData(i)) } } - Seq(new AppStatusStore(store)).foreach { appStore => + Seq(new AppStatusStore(diskStore)).foreach { appStore => val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get val values = Array(0.0, 2.0, 4.0) @@ -150,7 +148,7 @@ class AppStatusStoreSuite extends SparkFunSuite { assert(expected === actual) } } - store.close() + diskStore.close() Utils.deleteRecursively(testDir) } From a5690f6a7a4921c608f2171c2790557015e01252 Mon Sep 17 00:00:00 2001 From: shahid Date: Thu, 14 Nov 2019 22:28:44 +0530 Subject: [PATCH 03/14] edit test --- .../scala/org/apache/spark/status/AppStatusStoreSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index fa7daa806adff..db8d02299d39e 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -101,6 +101,8 @@ class AppStatusStoreSuite extends SparkFunSuite { val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles) assert(summary.size === 0) } + diskStore.close() + Utils.deleteRecursively(testDir) } test("SPARK-28638: summary should contain successful tasks only when with in memory kvstore") { From f7a15d686ce42261d896430f927445f9e26b5191 Mon Sep 17 00:00:00 2001 From: shahid Date: Sat, 16 Nov 2019 21:54:26 +0530 Subject: [PATCH 04/14] address comment --- .../apache/spark/status/AppStatusStore.scala | 58 ++--- .../org/apache/spark/status/LiveEntity.scala | 109 +++++++-- .../org/apache/spark/status/storeTypes.scala | 229 ++++-------------- .../spark/status/AppStatusStoreSuite.scala | 63 ++++- 4 files changed, 223 insertions(+), 236 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 155e52fa00728..f5788be784f6d 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -158,7 +158,7 @@ private[spark] class AppStatusStore( Utils.tryWithResource( store.view(classOf[TaskDataWrapper]) .parent(stageKey) - .index(SuccessTaskIndexNames.EXEC_RUN_TIME) + .index(TaskIndexNames.EXEC_RUN_TIME) .first(0L) .closeableIterator() ) { it => @@ -258,53 +258,53 @@ private[spark] class AppStatusStore( val computedQuantiles = new v1.TaskMetricDistributions( quantiles = quantiles, - executorDeserializeTime = scanTasks(SuccessTaskIndexNames.DESER_TIME) { t => + executorDeserializeTime = scanTasks(TaskIndexNames.DESER_TIME) { t => t.executorDeserializeTime }, - executorDeserializeCpuTime = scanTasks(SuccessTaskIndexNames.DESER_CPU_TIME) { t => + executorDeserializeCpuTime = scanTasks(TaskIndexNames.DESER_CPU_TIME) { t => t.executorDeserializeCpuTime }, - executorRunTime = scanTasks(SuccessTaskIndexNames.EXEC_RUN_TIME) { t => t.executorRunTime }, - executorCpuTime = scanTasks(SuccessTaskIndexNames.EXEC_CPU_TIME) { t => 
t.executorCpuTime }, - resultSize = scanTasks(SuccessTaskIndexNames.RESULT_SIZE) { t => t.resultSize }, - jvmGcTime = scanTasks(SuccessTaskIndexNames.GC_TIME) { t => t.jvmGcTime }, - resultSerializationTime = scanTasks(SuccessTaskIndexNames.SER_TIME) { t => + executorRunTime = scanTasks(TaskIndexNames.EXEC_RUN_TIME) { t => t.executorRunTime }, + executorCpuTime = scanTasks(TaskIndexNames.EXEC_CPU_TIME) { t => t.executorCpuTime }, + resultSize = scanTasks(TaskIndexNames.RESULT_SIZE) { t => t.resultSize }, + jvmGcTime = scanTasks(TaskIndexNames.GC_TIME) { t => t.jvmGcTime }, + resultSerializationTime = scanTasks(TaskIndexNames.SER_TIME) { t => t.resultSerializationTime }, - gettingResultTime = scanTasks(SuccessTaskIndexNames.GETTING_RESULT_TIME) { t => + gettingResultTime = scanTasks(TaskIndexNames.GETTING_RESULT_TIME) { t => t.gettingResultTime }, - schedulerDelay = scanTasks(SuccessTaskIndexNames.SCHEDULER_DELAY) { t => t.schedulerDelay }, - peakExecutionMemory = scanTasks(SuccessTaskIndexNames.PEAK_MEM) { t => + schedulerDelay = scanTasks(TaskIndexNames.SCHEDULER_DELAY) { t => t.schedulerDelay }, + peakExecutionMemory = scanTasks(TaskIndexNames.PEAK_MEM) { t => t.peakExecutionMemory }, - memoryBytesSpilled = scanTasks(SuccessTaskIndexNames.MEM_SPILL) { t => t.memoryBytesSpilled }, - diskBytesSpilled = scanTasks(SuccessTaskIndexNames.DISK_SPILL) { t => t.diskBytesSpilled }, + memoryBytesSpilled = scanTasks(TaskIndexNames.MEM_SPILL) { t => t.memoryBytesSpilled }, + diskBytesSpilled = scanTasks(TaskIndexNames.DISK_SPILL) { t => t.diskBytesSpilled }, inputMetrics = new v1.InputMetricDistributions( - scanTasks(SuccessTaskIndexNames.INPUT_SIZE) { t => t.inputBytesRead }, - scanTasks(SuccessTaskIndexNames.INPUT_RECORDS) { t => t.inputRecordsRead }), + scanTasks(TaskIndexNames.INPUT_SIZE) { t => t.inputBytesRead }, + scanTasks(TaskIndexNames.INPUT_RECORDS) { t => t.inputRecordsRead }), outputMetrics = new v1.OutputMetricDistributions( - scanTasks(SuccessTaskIndexNames.OUTPUT_SIZE) { t => t.outputBytesWritten }, - scanTasks(SuccessTaskIndexNames.OUTPUT_RECORDS) { t => t.outputRecordsWritten }), + scanTasks(TaskIndexNames.OUTPUT_SIZE) { t => t.outputBytesWritten }, + scanTasks(TaskIndexNames.OUTPUT_RECORDS) { t => t.outputRecordsWritten }), shuffleReadMetrics = new v1.ShuffleReadMetricDistributions( - scanTasks(SuccessTaskIndexNames.SHUFFLE_TOTAL_READS) { m => + scanTasks(TaskIndexNames.SHUFFLE_TOTAL_READS) { m => m.shuffleLocalBytesRead + m.shuffleRemoteBytesRead }, - scanTasks(SuccessTaskIndexNames.SHUFFLE_READ_RECORDS) { t => t.shuffleRecordsRead }, - scanTasks(SuccessTaskIndexNames.SHUFFLE_REMOTE_BLOCKS) { t => + scanTasks(TaskIndexNames.SHUFFLE_READ_RECORDS) { t => t.shuffleRecordsRead }, + scanTasks(TaskIndexNames.SHUFFLE_REMOTE_BLOCKS) { t => t.shuffleRemoteBlocksFetched }, - scanTasks(SuccessTaskIndexNames.SHUFFLE_LOCAL_BLOCKS) { t => t.shuffleLocalBlocksFetched }, - scanTasks(SuccessTaskIndexNames.SHUFFLE_READ_TIME) { t => t.shuffleFetchWaitTime }, - scanTasks(SuccessTaskIndexNames.SHUFFLE_REMOTE_READS) { t => t.shuffleRemoteBytesRead }, - scanTasks(SuccessTaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK) { t => + scanTasks(TaskIndexNames.SHUFFLE_LOCAL_BLOCKS) { t => t.shuffleLocalBlocksFetched }, + scanTasks(TaskIndexNames.SHUFFLE_READ_TIME) { t => t.shuffleFetchWaitTime }, + scanTasks(TaskIndexNames.SHUFFLE_REMOTE_READS) { t => t.shuffleRemoteBytesRead }, + scanTasks(TaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK) { t => t.shuffleRemoteBytesReadToDisk }, - 
scanTasks(SuccessTaskIndexNames.SHUFFLE_TOTAL_BLOCKS) { m => + scanTasks(TaskIndexNames.SHUFFLE_TOTAL_BLOCKS) { m => m.shuffleLocalBlocksFetched + m.shuffleRemoteBlocksFetched }), shuffleWriteMetrics = new v1.ShuffleWriteMetricDistributions( - scanTasks(SuccessTaskIndexNames.SHUFFLE_WRITE_SIZE) { t => t.shuffleBytesWritten }, - scanTasks(SuccessTaskIndexNames.SHUFFLE_WRITE_RECORDS) { t => t.shuffleRecordsWritten }, - scanTasks(SuccessTaskIndexNames.SHUFFLE_WRITE_TIME) { t => t.shuffleWriteTime })) + scanTasks(TaskIndexNames.SHUFFLE_WRITE_SIZE) { t => t.shuffleBytesWritten }, + scanTasks(TaskIndexNames.SHUFFLE_WRITE_RECORDS) { t => t.shuffleRecordsWritten }, + scanTasks(TaskIndexNames.SHUFFLE_WRITE_TIME) { t => t.shuffleWriteTime })) // Go through the computed quantiles and cache the values that match the caching criteria. computedQuantiles.quantiles.zipWithIndex @@ -548,7 +548,7 @@ private[spark] class AppStatusStore( private[spark] object AppStatusStore { - val CURRENT_VERSION = 1L + val CURRENT_VERSION = 2L /** * Create an in-memory store for a live application. diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index a0ef8da0a4b6b..ff58d6e495344 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.immutable.{HashSet, TreeSet} import scala.collection.mutable.HashMap +import scala.collection.mutable import com.google.common.collect.Interners @@ -29,6 +30,7 @@ import org.apache.spark.JobExecutionStatus import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.resource.ResourceInformation import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo} +import org.apache.spark.status.TaskIndexNames._ import org.apache.spark.status.api.v1 import org.apache.spark.storage.{RDDInfo, StorageLevel} import org.apache.spark.ui.SparkUI @@ -184,6 +186,19 @@ private class LiveTask( info.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis())) } + val hasMetrics = metrics.executorDeserializeTime >= 0 + val handleZeros = mutable.HashSet[String]() + + /** + * For non successful tasks, store the metrics as negetive to avoid the calculation in the + * task summary. `toApi` method in TaskDataWrapper will make it actual value. 
+ */ + val taskMetrics: v1.TaskMetrics = if (hasMetrics && !info.successful) { + makeNegative(metrics, handleZeros) + } else { + metrics + } + new TaskDataWrapper( info.taskId, info.index, @@ -199,30 +214,32 @@ private class LiveTask( newAccumulatorInfos(info.accumulables), errorMessage, - metrics.executorDeserializeTime, - metrics.executorDeserializeCpuTime, - metrics.executorRunTime, - metrics.executorCpuTime, - metrics.resultSize, - metrics.jvmGcTime, - metrics.resultSerializationTime, - metrics.memoryBytesSpilled, - metrics.diskBytesSpilled, - metrics.peakExecutionMemory, - metrics.inputMetrics.bytesRead, - metrics.inputMetrics.recordsRead, - metrics.outputMetrics.bytesWritten, - metrics.outputMetrics.recordsWritten, - metrics.shuffleReadMetrics.remoteBlocksFetched, - metrics.shuffleReadMetrics.localBlocksFetched, - metrics.shuffleReadMetrics.fetchWaitTime, - metrics.shuffleReadMetrics.remoteBytesRead, - metrics.shuffleReadMetrics.remoteBytesReadToDisk, - metrics.shuffleReadMetrics.localBytesRead, - metrics.shuffleReadMetrics.recordsRead, - metrics.shuffleWriteMetrics.bytesWritten, - metrics.shuffleWriteMetrics.writeTime, - metrics.shuffleWriteMetrics.recordsWritten, + hasMetrics, + handleZeros, + taskMetrics.executorDeserializeTime, + taskMetrics.executorDeserializeCpuTime, + taskMetrics.executorRunTime, + taskMetrics.executorCpuTime, + taskMetrics.resultSize, + taskMetrics.jvmGcTime, + taskMetrics.resultSerializationTime, + taskMetrics.memoryBytesSpilled, + taskMetrics.diskBytesSpilled, + taskMetrics.peakExecutionMemory, + taskMetrics.inputMetrics.bytesRead, + taskMetrics.inputMetrics.recordsRead, + taskMetrics.outputMetrics.bytesWritten, + taskMetrics.outputMetrics.recordsWritten, + taskMetrics.shuffleReadMetrics.remoteBlocksFetched, + taskMetrics.shuffleReadMetrics.localBlocksFetched, + taskMetrics.shuffleReadMetrics.fetchWaitTime, + taskMetrics.shuffleReadMetrics.remoteBytesRead, + taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk, + taskMetrics.shuffleReadMetrics.localBytesRead, + taskMetrics.shuffleReadMetrics.recordsRead, + taskMetrics.shuffleWriteMetrics.bytesWritten, + taskMetrics.shuffleWriteMetrics.writeTime, + taskMetrics.shuffleWriteMetrics.recordsWritten, stageId, stageAttemptId) @@ -710,6 +727,50 @@ private object LiveEntityHelpers { addMetrics(m1, m2, -1) } + /** + * Convert all the metric values to negative as well as handle zero values. + * This method assumes that all the metric values are greater than or equal to zero + */ + def makeNegative( + m: v1.TaskMetrics, + handleZeros: mutable.HashSet[String]): v1.TaskMetrics = { + // If the metric value is 0, then make -1 and update the metric index in handleZeros. 
+ def updateMetricValue(metric: Long, index: String): Long = { + if (metric == 0L) { + handleZeros.add(index) + -1L + } else { + metric * -1L + } + } + + createMetrics( + updateMetricValue(m.executorDeserializeTime, DESER_TIME), + updateMetricValue(m.executorDeserializeCpuTime, DESER_CPU_TIME), + updateMetricValue(m.executorRunTime, EXEC_RUN_TIME), + updateMetricValue(m.executorCpuTime, EXEC_CPU_TIME), + updateMetricValue(m.resultSize, RESULT_SIZE), + updateMetricValue(m.jvmGcTime, GC_TIME), + updateMetricValue(m.resultSerializationTime, SER_TIME), + updateMetricValue(m.memoryBytesSpilled, MEM_SPILL), + updateMetricValue(m.diskBytesSpilled, DISK_SPILL), + updateMetricValue(m.peakExecutionMemory, PEAK_MEM), + updateMetricValue(m.inputMetrics.bytesRead, INPUT_SIZE), + updateMetricValue(m.inputMetrics.recordsRead, INPUT_RECORDS), + updateMetricValue(m.outputMetrics.bytesWritten, OUTPUT_SIZE), + updateMetricValue(m.outputMetrics.recordsWritten, OUTPUT_RECORDS), + updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched, SHUFFLE_REMOTE_BLOCKS), + updateMetricValue(m.shuffleReadMetrics.localBlocksFetched, SHUFFLE_LOCAL_BLOCKS), + updateMetricValue(m.shuffleReadMetrics.fetchWaitTime, SHUFFLE_READ_TIME), + updateMetricValue(m.shuffleReadMetrics.remoteBytesRead, SHUFFLE_REMOTE_READS), + updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk, SHUFFLE_REMOTE_READS_TO_DISK), + updateMetricValue(m.shuffleReadMetrics.localBytesRead, SHUFFLE_LOCAL_READ), + updateMetricValue(m.shuffleReadMetrics.recordsRead, SHUFFLE_READ_RECORDS), + updateMetricValue(m.shuffleWriteMetrics.bytesWritten, SHUFFLE_WRITE_SIZE), + updateMetricValue(m.shuffleWriteMetrics.writeTime, SHUFFLE_WRITE_TIME), + updateMetricValue(m.shuffleWriteMetrics.recordsWritten, SHUFFLE_WRITE_RECORDS)) + } + private def addMetrics(m1: v1.TaskMetrics, m2: v1.TaskMetrics, mult: Int): v1.TaskMetrics = { createMetrics( m1.executorDeserializeTime + m2.executorDeserializeTime * mult, diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 769e6c13820a5..8147dff960373 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -20,6 +20,8 @@ package org.apache.spark.status import java.lang.{Long => JLong} import java.util.Date +import scala.collection.mutable.HashSet + import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.databind.annotation.JsonDeserialize @@ -97,41 +99,6 @@ private[spark] class StageDataWrapper( private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L) } -/** - * This object map the indices names of successful tasks' metrices. Mapped to short strings - * to save space when using a disk store. 
- */ -private[spark] object SuccessTaskIndexNames { - final val DESER_CPU_TIME = "sdct" - final val DESER_TIME = "sdes" - final val DISK_SPILL = "sdbs" - final val EXEC_CPU_TIME = "sect" - final val EXEC_RUN_TIME = "sert" - final val GC_TIME = "sgc" - final val GETTING_RESULT_TIME = "sgrt" - final val INPUT_RECORDS = "sir" - final val INPUT_SIZE = "sis" - final val MEM_SPILL = "smbs" - final val OUTPUT_RECORDS = "sor" - final val OUTPUT_SIZE = "sos" - final val PEAK_MEM = "spem" - final val RESULT_SIZE = "srs" - final val SCHEDULER_DELAY = "sdly" - final val SER_TIME = "srst" - final val SHUFFLE_LOCAL_BLOCKS = "sslbl" - final val SHUFFLE_READ_RECORDS = "ssrr" - final val SHUFFLE_READ_TIME = "ssrt" - final val SHUFFLE_REMOTE_BLOCKS = "ssrbl" - final val SHUFFLE_REMOTE_READS = "ssrby" - final val SHUFFLE_REMOTE_READS_TO_DISK = "ssrbd" - final val SHUFFLE_TOTAL_READS = "sstby" - final val SHUFFLE_TOTAL_BLOCKS = "sstbl" - final val SHUFFLE_WRITE_RECORDS = "sswr" - final val SHUFFLE_WRITE_SIZE = "ssws" - final val SHUFFLE_WRITE_TIME = "sswt" - final val STAGE = "stage" -} - /** * Tasks have a lot of indices that are used in a few different places. This object keeps logical * names for these indices, mapped to short strings to save space when using a disk store. @@ -172,6 +139,7 @@ private[spark] object TaskIndexNames { final val SHUFFLE_WRITE_RECORDS = "swr" final val SHUFFLE_WRITE_SIZE = "sws" final val SHUFFLE_WRITE_TIME = "swt" + final val SHUFFLE_LOCAL_READ = "slr" final val STAGE = "stage" final val STATUS = "sta" final val TASK_INDEX = "idx" @@ -212,6 +180,13 @@ private[spark] class TaskDataWrapper( val accumulatorUpdates: Seq[AccumulableInfo], val errorMessage: Option[String], + val hasMetrics: Boolean, + // Non successful metrics will have negative values in `TaskDataWrapper`. + // zero metric value will be converted to -1 and update the index in the hashset. + // However `TaskData` will have actual metric values. To recover the actual metric value + // from `TaskDataWrapper`, need use `getMetricValue` method. parameter `handleZero` is to + // check whether the index has zero metric value, which is used in the `getMetricValue`. + val handleZero: HashSet[String], // The following is an exploded view of a TaskMetrics API object. This saves 5 objects // (= 80 bytes of Java object overhead) per instance of this wrapper. If the first value // (executorDeserializeTime) is -1L, it means the metrics for this task have not been @@ -268,41 +243,46 @@ private[spark] class TaskDataWrapper( val stageId: Int, val stageAttemptId: Int) { - def hasMetrics: Boolean = executorDeserializeTime >= 0 - - private val isSuccess = status == "SUCCESS" + // To handle non successful tasks metrics (Running, Failed, Killed). 
+ private def gerMetricValue(metric: Long, index: String): Long = { + if (handleZero(index)) { + 0L + } else { + math.abs(metric) + } + } def toApi: TaskData = { val metrics = if (hasMetrics) { Some(new TaskMetrics( - executorDeserializeTime, - executorDeserializeCpuTime, - executorRunTime, - executorCpuTime, - resultSize, - jvmGcTime, - resultSerializationTime, - memoryBytesSpilled, - diskBytesSpilled, - peakExecutionMemory, + gerMetricValue(executorDeserializeTime, TaskIndexNames.DESER_TIME), + gerMetricValue(executorDeserializeCpuTime, TaskIndexNames.DESER_CPU_TIME), + gerMetricValue(executorRunTime, TaskIndexNames.EXEC_RUN_TIME), + gerMetricValue(executorCpuTime, TaskIndexNames.EXEC_CPU_TIME), + gerMetricValue(resultSize, TaskIndexNames.RESULT_SIZE), + gerMetricValue(jvmGcTime, TaskIndexNames.GC_TIME), + gerMetricValue(resultSerializationTime, TaskIndexNames.SER_TIME), + gerMetricValue(memoryBytesSpilled, TaskIndexNames.MEM_SPILL), + gerMetricValue(diskBytesSpilled, TaskIndexNames.DISK_SPILL), + gerMetricValue(peakExecutionMemory, TaskIndexNames.PEAK_MEM), new InputMetrics( - inputBytesRead, - inputRecordsRead), + gerMetricValue(inputBytesRead, TaskIndexNames.INPUT_SIZE), + gerMetricValue(inputRecordsRead, TaskIndexNames.INPUT_RECORDS)), new OutputMetrics( - outputBytesWritten, - outputRecordsWritten), + gerMetricValue(outputBytesWritten, TaskIndexNames.OUTPUT_SIZE), + gerMetricValue(outputRecordsWritten, TaskIndexNames.OUTPUT_RECORDS)), new ShuffleReadMetrics( - shuffleRemoteBlocksFetched, - shuffleLocalBlocksFetched, - shuffleFetchWaitTime, - shuffleRemoteBytesRead, - shuffleRemoteBytesReadToDisk, - shuffleLocalBytesRead, - shuffleRecordsRead), + gerMetricValue(shuffleRemoteBlocksFetched, TaskIndexNames.SHUFFLE_REMOTE_BLOCKS), + gerMetricValue(shuffleLocalBlocksFetched, TaskIndexNames.SHUFFLE_LOCAL_BLOCKS), + gerMetricValue(shuffleFetchWaitTime, TaskIndexNames.SHUFFLE_READ_TIME), + gerMetricValue(shuffleRemoteBytesRead, TaskIndexNames.SHUFFLE_REMOTE_READS), + gerMetricValue(shuffleRemoteBytesReadToDisk, TaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK), + gerMetricValue(shuffleLocalBytesRead, TaskIndexNames.SHUFFLE_LOCAL_READ), + gerMetricValue(shuffleRecordsRead, TaskIndexNames.SHUFFLE_READ_RECORDS)), new ShuffleWriteMetrics( - shuffleBytesWritten, - shuffleWriteTime, - shuffleRecordsWritten))) + gerMetricValue(shuffleBytesWritten, TaskIndexNames.SHUFFLE_WRITE_SIZE), + gerMetricValue(shuffleWriteTime, TaskIndexNames.SHUFFLE_WRITE_TIME), + gerMetricValue(shuffleRecordsWritten, TaskIndexNames.SHUFFLE_WRITE_RECORDS)))) } else { None } @@ -327,127 +307,16 @@ private[spark] class TaskDataWrapper( gettingResultTime = 0L) } - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.DESER_TIME, parent = TaskIndexNames.STAGE) - def executorDeserializeTimeIndex: Long = if (isSuccess) { - executorDeserializeTime - } else { - -1L - } - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.DESER_CPU_TIME, parent = TaskIndexNames.STAGE) - def executorDeserializeCpuTimeIndex: Long = if (isSuccess) { - executorDeserializeCpuTime - } else { - -1L - } - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.EXEC_RUN_TIME, parent = TaskIndexNames.STAGE) - def executorRunTimeIndex: Long = if (isSuccess) executorRunTime else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.EXEC_CPU_TIME, parent = TaskIndexNames.STAGE) - def executorCpuTimeIndex: Long = if (isSuccess) executorCpuTime else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.RESULT_SIZE, parent = TaskIndexNames.STAGE) - def 
resultSizeIndex: Long = if (isSuccess) resultSize else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.GC_TIME, parent = TaskIndexNames.STAGE) - def jvmGcTimeIndex: Long = if (isSuccess) jvmGcTime else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SER_TIME, parent = TaskIndexNames.STAGE) - def resultSerializationTimeIndex: Long = if (isSuccess) resultSerializationTime else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.MEM_SPILL, parent = TaskIndexNames.STAGE) - def memoryBytesSpilledIndex: Long = if (isSuccess) memoryBytesSpilled else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.DISK_SPILL, parent = TaskIndexNames.STAGE) - def diskBytesSpilledIndex: Long = if (isSuccess) diskBytesSpilled else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.PEAK_MEM, parent = TaskIndexNames.STAGE) - def peakExecutionMemoryIndex: Long = if (isSuccess) peakExecutionMemory else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.INPUT_SIZE, parent = TaskIndexNames.STAGE) - def inputBytesReadIndex: Long = if (isSuccess) inputBytesRead else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.INPUT_RECORDS, parent = TaskIndexNames.STAGE) - def inputRecordsReadIndex: Long = if (isSuccess) inputRecordsRead else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.OUTPUT_SIZE, parent = TaskIndexNames.STAGE) - def outputBytesWrittenIndex: Long = if (isSuccess) outputBytesWritten else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.OUTPUT_RECORDS, parent = TaskIndexNames.STAGE) - def outputRecordsWrittenIndex: Long = if (isSuccess) outputRecordsWritten else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_REMOTE_BLOCKS, - parent = TaskIndexNames.STAGE) - def shuffleRemoteBlocksFetchedIndex: Long = if (isSuccess) { - shuffleRemoteBlocksFetched - } else { - -1L - } - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_LOCAL_BLOCKS, - parent = TaskIndexNames.STAGE) - def shuffleLocalBlocksFetchedIndex: Long = if (isSuccess) { - shuffleLocalBlocksFetched - } else { - -1L - } - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_READ_TIME, - parent = TaskIndexNames.STAGE) - def shuffleFetchWaitTimeIndex: Long = if (isSuccess) shuffleFetchWaitTime else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_REMOTE_READS, - parent = TaskIndexNames.STAGE) - def shuffleRemoteBytesReadIndex: Long = if (isSuccess) shuffleRemoteBytesRead else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK, - parent = TaskIndexNames.STAGE) - def shuffleRemoteBytesReadToDiskIndex: Long = if (isSuccess) { - shuffleRemoteBytesReadToDisk - } else { - -1L - } - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_READ_RECORDS, - parent = TaskIndexNames.STAGE) - def shuffleRecordsReadIndex: Long = if (isSuccess) shuffleRecordsRead else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_WRITE_SIZE, - parent = TaskIndexNames.STAGE) - def shuffleBytesWrittenIndex: Long = if (isSuccess) shuffleBytesWritten else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_WRITE_TIME, - parent = TaskIndexNames.STAGE) - def shuffleWriteTimeIndex: Long = if (isSuccess) shuffleWriteTime else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_WRITE_RECORDS, - parent = TaskIndexNames.STAGE) - def shuffleRecordsWrittenIndex: Long = if (isSuccess) shuffleRecordsWritten else -1L - - @JsonIgnore @KVIndex(value = 
SuccessTaskIndexNames.SCHEDULER_DELAY, parent = TaskIndexNames.STAGE) - def schedulerDelayIndex: Long = if (isSuccess) schedulerDelay else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.GETTING_RESULT_TIME, - parent = TaskIndexNames.STAGE) - def gettingResultTimeIndex: Long = if (isSuccess) gettingResultTime else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_TOTAL_READS, - parent = TaskIndexNames.STAGE) - def shuffleTotalReadsIndex: Long = if (isSuccess) shuffleTotalReads else -1L - - @JsonIgnore @KVIndex(value = SuccessTaskIndexNames.SHUFFLE_TOTAL_BLOCKS, - parent = TaskIndexNames.STAGE) - private def shuffleTotalBlocksIndex: Long = if (isSuccess) shuffleTotalBlocks else -1L - @JsonIgnore @KVIndex(TaskIndexNames.STAGE) private def stage: Array[Int] = Array(stageId, stageAttemptId) @JsonIgnore @KVIndex(value = TaskIndexNames.SCHEDULER_DELAY, parent = TaskIndexNames.STAGE) def schedulerDelay: Long = { if (hasMetrics) { - AppStatusUtils.schedulerDelay(launchTime, resultFetchStart, duration, executorDeserializeTime, - resultSerializationTime, executorRunTime) + AppStatusUtils.schedulerDelay(launchTime, resultFetchStart, duration, + gerMetricValue(executorDeserializeTime, TaskIndexNames.DESER_TIME), + gerMetricValue(resultSerializationTime, TaskIndexNames.SER_TIME), + gerMetricValue(executorRunTime, TaskIndexNames.EXEC_RUN_TIME)) } else { -1L } @@ -480,7 +349,8 @@ private[spark] class TaskDataWrapper( @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_READS, parent = TaskIndexNames.STAGE) private def shuffleTotalReads: Long = { if (hasMetrics) { - shuffleLocalBytesRead + shuffleRemoteBytesRead + gerMetricValue(shuffleLocalBytesRead, TaskIndexNames.SHUFFLE_LOCAL_READ) + + gerMetricValue(shuffleRemoteBytesRead, TaskIndexNames.SHUFFLE_REMOTE_READS) } else { -1L } @@ -489,7 +359,8 @@ private[spark] class TaskDataWrapper( @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_BLOCKS, parent = TaskIndexNames.STAGE) private def shuffleTotalBlocks: Long = { if (hasMetrics) { - shuffleLocalBlocksFetched + shuffleRemoteBlocksFetched + gerMetricValue(shuffleLocalBlocksFetched, TaskIndexNames.SHUFFLE_LOCAL_BLOCKS) + + gerMetricValue(shuffleRemoteBlocksFetched, TaskIndexNames.SHUFFLE_REMOTE_BLOCKS) } else { -1L } diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index db8d02299d39e..28c8973f2e814 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -17,7 +17,12 @@ package org.apache.spark.status +import scala.collection.mutable.HashSet + import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.status.LiveEntityHelpers.makeNegative +import org.apache.spark.status.api.v1 +import org.apache.spark.status.api.v1.{InputMetrics, OutputMetrics, ShuffleReadMetrics, ShuffleWriteMetrics} import org.apache.spark.util.{Distribution, Utils} import org.apache.spark.util.kvstore._ @@ -128,6 +133,15 @@ class AppStatusStoreSuite extends SparkFunSuite { } } + test("task summary size for default metrics should be zero") { + val store = new InMemoryStore() + (0 until 5).foreach { _ => store.write(newTaskData(-1, status = "RUNNING")) } + Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore => + val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles) + assert(summary.size === 0) + } + } + test("SPARK-26260: summary should 
contain successful tasks only when with LevelDB store") { val testDir = Utils.createTempDir() val diskStore = KVUtils.open(testDir, getClass().getName()) @@ -171,10 +185,51 @@ class AppStatusStoreSuite extends SparkFunSuite { } private def newTaskData(i: Int, status: String = "SUCCESS"): TaskDataWrapper = { - new TaskDataWrapper( - i, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None, - i, i, i, i, i, i, i, i, i, i, + + val metrics = new v1.TaskMetrics( i, i, i, i, i, i, i, i, i, i, - i, i, i, i, stageId, attemptId) + new InputMetrics(i, i), + new OutputMetrics(i, i), + new ShuffleReadMetrics(i, i, i, i, i, i, i), + new ShuffleWriteMetrics(i, i, i)) + + val hasMetrics = i >= 0 + val handleZero = HashSet[String]() + + val taskMetrics: v1.TaskMetrics = if (hasMetrics && status != "SUCCESS") { + makeNegative(metrics, handleZero) + } else { + metrics + } + + new TaskDataWrapper( + i.toLong, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None, + hasMetrics, + handleZero, + taskMetrics.executorDeserializeTime, + taskMetrics.executorDeserializeCpuTime, + taskMetrics.executorRunTime, + taskMetrics.executorCpuTime, + taskMetrics.resultSize, + taskMetrics.jvmGcTime, + taskMetrics.resultSerializationTime, + taskMetrics.memoryBytesSpilled, + taskMetrics.diskBytesSpilled, + taskMetrics.peakExecutionMemory, + taskMetrics.inputMetrics.bytesRead, + taskMetrics.inputMetrics.recordsRead, + taskMetrics.outputMetrics.bytesWritten, + taskMetrics.outputMetrics.recordsWritten, + taskMetrics.shuffleReadMetrics.remoteBlocksFetched, + taskMetrics.shuffleReadMetrics.localBlocksFetched, + taskMetrics.shuffleReadMetrics.fetchWaitTime, + taskMetrics.shuffleReadMetrics.remoteBytesRead, + taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk, + taskMetrics.shuffleReadMetrics.localBytesRead, + taskMetrics.shuffleReadMetrics.recordsRead, + taskMetrics.shuffleWriteMetrics.bytesWritten, + taskMetrics.shuffleWriteMetrics.writeTime, + taskMetrics.shuffleWriteMetrics.recordsWritten, + stageId, attemptId) } } From c8196b8677e1d80ce5d9fe7037d966d81cd6feee Mon Sep 17 00:00:00 2001 From: shahid Date: Sun, 17 Nov 2019 00:18:05 +0530 Subject: [PATCH 05/14] minor update --- .../apache/spark/status/AppStatusStore.scala | 6 +- .../org/apache/spark/status/LiveEntity.scala | 4 +- .../org/apache/spark/status/storeTypes.scala | 68 +++++++++---------- 3 files changed, 37 insertions(+), 41 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index f5788be784f6d..6b89812cc2bf0 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -275,8 +275,7 @@ private[spark] class AppStatusStore( t.gettingResultTime }, schedulerDelay = scanTasks(TaskIndexNames.SCHEDULER_DELAY) { t => t.schedulerDelay }, - peakExecutionMemory = scanTasks(TaskIndexNames.PEAK_MEM) { t => - t.peakExecutionMemory }, + peakExecutionMemory = scanTasks(TaskIndexNames.PEAK_MEM) { t => t.peakExecutionMemory }, memoryBytesSpilled = scanTasks(TaskIndexNames.MEM_SPILL) { t => t.memoryBytesSpilled }, diskBytesSpilled = scanTasks(TaskIndexNames.DISK_SPILL) { t => t.diskBytesSpilled }, inputMetrics = new v1.InputMetricDistributions( @@ -290,8 +289,7 @@ private[spark] class AppStatusStore( m.shuffleLocalBytesRead + m.shuffleRemoteBytesRead }, scanTasks(TaskIndexNames.SHUFFLE_READ_RECORDS) { t => t.shuffleRecordsRead }, - 
scanTasks(TaskIndexNames.SHUFFLE_REMOTE_BLOCKS) { t => - t.shuffleRemoteBlocksFetched }, + scanTasks(TaskIndexNames.SHUFFLE_REMOTE_BLOCKS) { t => t.shuffleRemoteBlocksFetched }, scanTasks(TaskIndexNames.SHUFFLE_LOCAL_BLOCKS) { t => t.shuffleLocalBlocksFetched }, scanTasks(TaskIndexNames.SHUFFLE_READ_TIME) { t => t.shuffleFetchWaitTime }, scanTasks(TaskIndexNames.SHUFFLE_REMOTE_READS) { t => t.shuffleRemoteBytesRead }, diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index ff58d6e495344..76bf46a05ffd0 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -731,9 +731,7 @@ private object LiveEntityHelpers { * Convert all the metric values to negative as well as handle zero values. * This method assumes that all the metric values are greater than or equal to zero */ - def makeNegative( - m: v1.TaskMetrics, - handleZeros: mutable.HashSet[String]): v1.TaskMetrics = { + def makeNegative(m: v1.TaskMetrics, handleZeros: mutable.HashSet[String]): v1.TaskMetrics = { // If the metric value is 0, then make -1 and update the metric index in handleZeros. def updateMetricValue(metric: Long, index: String): Long = { if (metric == 0L) { diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 8147dff960373..4e4fb230b78a0 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -182,8 +182,7 @@ private[spark] class TaskDataWrapper( val hasMetrics: Boolean, // Non successful metrics will have negative values in `TaskDataWrapper`. - // zero metric value will be converted to -1 and update the index in the hashset. - // However `TaskData` will have actual metric values. To recover the actual metric value + // `TaskData` will have actual metric values. To recover the actual metric value // from `TaskDataWrapper`, need use `getMetricValue` method. parameter `handleZero` is to // check whether the index has zero metric value, which is used in the `getMetricValue`. val handleZero: HashSet[String], @@ -244,7 +243,7 @@ private[spark] class TaskDataWrapper( val stageAttemptId: Int) { // To handle non successful tasks metrics (Running, Failed, Killed). 
- private def gerMetricValue(metric: Long, index: String): Long = { + private def getMetricValue(metric: Long, index: String): Long = { if (handleZero(index)) { 0L } else { @@ -255,34 +254,35 @@ private[spark] class TaskDataWrapper( def toApi: TaskData = { val metrics = if (hasMetrics) { Some(new TaskMetrics( - gerMetricValue(executorDeserializeTime, TaskIndexNames.DESER_TIME), - gerMetricValue(executorDeserializeCpuTime, TaskIndexNames.DESER_CPU_TIME), - gerMetricValue(executorRunTime, TaskIndexNames.EXEC_RUN_TIME), - gerMetricValue(executorCpuTime, TaskIndexNames.EXEC_CPU_TIME), - gerMetricValue(resultSize, TaskIndexNames.RESULT_SIZE), - gerMetricValue(jvmGcTime, TaskIndexNames.GC_TIME), - gerMetricValue(resultSerializationTime, TaskIndexNames.SER_TIME), - gerMetricValue(memoryBytesSpilled, TaskIndexNames.MEM_SPILL), - gerMetricValue(diskBytesSpilled, TaskIndexNames.DISK_SPILL), - gerMetricValue(peakExecutionMemory, TaskIndexNames.PEAK_MEM), + getMetricValue(executorDeserializeTime, TaskIndexNames.DESER_TIME), + getMetricValue(executorDeserializeCpuTime, TaskIndexNames.DESER_CPU_TIME), + getMetricValue(executorRunTime, TaskIndexNames.EXEC_RUN_TIME), + getMetricValue(executorCpuTime, TaskIndexNames.EXEC_CPU_TIME), + getMetricValue(resultSize, TaskIndexNames.RESULT_SIZE), + getMetricValue(jvmGcTime, TaskIndexNames.GC_TIME), + getMetricValue(resultSerializationTime, TaskIndexNames.SER_TIME), + getMetricValue(memoryBytesSpilled, TaskIndexNames.MEM_SPILL), + getMetricValue(diskBytesSpilled, TaskIndexNames.DISK_SPILL), + getMetricValue(peakExecutionMemory, TaskIndexNames.PEAK_MEM), new InputMetrics( - gerMetricValue(inputBytesRead, TaskIndexNames.INPUT_SIZE), - gerMetricValue(inputRecordsRead, TaskIndexNames.INPUT_RECORDS)), + getMetricValue(inputBytesRead, TaskIndexNames.INPUT_SIZE), + getMetricValue(inputRecordsRead, TaskIndexNames.INPUT_RECORDS)), new OutputMetrics( - gerMetricValue(outputBytesWritten, TaskIndexNames.OUTPUT_SIZE), - gerMetricValue(outputRecordsWritten, TaskIndexNames.OUTPUT_RECORDS)), + getMetricValue(outputBytesWritten, TaskIndexNames.OUTPUT_SIZE), + getMetricValue(outputRecordsWritten, TaskIndexNames.OUTPUT_RECORDS)), new ShuffleReadMetrics( - gerMetricValue(shuffleRemoteBlocksFetched, TaskIndexNames.SHUFFLE_REMOTE_BLOCKS), - gerMetricValue(shuffleLocalBlocksFetched, TaskIndexNames.SHUFFLE_LOCAL_BLOCKS), - gerMetricValue(shuffleFetchWaitTime, TaskIndexNames.SHUFFLE_READ_TIME), - gerMetricValue(shuffleRemoteBytesRead, TaskIndexNames.SHUFFLE_REMOTE_READS), - gerMetricValue(shuffleRemoteBytesReadToDisk, TaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK), - gerMetricValue(shuffleLocalBytesRead, TaskIndexNames.SHUFFLE_LOCAL_READ), - gerMetricValue(shuffleRecordsRead, TaskIndexNames.SHUFFLE_READ_RECORDS)), + getMetricValue(shuffleRemoteBlocksFetched, TaskIndexNames.SHUFFLE_REMOTE_BLOCKS), + getMetricValue(shuffleLocalBlocksFetched, TaskIndexNames.SHUFFLE_LOCAL_BLOCKS), + getMetricValue(shuffleFetchWaitTime, TaskIndexNames.SHUFFLE_READ_TIME), + getMetricValue(shuffleRemoteBytesRead, TaskIndexNames.SHUFFLE_REMOTE_READS), + getMetricValue(shuffleRemoteBytesReadToDisk, + TaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK), + getMetricValue(shuffleLocalBytesRead, TaskIndexNames.SHUFFLE_LOCAL_READ), + getMetricValue(shuffleRecordsRead, TaskIndexNames.SHUFFLE_READ_RECORDS)), new ShuffleWriteMetrics( - gerMetricValue(shuffleBytesWritten, TaskIndexNames.SHUFFLE_WRITE_SIZE), - gerMetricValue(shuffleWriteTime, TaskIndexNames.SHUFFLE_WRITE_TIME), - gerMetricValue(shuffleRecordsWritten, 
TaskIndexNames.SHUFFLE_WRITE_RECORDS)))) + getMetricValue(shuffleBytesWritten, TaskIndexNames.SHUFFLE_WRITE_SIZE), + getMetricValue(shuffleWriteTime, TaskIndexNames.SHUFFLE_WRITE_TIME), + getMetricValue(shuffleRecordsWritten, TaskIndexNames.SHUFFLE_WRITE_RECORDS)))) } else { None } @@ -314,9 +314,9 @@ private[spark] class TaskDataWrapper( def schedulerDelay: Long = { if (hasMetrics) { AppStatusUtils.schedulerDelay(launchTime, resultFetchStart, duration, - gerMetricValue(executorDeserializeTime, TaskIndexNames.DESER_TIME), - gerMetricValue(resultSerializationTime, TaskIndexNames.SER_TIME), - gerMetricValue(executorRunTime, TaskIndexNames.EXEC_RUN_TIME)) + getMetricValue(executorDeserializeTime, TaskIndexNames.DESER_TIME), + getMetricValue(resultSerializationTime, TaskIndexNames.SER_TIME), + getMetricValue(executorRunTime, TaskIndexNames.EXEC_RUN_TIME)) } else { -1L } @@ -349,8 +349,8 @@ private[spark] class TaskDataWrapper( @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_READS, parent = TaskIndexNames.STAGE) private def shuffleTotalReads: Long = { if (hasMetrics) { - gerMetricValue(shuffleLocalBytesRead, TaskIndexNames.SHUFFLE_LOCAL_READ) + - gerMetricValue(shuffleRemoteBytesRead, TaskIndexNames.SHUFFLE_REMOTE_READS) + getMetricValue(shuffleLocalBytesRead, TaskIndexNames.SHUFFLE_LOCAL_READ) + + getMetricValue(shuffleRemoteBytesRead, TaskIndexNames.SHUFFLE_REMOTE_READS) } else { -1L } @@ -359,8 +359,8 @@ private[spark] class TaskDataWrapper( @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_BLOCKS, parent = TaskIndexNames.STAGE) private def shuffleTotalBlocks: Long = { if (hasMetrics) { - gerMetricValue(shuffleLocalBlocksFetched, TaskIndexNames.SHUFFLE_LOCAL_BLOCKS) + - gerMetricValue(shuffleRemoteBlocksFetched, TaskIndexNames.SHUFFLE_REMOTE_BLOCKS) + getMetricValue(shuffleLocalBlocksFetched, TaskIndexNames.SHUFFLE_LOCAL_BLOCKS) + + getMetricValue(shuffleRemoteBlocksFetched, TaskIndexNames.SHUFFLE_REMOTE_BLOCKS) } else { -1L } From e8d0d146048bdf93a45026021a76aa9d942b1576 Mon Sep 17 00:00:00 2001 From: shahid Date: Sun, 17 Nov 2019 00:22:10 +0530 Subject: [PATCH 06/14] scalastyle --- core/src/main/scala/org/apache/spark/status/LiveEntity.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 76bf46a05ffd0..b4a78a34360f0 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -21,8 +21,8 @@ import java.util.Date import java.util.concurrent.atomic.AtomicInteger import scala.collection.immutable.{HashSet, TreeSet} -import scala.collection.mutable.HashMap import scala.collection.mutable +import scala.collection.mutable.HashMap import com.google.common.collect.Interners From 34420ea44b6826f34b4773bdd35658754e396159 Mon Sep 17 00:00:00 2001 From: shahid Date: Sun, 17 Nov 2019 01:27:53 +0530 Subject: [PATCH 07/14] update --- .../org/apache/spark/status/LiveEntity.scala | 5 +++-- .../org/apache/spark/status/storeTypes.scala | 6 +++--- .../spark/status/AppStatusStoreSuite.scala | 18 +++++++++--------- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index b4a78a34360f0..6981c322aa831 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ 
b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -190,8 +190,9 @@ private class LiveTask( val handleZeros = mutable.HashSet[String]() /** - * For non successful tasks, store the metrics as negetive to avoid the calculation in the - * task summary. `toApi` method in TaskDataWrapper will make it actual value. + * SPARK-26260: For non successful tasks, store the metrics as negetive to avoid + * the calculation in the task summary. `toApi` method in the `TaskDataWrapper` will make + * it actual value. */ val taskMetrics: v1.TaskMetrics = if (hasMetrics && !info.successful) { makeNegative(metrics, handleZeros) diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 4e4fb230b78a0..4cc0929909616 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -181,7 +181,7 @@ private[spark] class TaskDataWrapper( val errorMessage: Option[String], val hasMetrics: Boolean, - // Non successful metrics will have negative values in `TaskDataWrapper`. + // Non successful metrics now will have negative values in `TaskDataWrapper`. // `TaskData` will have actual metric values. To recover the actual metric value // from `TaskDataWrapper`, need use `getMetricValue` method. parameter `handleZero` is to // check whether the index has zero metric value, which is used in the `getMetricValue`. @@ -242,9 +242,9 @@ private[spark] class TaskDataWrapper( val stageId: Int, val stageAttemptId: Int) { - // To handle non successful tasks metrics (Running, Failed, Killed). + // SPARK-26260: To handle non successful tasks metrics (Running, Failed, Killed). private def getMetricValue(metric: Long, index: String): Long = { - if (handleZero(index)) { + if (status != "SUCCESS" && handleZero(index)) { 0L } else { math.abs(metric) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index 28c8973f2e814..5567dbfe537bb 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -133,15 +133,6 @@ class AppStatusStoreSuite extends SparkFunSuite { } } - test("task summary size for default metrics should be zero") { - val store = new InMemoryStore() - (0 until 5).foreach { _ => store.write(newTaskData(-1, status = "RUNNING")) } - Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore => - val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles) - assert(summary.size === 0) - } - } - test("SPARK-26260: summary should contain successful tasks only when with LevelDB store") { val testDir = Utils.createTempDir() val diskStore = KVUtils.open(testDir, getClass().getName()) @@ -168,6 +159,15 @@ class AppStatusStoreSuite extends SparkFunSuite { Utils.deleteRecursively(testDir) } + test("SPARK-26260: task summary size for default metrics should be zero") { + val store = new InMemoryStore() + store.write(newTaskData(-1, status = "RUNNING")) + Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore => + val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles) + assert(summary.size === 0) + } + } + private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = { val store = new InMemoryStore() val values = (0 until count).map { i => From f52eed832b185a5364d4f650f6e8fecc70ed37aa Mon Sep 17 
00:00:00 2001 From: shahid Date: Sun, 17 Nov 2019 20:03:21 +0530 Subject: [PATCH 08/14] More optimisation.. --- .../org/apache/spark/status/LiveEntity.scala | 70 ++++++++--------- .../org/apache/spark/status/storeTypes.scala | 77 +++++++++---------- .../spark/status/AppStatusStoreSuite.scala | 6 +- 3 files changed, 68 insertions(+), 85 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 6981c322aa831..31b1fc4b27dee 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -21,7 +21,6 @@ import java.util.Date import java.util.concurrent.atomic.AtomicInteger import scala.collection.immutable.{HashSet, TreeSet} -import scala.collection.mutable import scala.collection.mutable.HashMap import com.google.common.collect.Interners @@ -30,7 +29,6 @@ import org.apache.spark.JobExecutionStatus import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.resource.ResourceInformation import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo} -import org.apache.spark.status.TaskIndexNames._ import org.apache.spark.status.api.v1 import org.apache.spark.storage.{RDDInfo, StorageLevel} import org.apache.spark.ui.SparkUI @@ -187,7 +185,6 @@ private class LiveTask( } val hasMetrics = metrics.executorDeserializeTime >= 0 - val handleZeros = mutable.HashSet[String]() /** * SPARK-26260: For non successful tasks, store the metrics as negetive to avoid @@ -195,7 +192,7 @@ private class LiveTask( * it actual value. */ val taskMetrics: v1.TaskMetrics = if (hasMetrics && !info.successful) { - makeNegative(metrics, handleZeros) + makeNegative(metrics) } else { metrics } @@ -216,7 +213,6 @@ private class LiveTask( errorMessage, hasMetrics, - handleZeros, taskMetrics.executorDeserializeTime, taskMetrics.executorDeserializeCpuTime, taskMetrics.executorRunTime, @@ -732,42 +728,40 @@ private object LiveEntityHelpers { * Convert all the metric values to negative as well as handle zero values. * This method assumes that all the metric values are greater than or equal to zero */ - def makeNegative(m: v1.TaskMetrics, handleZeros: mutable.HashSet[String]): v1.TaskMetrics = { - // If the metric value is 0, then make -1 and update the metric index in handleZeros. - def updateMetricValue(metric: Long, index: String): Long = { - if (metric == 0L) { - handleZeros.add(index) - -1L - } else { - metric * -1L - } + def makeNegative(m: v1.TaskMetrics): v1.TaskMetrics = { + // To handle 0 metric value, add 1 and make the metric negative. 
+ // To recover actual value do `math.abs(metric + 1)` + // Eg: if the metric values are (5, 3, 0, 1) => Updated metrics value will be (-6, -4, -1, -2) + // To get actual metric value math.abs(metric +1) => (5, 3, 0, 1) + def updateMetricValue(metric: Long): Long = { + metric * -1L - 1L } createMetrics( - updateMetricValue(m.executorDeserializeTime, DESER_TIME), - updateMetricValue(m.executorDeserializeCpuTime, DESER_CPU_TIME), - updateMetricValue(m.executorRunTime, EXEC_RUN_TIME), - updateMetricValue(m.executorCpuTime, EXEC_CPU_TIME), - updateMetricValue(m.resultSize, RESULT_SIZE), - updateMetricValue(m.jvmGcTime, GC_TIME), - updateMetricValue(m.resultSerializationTime, SER_TIME), - updateMetricValue(m.memoryBytesSpilled, MEM_SPILL), - updateMetricValue(m.diskBytesSpilled, DISK_SPILL), - updateMetricValue(m.peakExecutionMemory, PEAK_MEM), - updateMetricValue(m.inputMetrics.bytesRead, INPUT_SIZE), - updateMetricValue(m.inputMetrics.recordsRead, INPUT_RECORDS), - updateMetricValue(m.outputMetrics.bytesWritten, OUTPUT_SIZE), - updateMetricValue(m.outputMetrics.recordsWritten, OUTPUT_RECORDS), - updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched, SHUFFLE_REMOTE_BLOCKS), - updateMetricValue(m.shuffleReadMetrics.localBlocksFetched, SHUFFLE_LOCAL_BLOCKS), - updateMetricValue(m.shuffleReadMetrics.fetchWaitTime, SHUFFLE_READ_TIME), - updateMetricValue(m.shuffleReadMetrics.remoteBytesRead, SHUFFLE_REMOTE_READS), - updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk, SHUFFLE_REMOTE_READS_TO_DISK), - updateMetricValue(m.shuffleReadMetrics.localBytesRead, SHUFFLE_LOCAL_READ), - updateMetricValue(m.shuffleReadMetrics.recordsRead, SHUFFLE_READ_RECORDS), - updateMetricValue(m.shuffleWriteMetrics.bytesWritten, SHUFFLE_WRITE_SIZE), - updateMetricValue(m.shuffleWriteMetrics.writeTime, SHUFFLE_WRITE_TIME), - updateMetricValue(m.shuffleWriteMetrics.recordsWritten, SHUFFLE_WRITE_RECORDS)) + updateMetricValue(m.executorDeserializeTime), + updateMetricValue(m.executorDeserializeCpuTime), + updateMetricValue(m.executorRunTime), + updateMetricValue(m.executorCpuTime), + updateMetricValue(m.resultSize), + updateMetricValue(m.jvmGcTime), + updateMetricValue(m.resultSerializationTime), + updateMetricValue(m.memoryBytesSpilled), + updateMetricValue(m.diskBytesSpilled), + updateMetricValue(m.peakExecutionMemory), + updateMetricValue(m.inputMetrics.bytesRead), + updateMetricValue(m.inputMetrics.recordsRead), + updateMetricValue(m.outputMetrics.bytesWritten), + updateMetricValue(m.outputMetrics.recordsWritten), + updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched), + updateMetricValue(m.shuffleReadMetrics.localBlocksFetched), + updateMetricValue(m.shuffleReadMetrics.fetchWaitTime), + updateMetricValue(m.shuffleReadMetrics.remoteBytesRead), + updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk), + updateMetricValue(m.shuffleReadMetrics.localBytesRead), + updateMetricValue(m.shuffleReadMetrics.recordsRead), + updateMetricValue(m.shuffleWriteMetrics.bytesWritten), + updateMetricValue(m.shuffleWriteMetrics.writeTime), + updateMetricValue(m.shuffleWriteMetrics.recordsWritten)) } private def addMetrics(m1: v1.TaskMetrics, m2: v1.TaskMetrics, mult: Int): v1.TaskMetrics = { diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 4cc0929909616..f6035cb3a55b7 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -20,8 +20,6 
@@ package org.apache.spark.status import java.lang.{Long => JLong} import java.util.Date -import scala.collection.mutable.HashSet - import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.databind.annotation.JsonDeserialize @@ -180,12 +178,10 @@ private[spark] class TaskDataWrapper( val accumulatorUpdates: Seq[AccumulableInfo], val errorMessage: Option[String], - val hasMetrics: Boolean, // Non successful metrics now will have negative values in `TaskDataWrapper`. // `TaskData` will have actual metric values. To recover the actual metric value - // from `TaskDataWrapper`, need use `getMetricValue` method. parameter `handleZero` is to - // check whether the index has zero metric value, which is used in the `getMetricValue`. - val handleZero: HashSet[String], + // from `TaskDataWrapper`, need use `getMetricValue` method. + val hasMetrics: Boolean, // The following is an exploded view of a TaskMetrics API object. This saves 5 objects // (= 80 bytes of Java object overhead) per instance of this wrapper. If the first value // (executorDeserializeTime) is -1L, it means the metrics for this task have not been @@ -243,46 +239,45 @@ private[spark] class TaskDataWrapper( val stageAttemptId: Int) { // SPARK-26260: To handle non successful tasks metrics (Running, Failed, Killed). - private def getMetricValue(metric: Long, index: String): Long = { - if (status != "SUCCESS" && handleZero(index)) { - 0L + private def getMetricValue(metric: Long): Long = { + if (status != "SUCCESS") { + math.abs(metric + 1) } else { - math.abs(metric) + metric } } def toApi: TaskData = { val metrics = if (hasMetrics) { Some(new TaskMetrics( - getMetricValue(executorDeserializeTime, TaskIndexNames.DESER_TIME), - getMetricValue(executorDeserializeCpuTime, TaskIndexNames.DESER_CPU_TIME), - getMetricValue(executorRunTime, TaskIndexNames.EXEC_RUN_TIME), - getMetricValue(executorCpuTime, TaskIndexNames.EXEC_CPU_TIME), - getMetricValue(resultSize, TaskIndexNames.RESULT_SIZE), - getMetricValue(jvmGcTime, TaskIndexNames.GC_TIME), - getMetricValue(resultSerializationTime, TaskIndexNames.SER_TIME), - getMetricValue(memoryBytesSpilled, TaskIndexNames.MEM_SPILL), - getMetricValue(diskBytesSpilled, TaskIndexNames.DISK_SPILL), - getMetricValue(peakExecutionMemory, TaskIndexNames.PEAK_MEM), + getMetricValue(executorDeserializeTime), + getMetricValue(executorDeserializeCpuTime), + getMetricValue(executorRunTime), + getMetricValue(executorCpuTime), + getMetricValue(resultSize), + getMetricValue(jvmGcTime), + getMetricValue(resultSerializationTime), + getMetricValue(memoryBytesSpilled), + getMetricValue(diskBytesSpilled), + getMetricValue(peakExecutionMemory), new InputMetrics( - getMetricValue(inputBytesRead, TaskIndexNames.INPUT_SIZE), - getMetricValue(inputRecordsRead, TaskIndexNames.INPUT_RECORDS)), + getMetricValue(inputBytesRead), + getMetricValue(inputRecordsRead)), new OutputMetrics( - getMetricValue(outputBytesWritten, TaskIndexNames.OUTPUT_SIZE), - getMetricValue(outputRecordsWritten, TaskIndexNames.OUTPUT_RECORDS)), + getMetricValue(outputBytesWritten), + getMetricValue(outputRecordsWritten)), new ShuffleReadMetrics( - getMetricValue(shuffleRemoteBlocksFetched, TaskIndexNames.SHUFFLE_REMOTE_BLOCKS), - getMetricValue(shuffleLocalBlocksFetched, TaskIndexNames.SHUFFLE_LOCAL_BLOCKS), - getMetricValue(shuffleFetchWaitTime, TaskIndexNames.SHUFFLE_READ_TIME), - getMetricValue(shuffleRemoteBytesRead, TaskIndexNames.SHUFFLE_REMOTE_READS), - getMetricValue(shuffleRemoteBytesReadToDisk, - 
TaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK), - getMetricValue(shuffleLocalBytesRead, TaskIndexNames.SHUFFLE_LOCAL_READ), - getMetricValue(shuffleRecordsRead, TaskIndexNames.SHUFFLE_READ_RECORDS)), + getMetricValue(shuffleRemoteBlocksFetched), + getMetricValue(shuffleLocalBlocksFetched), + getMetricValue(shuffleFetchWaitTime), + getMetricValue(shuffleRemoteBytesRead), + getMetricValue(shuffleRemoteBytesReadToDisk), + getMetricValue(shuffleLocalBytesRead), + getMetricValue(shuffleRecordsRead)), new ShuffleWriteMetrics( - getMetricValue(shuffleBytesWritten, TaskIndexNames.SHUFFLE_WRITE_SIZE), - getMetricValue(shuffleWriteTime, TaskIndexNames.SHUFFLE_WRITE_TIME), - getMetricValue(shuffleRecordsWritten, TaskIndexNames.SHUFFLE_WRITE_RECORDS)))) + getMetricValue(shuffleBytesWritten), + getMetricValue(shuffleWriteTime), + getMetricValue(shuffleRecordsWritten)))) } else { None } @@ -314,9 +309,9 @@ private[spark] class TaskDataWrapper( def schedulerDelay: Long = { if (hasMetrics) { AppStatusUtils.schedulerDelay(launchTime, resultFetchStart, duration, - getMetricValue(executorDeserializeTime, TaskIndexNames.DESER_TIME), - getMetricValue(resultSerializationTime, TaskIndexNames.SER_TIME), - getMetricValue(executorRunTime, TaskIndexNames.EXEC_RUN_TIME)) + getMetricValue(executorDeserializeTime), + getMetricValue(resultSerializationTime), + getMetricValue(executorRunTime)) } else { -1L } @@ -349,8 +344,7 @@ private[spark] class TaskDataWrapper( @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_READS, parent = TaskIndexNames.STAGE) private def shuffleTotalReads: Long = { if (hasMetrics) { - getMetricValue(shuffleLocalBytesRead, TaskIndexNames.SHUFFLE_LOCAL_READ) + - getMetricValue(shuffleRemoteBytesRead, TaskIndexNames.SHUFFLE_REMOTE_READS) + getMetricValue(shuffleLocalBytesRead) + getMetricValue(shuffleRemoteBytesRead) } else { -1L } @@ -359,8 +353,7 @@ private[spark] class TaskDataWrapper( @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_BLOCKS, parent = TaskIndexNames.STAGE) private def shuffleTotalBlocks: Long = { if (hasMetrics) { - getMetricValue(shuffleLocalBlocksFetched, TaskIndexNames.SHUFFLE_LOCAL_BLOCKS) + - getMetricValue(shuffleRemoteBlocksFetched, TaskIndexNames.SHUFFLE_REMOTE_BLOCKS) + getMetricValue(shuffleLocalBlocksFetched) + getMetricValue(shuffleRemoteBlocksFetched) } else { -1L } diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index 5567dbfe537bb..0b6f1003d9ccf 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.status -import scala.collection.mutable.HashSet - import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.status.LiveEntityHelpers.makeNegative import org.apache.spark.status.api.v1 @@ -194,10 +192,9 @@ class AppStatusStoreSuite extends SparkFunSuite { new ShuffleWriteMetrics(i, i, i)) val hasMetrics = i >= 0 - val handleZero = HashSet[String]() val taskMetrics: v1.TaskMetrics = if (hasMetrics && status != "SUCCESS") { - makeNegative(metrics, handleZero) + makeNegative(metrics) } else { metrics } @@ -205,7 +202,6 @@ class AppStatusStoreSuite extends SparkFunSuite { new TaskDataWrapper( i.toLong, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None, hasMetrics, - handleZero, taskMetrics.executorDeserializeTime, taskMetrics.executorDeserializeCpuTime, 
taskMetrics.executorRunTime, From af7244e58507cd2243d0f06e57bcf845bc2df1a9 Mon Sep 17 00:00:00 2001 From: shahid Date: Sun, 17 Nov 2019 20:41:30 +0530 Subject: [PATCH 09/14] minor update --- .../org/apache/spark/status/LiveEntity.scala | 4 ++-- .../org/apache/spark/status/storeTypes.scala | 1 - .../spark/status/AppStatusStoreSuite.scala | 19 ++++++++++--------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 31b1fc4b27dee..6369323fd5db0 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -731,8 +731,8 @@ private object LiveEntityHelpers { def makeNegative(m: v1.TaskMetrics): v1.TaskMetrics = { // To handle 0 metric value, add 1 and make the metric negative. // To recover actual value do `math.abs(metric + 1)` - // Eg: if the metric values are (5, 3, 0, 1) => Updated metrics value will be (-6, -4, -1, -2) - // To get actual metric value math.abs(metric +1) => (5, 3, 0, 1) + // Eg: if the metric values are (5, 3, 0, 1) => Updated metric values will be (-6, -4, -1, -2) + // To get actual metric value, do math.abs(metric + 1) => (5, 3, 0, 1) def updateMetricValue(metric: Long): Long = { metric * -1L - 1L } diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index f6035cb3a55b7..e0f7ecdc48db4 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -137,7 +137,6 @@ private[spark] object TaskIndexNames { final val SHUFFLE_WRITE_RECORDS = "swr" final val SHUFFLE_WRITE_SIZE = "sws" final val SHUFFLE_WRITE_TIME = "swt" - final val SHUFFLE_LOCAL_READ = "slr" final val STAGE = "stage" final val STATUS = "sta" final val TASK_INDEX = "idx" diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index 0b6f1003d9ccf..c7c5f44987111 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -131,6 +131,16 @@ class AppStatusStoreSuite extends SparkFunSuite { } } + + test("SPARK-26260: task summary size for default metrics should be zero") { + val store = new InMemoryStore() + store.write(newTaskData(-1, status = "RUNNING")) + Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore => + val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles) + assert(summary.size === 0) + } + } + test("SPARK-26260: summary should contain successful tasks only when with LevelDB store") { val testDir = Utils.createTempDir() val diskStore = KVUtils.open(testDir, getClass().getName()) @@ -157,15 +167,6 @@ class AppStatusStoreSuite extends SparkFunSuite { Utils.deleteRecursively(testDir) } - test("SPARK-26260: task summary size for default metrics should be zero") { - val store = new InMemoryStore() - store.write(newTaskData(-1, status = "RUNNING")) - Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore => - val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles) - assert(summary.size === 0) - } - } - private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = { val store = new InMemoryStore() val values = (0 until count).map { i => From 
5951c9e6323c3a80e4fc82fbfd38a18c21b61359 Mon Sep 17 00:00:00 2001 From: shahid Date: Tue, 19 Nov 2019 14:19:50 +0530 Subject: [PATCH 10/14] refactor AppStatusStoreSuite --- .../org/apache/spark/status/LiveEntity.scala | 2 +- .../spark/status/AppStatusStoreSuite.scala | 112 ++++++++---------- 2 files changed, 50 insertions(+), 64 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 6369323fd5db0..5ac7a56b216f2 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -187,7 +187,7 @@ private class LiveTask( val hasMetrics = metrics.executorDeserializeTime >= 0 /** - * SPARK-26260: For non successful tasks, store the metrics as negetive to avoid + * SPARK-26260: For non successful tasks, store the metrics as negative to avoid * the calculation in the task summary. `toApi` method in the `TaskDataWrapper` will make * it actual value. */ diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index c7c5f44987111..e74922356212d 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -79,47 +79,47 @@ class AppStatusStoreSuite extends SparkFunSuite { assert(store.count(classOf[CachedQuantile]) === 2) } - private def createLiveStore(inMemoryStore: InMemoryStore): AppStatusStore = { + private def createAppStore(store: KVStore, live: Boolean = false): AppStatusStore = { val conf = new SparkConf() - val store = new ElementTrackingStore(inMemoryStore, conf) - val listener = new AppStatusListener(store, conf, true, None) - new AppStatusStore(store, listener = Some(listener)) - } - - test("SPARK-28638: only successful tasks have taskSummary when with in memory kvstore") { - val store = new InMemoryStore() - (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) } - Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore => - val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles) - assert(summary.size === 0) + if (live) { + AppStatusStore.createLiveStore(conf) + } else { + new AppStatusStore(store) } } - test("SPARK-26260: only successful tasks have taskSummary when with disk kvstore (LevelDB)") { + test("SPARK-26260: task summary should contain only successful tasks' metrics") { val testDir = Utils.createTempDir() - val diskStore = KVUtils.open(testDir, getClass().getName()) - - (0 until 5).foreach { i => diskStore.write(newTaskData(i, status = "FAILED")) } - Seq(new AppStatusStore(diskStore)).foreach { appStore => - val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles) - assert(summary.size === 0) - } - diskStore.close() - Utils.deleteRecursively(testDir) - } - - test("SPARK-28638: summary should contain successful tasks only when with in memory kvstore") { - val store = new InMemoryStore() - - for (i <- 0 to 5) { - if (i % 2 == 1) { - store.write(newTaskData(i, status = "FAILED")) - } else { - store.write(newTaskData(i)) + val diskStore = KVUtils.open(testDir, getClass.getName) + val inMemoryStore = new InMemoryStore + + val historyDiskAppStore = createAppStore(diskStore) + val historyInMemoryAppStore = createAppStore(inMemoryStore) + val liveAppStore = createAppStore(inMemoryStore, live = true) + + Seq(historyDiskAppStore, historyInMemoryAppStore, 
liveAppStore).foreach { appStore => + val store = appStore.store + // Success and failed tasks metrics + for (i <- 0 to 5) { + if (i % 2 == 1) { + store.write(newTaskData(i, status = "FAILED")) + } else { + store.write(newTaskData(i, status = "SUCCESS")) + } + } + // Running tasks metrics (default metrics, positive metrics) + Seq(-1, 6).foreach { metric => + store.write(newTaskData(metric, status = "RUNNING")) } - } - Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore => + /** + * Following are the tasks metrics, + * 0, 2, 4 => Success + * 1, 3, 5 => Failed + * -1, 6 => Running + * + * Task summary will consider (0, 2, 4) only + */ val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get val values = Array(0.0, 2.0, 4.0) @@ -128,42 +128,28 @@ class AppStatusStoreSuite extends SparkFunSuite { dist.zip(summary.executorRunTime).foreach { case (expected, actual) => assert(expected === actual) } + appStore.close() } + Utils.deleteRecursively(testDir) } - - test("SPARK-26260: task summary size for default metrics should be zero") { - val store = new InMemoryStore() - store.write(newTaskData(-1, status = "RUNNING")) - Seq(new AppStatusStore(store), createLiveStore(store)).foreach { appStore => - val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles) - assert(summary.size === 0) - } - } - - test("SPARK-26260: summary should contain successful tasks only when with LevelDB store") { + test("SPARK-26260: task summary should be empty for non-successful tasks") { + // This test will check for 0 metric value for failed task val testDir = Utils.createTempDir() - val diskStore = KVUtils.open(testDir, getClass().getName()) - - for (i <- 0 to 5) { - if (i % 2 == 1) { - diskStore.write(newTaskData(i, status = "FAILED")) - } else { - diskStore.write(newTaskData(i)) - } - } + val diskStore = KVUtils.open(testDir, getClass.getName) + val inMemoryStore = new InMemoryStore - Seq(new AppStatusStore(diskStore)).foreach { appStore => - val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get - - val values = Array(0.0, 2.0, 4.0) + val historyDiskAppStore = createAppStore(diskStore) + val historyInMemoryAppStore = createAppStore(inMemoryStore) + val liveAppStore = createAppStore(inMemoryStore, live = true) - val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted) - dist.zip(summary.executorRunTime).foreach { case (expected, actual) => - assert(expected === actual) - } + Seq(historyDiskAppStore, historyInMemoryAppStore, liveAppStore).foreach { appStore => + val store = appStore.store + (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) } + val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles) + assert(summary.size === 0) + appStore.close() } - diskStore.close() Utils.deleteRecursively(testDir) } From 4032471a419213b71da6aa6af3dc4ee41f9fa62c Mon Sep 17 00:00:00 2001 From: shahid Date: Thu, 21 Nov 2019 06:46:40 +0530 Subject: [PATCH 11/14] hide pool info for history server --- .../org/apache/spark/ui/jobs/AllStagesPage.scala | 6 +++--- .../scala/org/apache/spark/ui/jobs/StageTable.scala | 13 ++++++------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index f672ce0ec6a68..31dabf9a8df4d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -30,7 
+30,7 @@ import org.apache.spark.ui.{UIUtils, WebUIPage} private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { private val sc = parent.sc private val subPath = "stages" - private def isFairScheduler = parent.isFairScheduler + private def showPoolInfo = sc.isDefined && parent.isFairScheduler def render(request: HttpServletRequest): Seq[Node] = { // For now, pool information is only accessible in live UIs @@ -57,7 +57,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { - val poolsDescription = if (sc.isDefined && isFairScheduler) { + val poolsDescription = if (showPoolInfo) {

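The renamed predicate above captures the rule stated in the surrounding comment: pool information is only available in live UIs, so the Pool Name column and the pools table should appear only when a live SparkContext exists and the fair scheduler is in use. In the history server `sc` is empty, so the column stays hidden even for applications that ran with spark.scheduler.mode=FAIR. A small standalone illustration of that gating (the helper below is hypothetical, not code from the patch):

object ShowPoolInfoCheck {
  // Mirrors the `sc.isDefined && parent.isFairScheduler` condition.
  def showPoolInfo(sc: Option[AnyRef], isFairScheduler: Boolean): Boolean =
    sc.isDefined && isFairScheduler

  def main(args: Array[String]): Unit = {
    val liveContext = Some(new AnyRef)
    println(showPoolInfo(liveContext, isFairScheduler = true))  // true: live UI with FAIR scheduling
    println(showPoolInfo(liveContext, isFairScheduler = false)) // false: live UI with FIFO scheduling
    println(showPoolInfo(None, isFairScheduler = true))         // false: history server, no live context
  }
}

Note that the final commit in this series reverts this rename, restoring the original isFairScheduler parameter names.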
@@ -96,7 +96,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val stagesTable = new StageTableBase(parent.store, request, stages, statusName(status), stageTag(status), - parent.basePath, subPath, parent.isFairScheduler, killEnabled, isFailedStage) + parent.basePath, subPath, showPoolInfo, killEnabled, isFailedStage) val stagesSize = stages.size (Some(summary(appSummary, status, stagesSize)), Some(table(appSummary, status, stagesTable, stagesSize))) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 09a215ba9f03d..18557f52142ef 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -40,13 +40,12 @@ private[ui] class StageTableBase( stageTag: String, basePath: String, subPath: String, - isFairScheduler: Boolean, + showPoolInfo: Boolean, killEnabled: Boolean, isFailedStage: Boolean) { val parameterOtherTable = request.getParameterMap().asScala .filterNot(_._1.startsWith(stageTag)) .map(para => para._1 + "=" + para._2(0)) - val parameterStagePage = request.getParameter(stageTag + ".page") val parameterStageSortColumn = request.getParameter(stageTag + ".sort") val parameterStageSortDesc = request.getParameter(stageTag + ".desc") @@ -72,7 +71,7 @@ private[ui] class StageTableBase( stageTag, basePath, subPath, - isFairScheduler, + showPoolInfo, killEnabled, currentTime, stagePageSize, @@ -128,7 +127,7 @@ private[ui] class StagePagedTable( stageTag: String, basePath: String, subPath: String, - isFairScheduler: Boolean, + showPoolInfo: Boolean, killEnabled: Boolean, currentTime: Long, pageSize: Int, @@ -181,7 +180,7 @@ private[ui] class StagePagedTable( // Otherwise, it has two parts: tooltip text, and position (true for left, false for default). 
val stageHeadersAndCssClasses: Seq[(String, String, Boolean)] = Seq(("Stage Id", null, true)) ++ - {if (isFairScheduler) {Seq(("Pool Name", null, true))} else Seq.empty} ++ + {if (showPoolInfo) {Seq(("Pool Name", null, true))} else Seq.empty} ++ Seq( ("Description", null, true), ("Submitted", null, true), ("Duration", null, true), ("Tasks: Succeeded/Total", null, false), @@ -263,7 +262,7 @@ private[ui] class StagePagedTable( } else { {data.stageId} }} ++ - {if (isFairScheduler) { + {if (showPoolInfo) { @@ -370,7 +369,7 @@ private[ui] class StagePagedTable( protected def missingStageRow(stageId: Int): Seq[Node] = { {stageId} ++ - {if (isFairScheduler) {-} else Seq.empty} ++ + {if (showPoolInfo) {-} else Seq.empty} ++ No data available for this stage ++ // Description ++ // Submitted ++ // Duration From d11859c4db3e32b73f9be200365664afc633f4c6 Mon Sep 17 00:00:00 2001 From: shahid Date: Fri, 22 Nov 2019 16:51:11 +0530 Subject: [PATCH 12/14] address comments --- .../spark/status/AppStatusStoreSuite.scala | 159 ++++++++---------- 1 file changed, 74 insertions(+), 85 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index e74922356212d..d629522fe87df 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -18,9 +18,8 @@ package org.apache.spark.status import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.status.LiveEntityHelpers.makeNegative -import org.apache.spark.status.api.v1 -import org.apache.spark.status.api.v1.{InputMetrics, OutputMetrics, ShuffleReadMetrics, ShuffleWriteMetrics} +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler.{TaskInfo, TaskLocality} import org.apache.spark.util.{Distribution, Utils} import org.apache.spark.util.kvstore._ @@ -79,50 +78,55 @@ class AppStatusStoreSuite extends SparkFunSuite { assert(store.count(classOf[CachedQuantile]) === 2) } - private def createAppStore(store: KVStore, live: Boolean = false): AppStatusStore = { + private def createAppStore(disk: Boolean, live: Boolean): AppStatusStore = { val conf = new SparkConf() if (live) { - AppStatusStore.createLiveStore(conf) + return AppStatusStore.createLiveStore(conf) + } + + val store: KVStore = if (disk) { + val testDir = Utils.createTempDir() + val diskStore = KVUtils.open(testDir, getClass.getName) + new ElementTrackingStore(diskStore, conf) } else { - new AppStatusStore(store) + new ElementTrackingStore(new InMemoryStore, conf) } + new AppStatusStore(store) } - test("SPARK-26260: task summary should contain only successful tasks' metrics") { - val testDir = Utils.createTempDir() - val diskStore = KVUtils.open(testDir, getClass.getName) - val inMemoryStore = new InMemoryStore - - val historyDiskAppStore = createAppStore(diskStore) - val historyInMemoryAppStore = createAppStore(inMemoryStore) - val liveAppStore = createAppStore(inMemoryStore, live = true) - - Seq(historyDiskAppStore, historyInMemoryAppStore, liveAppStore).foreach { appStore => + Seq( + "disk" -> createAppStore(disk = true, live = false), + "in memory" -> createAppStore(disk = false, live = false), + "in memory live" -> createAppStore(disk = false, live = true) + ).foreach { case (hint, appStore) => + test(s"SPARK-26260: summary should contain only successful tasks' metrics (store = $hint") { val store = appStore.store + // Success and failed tasks metrics for (i <- 
0 to 5) { - if (i % 2 == 1) { - store.write(newTaskData(i, status = "FAILED")) + if (i % 2 == 0) { + writeTaskDataToStore(i, store, "FAILED") } else { - store.write(newTaskData(i, status = "SUCCESS")) + writeTaskDataToStore(i, store, "SUCCESS") } } - // Running tasks metrics (default metrics, positive metrics) + + // Running tasks metrics (-1 = no metrics reported, positive = metrics have been reported) Seq(-1, 6).foreach { metric => - store.write(newTaskData(metric, status = "RUNNING")) + writeTaskDataToStore(metric, store, "RUNNING") } /** * Following are the tasks metrics, - * 0, 2, 4 => Success - * 1, 3, 5 => Failed + * 1, 3, 5 => Success + * 0, 2, 4 => Failed * -1, 6 => Running * - * Task summary will consider (0, 2, 4) only + * Task summary will consider (1, 3, 5) only */ val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get - val values = Array(0.0, 2.0, 4.0) + val values = Array(1.0, 3.0, 5.0) val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted) dist.zip(summary.executorRunTime).foreach { case (expected, actual) => @@ -130,27 +134,6 @@ class AppStatusStoreSuite extends SparkFunSuite { } appStore.close() } - Utils.deleteRecursively(testDir) - } - - test("SPARK-26260: task summary should be empty for non-successful tasks") { - // This test will check for 0 metric value for failed task - val testDir = Utils.createTempDir() - val diskStore = KVUtils.open(testDir, getClass.getName) - val inMemoryStore = new InMemoryStore - - val historyDiskAppStore = createAppStore(diskStore) - val historyInMemoryAppStore = createAppStore(inMemoryStore) - val liveAppStore = createAppStore(inMemoryStore, live = true) - - Seq(historyDiskAppStore, historyInMemoryAppStore, liveAppStore).foreach { appStore => - val store = appStore.store - (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) } - val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles) - assert(summary.size === 0) - appStore.close() - } - Utils.deleteRecursively(testDir) } private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = { @@ -170,49 +153,55 @@ class AppStatusStoreSuite extends SparkFunSuite { } private def newTaskData(i: Int, status: String = "SUCCESS"): TaskDataWrapper = { - - val metrics = new v1.TaskMetrics( + new TaskDataWrapper( + i.toLong, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None, true, + i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, i, - new InputMetrics(i, i), - new OutputMetrics(i, i), - new ShuffleReadMetrics(i, i, i, i, i, i, i), - new ShuffleWriteMetrics(i, i, i)) + i, i, i, i, stageId, attemptId) + } - val hasMetrics = i >= 0 + private def writeTaskDataToStore(i: Int, store: KVStore, status: String): Unit = { + val liveTask = new LiveTask(new TaskInfo( i.toLong, i, i, i.toLong, i.toString, + i.toString, TaskLocality.ANY, false), stageId, attemptId, None) - val taskMetrics: v1.TaskMetrics = if (hasMetrics && status != "SUCCESS") { - makeNegative(metrics) - } else { - metrics + if (status == "SUCCESS") { + liveTask.info.finishTime = 1L + } else if (status == "FAILED") { + liveTask.info.failed = true + liveTask.info.finishTime = 1L } - new TaskDataWrapper( - i.toLong, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None, - hasMetrics, - taskMetrics.executorDeserializeTime, - taskMetrics.executorDeserializeCpuTime, - taskMetrics.executorRunTime, - taskMetrics.executorCpuTime, - taskMetrics.resultSize, - taskMetrics.jvmGcTime, - 
taskMetrics.resultSerializationTime, - taskMetrics.memoryBytesSpilled, - taskMetrics.diskBytesSpilled, - taskMetrics.peakExecutionMemory, - taskMetrics.inputMetrics.bytesRead, - taskMetrics.inputMetrics.recordsRead, - taskMetrics.outputMetrics.bytesWritten, - taskMetrics.outputMetrics.recordsWritten, - taskMetrics.shuffleReadMetrics.remoteBlocksFetched, - taskMetrics.shuffleReadMetrics.localBlocksFetched, - taskMetrics.shuffleReadMetrics.fetchWaitTime, - taskMetrics.shuffleReadMetrics.remoteBytesRead, - taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk, - taskMetrics.shuffleReadMetrics.localBytesRead, - taskMetrics.shuffleReadMetrics.recordsRead, - taskMetrics.shuffleWriteMetrics.bytesWritten, - taskMetrics.shuffleWriteMetrics.writeTime, - taskMetrics.shuffleWriteMetrics.recordsWritten, - stageId, attemptId) + val taskMetrics = getTaskMetrics(i) + liveTask.updateMetrics(taskMetrics) + liveTask.write(store.asInstanceOf[ElementTrackingStore], 1L) + } + + private def getTaskMetrics(i: Int): TaskMetrics = { + val taskMetrics = new TaskMetrics() + taskMetrics.setExecutorDeserializeTime(i) + taskMetrics.setExecutorDeserializeCpuTime(i) + taskMetrics.setExecutorRunTime(i) + taskMetrics.setExecutorCpuTime(i) + taskMetrics.setResultSize(i) + taskMetrics.setJvmGCTime(i) + taskMetrics.setResultSerializationTime(i) + taskMetrics.incMemoryBytesSpilled(i) + taskMetrics.incDiskBytesSpilled(i) + taskMetrics.incPeakExecutionMemory(i) + taskMetrics.inputMetrics.incBytesRead(i) + taskMetrics.inputMetrics.incRecordsRead(i) + taskMetrics.outputMetrics.setBytesWritten(i) + taskMetrics.outputMetrics.setRecordsWritten(i) + taskMetrics.shuffleReadMetrics.incRemoteBlocksFetched(i) + taskMetrics.shuffleReadMetrics.incLocalBlocksFetched(i) + taskMetrics.shuffleReadMetrics.incFetchWaitTime(i) + taskMetrics.shuffleReadMetrics.incRemoteBytesRead(i) + taskMetrics.shuffleReadMetrics.incRemoteBytesReadToDisk(i) + taskMetrics.shuffleReadMetrics.incLocalBytesRead(i) + taskMetrics.shuffleReadMetrics.incRecordsRead(i) + taskMetrics.shuffleWriteMetrics.incBytesWritten(i) + taskMetrics.shuffleWriteMetrics.incWriteTime(i) + taskMetrics.shuffleWriteMetrics.incRecordsWritten(i) + taskMetrics } } From 07e43506ce51b5651c00e927da7cc72a34720196 Mon Sep 17 00:00:00 2001 From: shahid Date: Fri, 22 Nov 2019 19:36:04 +0530 Subject: [PATCH 13/14] nit --- .../scala/org/apache/spark/status/AppStatusStoreSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index d629522fe87df..29f943adf205e 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -99,7 +99,7 @@ class AppStatusStoreSuite extends SparkFunSuite { "in memory" -> createAppStore(disk = false, live = false), "in memory live" -> createAppStore(disk = false, live = true) ).foreach { case (hint, appStore) => - test(s"SPARK-26260: summary should contain only successful tasks' metrics (store = $hint") { + test(s"SPARK-26260: summary should contain only successful tasks' metrics (store = $hint)") { val store = appStore.store // Success and failed tasks metrics From 8c5a37d02673a388afcb868a7367b8505f186692 Mon Sep 17 00:00:00 2001 From: shahid Date: Sat, 23 Nov 2019 17:46:28 +0530 Subject: [PATCH 14/14] address comment --- .../scala/org/apache/spark/status/storeTypes.scala | 11 +++++------ 
.../org/apache/spark/ui/jobs/AllStagesPage.scala | 6 +++--- .../scala/org/apache/spark/ui/jobs/StageTable.scala | 13 +++++++------ .../apache/spark/status/AppStatusStoreSuite.scala | 2 +- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index e0f7ecdc48db4..f0a94d84d8a04 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -177,14 +177,13 @@ private[spark] class TaskDataWrapper( val accumulatorUpdates: Seq[AccumulableInfo], val errorMessage: Option[String], - // Non successful metrics now will have negative values in `TaskDataWrapper`. - // `TaskData` will have actual metric values. To recover the actual metric value - // from `TaskDataWrapper`, need use `getMetricValue` method. val hasMetrics: Boolean, // The following is an exploded view of a TaskMetrics API object. This saves 5 objects - // (= 80 bytes of Java object overhead) per instance of this wrapper. If the first value - // (executorDeserializeTime) is -1L, it means the metrics for this task have not been - // recorded. + // (= 80 bytes of Java object overhead) per instance of this wrapper. Non successful + // tasks' metrics will have negative values in `TaskDataWrapper`. `TaskData` will have + // actual metric values. To recover the actual metric value from `TaskDataWrapper`, + // need use `getMetricValue` method. If `hasMetrics` is false, it means the metrics + // for this task have not been recorded. @KVIndexParam(value = TaskIndexNames.DESER_TIME, parent = TaskIndexNames.STAGE) val executorDeserializeTime: Long, @KVIndexParam(value = TaskIndexNames.DESER_CPU_TIME, parent = TaskIndexNames.STAGE) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index 31dabf9a8df4d..f672ce0ec6a68 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -30,7 +30,7 @@ import org.apache.spark.ui.{UIUtils, WebUIPage} private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { private val sc = parent.sc private val subPath = "stages" - private def showPoolInfo = sc.isDefined && parent.isFairScheduler + private def isFairScheduler = parent.isFairScheduler def render(request: HttpServletRequest): Seq[Node] = { // For now, pool information is only accessible in live UIs @@ -57,7 +57,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { - val poolsDescription = if (showPoolInfo) { + val poolsDescription = if (sc.isDefined && isFairScheduler) {

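With the per-row HashSet gone, the encoding the series settles on is purely arithmetic, as the comment in makeNegative and the reworked TaskDataWrapper documentation above describe: shift by one and negate on write, undo it on read. A minimal sketch of that final round trip (the helper names here are illustrative, not the patch's):

object OffsetEncodingRoundTrip {
  // Write path for a non-successful task's metric: add one and negate,
  // so a 0 still ends up negative (-1) instead of staying 0.
  def encode(metric: Long): Long = metric * -1L - 1L

  // Read path, matching `math.abs(metric + 1)` for non-successful tasks.
  def decode(stored: Long): Long = math.abs(stored + 1)

  def main(args: Array[String]): Unit = {
    val samples = Seq(5L, 3L, 0L, 1L)
    println(samples.map(encode))             // List(-6, -4, -1, -2)
    println(samples.map(encode).map(decode)) // List(5, 3, 0, 1): the original values come back
  }
}

Because every encoded value is strictly negative, non-successful tasks sort below all successful tasks in the per-stage metric indexes, so an index scan that starts at zero never visits them; that property is presumably what lets the task summary consider successful tasks only without filtering by status.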
@@ -96,7 +96,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val stagesTable = new StageTableBase(parent.store, request, stages, statusName(status), stageTag(status), - parent.basePath, subPath, showPoolInfo, killEnabled, isFailedStage) + parent.basePath, subPath, parent.isFairScheduler, killEnabled, isFailedStage) val stagesSize = stages.size (Some(summary(appSummary, status, stagesSize)), Some(table(appSummary, status, stagesTable, stagesSize))) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index 18557f52142ef..09a215ba9f03d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -40,12 +40,13 @@ private[ui] class StageTableBase( stageTag: String, basePath: String, subPath: String, - showPoolInfo: Boolean, + isFairScheduler: Boolean, killEnabled: Boolean, isFailedStage: Boolean) { val parameterOtherTable = request.getParameterMap().asScala .filterNot(_._1.startsWith(stageTag)) .map(para => para._1 + "=" + para._2(0)) + val parameterStagePage = request.getParameter(stageTag + ".page") val parameterStageSortColumn = request.getParameter(stageTag + ".sort") val parameterStageSortDesc = request.getParameter(stageTag + ".desc") @@ -71,7 +72,7 @@ private[ui] class StageTableBase( stageTag, basePath, subPath, - showPoolInfo, + isFairScheduler, killEnabled, currentTime, stagePageSize, @@ -127,7 +128,7 @@ private[ui] class StagePagedTable( stageTag: String, basePath: String, subPath: String, - showPoolInfo: Boolean, + isFairScheduler: Boolean, killEnabled: Boolean, currentTime: Long, pageSize: Int, @@ -180,7 +181,7 @@ private[ui] class StagePagedTable( // Otherwise, it has two parts: tooltip text, and position (true for left, false for default). val stageHeadersAndCssClasses: Seq[(String, String, Boolean)] = Seq(("Stage Id", null, true)) ++ - {if (showPoolInfo) {Seq(("Pool Name", null, true))} else Seq.empty} ++ + {if (isFairScheduler) {Seq(("Pool Name", null, true))} else Seq.empty} ++ Seq( ("Description", null, true), ("Submitted", null, true), ("Duration", null, true), ("Tasks: Succeeded/Total", null, false), @@ -262,7 +263,7 @@ private[ui] class StagePagedTable( } else { {data.stageId} }} ++ - {if (showPoolInfo) { + {if (isFairScheduler) { @@ -369,7 +370,7 @@ private[ui] class StagePagedTable( protected def missingStageRow(stageId: Int): Seq[Node] = { {stageId} ++ - {if (showPoolInfo) {-} else Seq.empty} ++ + {if (isFairScheduler) {-} else Seq.empty} ++ No data available for this stage ++ // Description ++ // Submitted ++ // Duration diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index 29f943adf205e..735e51942626f 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -161,7 +161,7 @@ class AppStatusStoreSuite extends SparkFunSuite { } private def writeTaskDataToStore(i: Int, store: KVStore, status: String): Unit = { - val liveTask = new LiveTask(new TaskInfo( i.toLong, i, i, i.toLong, i.toString, + val liveTask = new LiveTask(new TaskInfo( i.toLong, i, i, i.toLong, i.toString, i.toString, TaskLocality.ANY, false), stageId, attemptId, None) if (status == "SUCCESS") {