From da39a32955eee6598e5a927c5075fd34c2fb25c3 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Wed, 23 Feb 2022 15:45:19 -0800 Subject: [PATCH 1/2] fixed issue and added unit test --- .../org/apache/spark/status/storeTypes.scala | 4 +- .../spark/status/AppStatusStoreSuite.scala | 108 +++++++++++++----- 2 files changed, 81 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 103e4bab411e5..39bf593274904 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -344,7 +344,7 @@ private[spark] class TaskDataWrapper( @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_READS, parent = TaskIndexNames.STAGE) private def shuffleTotalReads: Long = { if (hasMetrics) { - getMetricValue(shuffleLocalBytesRead) + getMetricValue(shuffleRemoteBytesRead) + shuffleLocalBytesRead + shuffleRemoteBytesRead } else { -1L } @@ -353,7 +353,7 @@ private[spark] class TaskDataWrapper( @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_BLOCKS, parent = TaskIndexNames.STAGE) private def shuffleTotalBlocks: Long = { if (hasMetrics) { - getMetricValue(shuffleLocalBlocksFetched) + getMetricValue(shuffleRemoteBlocksFetched) + shuffleLocalBlocksFetched + shuffleRemoteBlocksFetched } else { -1L } diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index 798cff8d60fcd..53b01313d5d4c 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.status +import scala.util.Random + import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend} @@ -137,13 +139,52 @@ class AppStatusStoreSuite extends SparkFunSuite { * Task summary will consider (1, 3, 5) only */ val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get + val successfulTasks = Array(getTaskMetrics(1), getTaskMetrics(3), getTaskMetrics(5)) - val values = Array(1.0, 3.0, 5.0) + def assertQuantiles(metricGetter: TaskMetrics => Double, + actualQuantiles: Seq[Double]): Unit = { + val values = successfulTasks.map(metricGetter) + val expectedQuantiles = new Distribution(values, 0, values.length) + .getQuantiles(uiQuantiles.sorted) - val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted) - dist.zip(summary.executorRunTime).foreach { case (expected, actual) => - assert(expected === actual) + assert(actualQuantiles === expectedQuantiles) } + + assertQuantiles(_.executorDeserializeTime, summary.executorDeserializeTime) + assertQuantiles(_.executorDeserializeCpuTime, summary.executorDeserializeCpuTime) + assertQuantiles(_.executorRunTime, summary.executorRunTime) + assertQuantiles(_.executorRunTime, summary.executorRunTime) + assertQuantiles(_.executorCpuTime, summary.executorCpuTime) + assertQuantiles(_.resultSize, summary.resultSize) + assertQuantiles(_.jvmGCTime, summary.jvmGcTime) + assertQuantiles(_.resultSerializationTime, summary.resultSerializationTime) + assertQuantiles(_.memoryBytesSpilled, summary.memoryBytesSpilled) + assertQuantiles(_.diskBytesSpilled, summary.diskBytesSpilled) + assertQuantiles(_.peakExecutionMemory, summary.peakExecutionMemory) + assertQuantiles(_.inputMetrics.bytesRead, summary.inputMetrics.bytesRead) + assertQuantiles(_.inputMetrics.recordsRead, summary.inputMetrics.recordsRead) + assertQuantiles(_.outputMetrics.bytesWritten, summary.outputMetrics.bytesWritten) + assertQuantiles(_.outputMetrics.recordsWritten, summary.outputMetrics.recordsWritten) + assertQuantiles(_.shuffleReadMetrics.remoteBlocksFetched, + summary.shuffleReadMetrics.remoteBlocksFetched) + assertQuantiles(_.shuffleReadMetrics.localBlocksFetched, + summary.shuffleReadMetrics.localBlocksFetched) + assertQuantiles(_.shuffleReadMetrics.fetchWaitTime, summary.shuffleReadMetrics.fetchWaitTime) + assertQuantiles(_.shuffleReadMetrics.remoteBytesRead, + summary.shuffleReadMetrics.remoteBytesRead) + assertQuantiles(_.shuffleReadMetrics.remoteBytesReadToDisk, + summary.shuffleReadMetrics.remoteBytesReadToDisk) + assertQuantiles( + t => t.shuffleReadMetrics.localBytesRead + t.shuffleReadMetrics.remoteBytesRead, + summary.shuffleReadMetrics.readBytes) + assertQuantiles( + t => t.shuffleReadMetrics.localBlocksFetched + t.shuffleReadMetrics.remoteBlocksFetched, + summary.shuffleReadMetrics.totalBlocksFetched) + assertQuantiles(_.shuffleWriteMetrics.bytesWritten, summary.shuffleWriteMetrics.writeBytes) + assertQuantiles(_.shuffleWriteMetrics.writeTime, summary.shuffleWriteMetrics.writeTime) + assertQuantiles(_.shuffleWriteMetrics.recordsWritten, + summary.shuffleWriteMetrics.writeRecords) + appStore.close() } } @@ -227,32 +268,41 @@ class AppStatusStoreSuite extends SparkFunSuite { liveTask.write(store.asInstanceOf[ElementTrackingStore], 1L) } - private def getTaskMetrics(i: Int): TaskMetrics = { + /** + * Creates fake task metrics + * @param seed The random seed. The output will be reproducible for a given seed. + * @return The test metrics object with fake data + */ + private def getTaskMetrics(seed: Int): TaskMetrics = { + val random = new Random(seed) + val randomMax = 1000 + def nextInt(): Int = random.nextInt(randomMax) + val taskMetrics = new TaskMetrics() - taskMetrics.setExecutorDeserializeTime(i) - taskMetrics.setExecutorDeserializeCpuTime(i) - taskMetrics.setExecutorRunTime(i) - taskMetrics.setExecutorCpuTime(i) - taskMetrics.setResultSize(i) - taskMetrics.setJvmGCTime(i) - taskMetrics.setResultSerializationTime(i) - taskMetrics.incMemoryBytesSpilled(i) - taskMetrics.incDiskBytesSpilled(i) - taskMetrics.incPeakExecutionMemory(i) - taskMetrics.inputMetrics.incBytesRead(i) - taskMetrics.inputMetrics.incRecordsRead(i) - taskMetrics.outputMetrics.setBytesWritten(i) - taskMetrics.outputMetrics.setRecordsWritten(i) - taskMetrics.shuffleReadMetrics.incRemoteBlocksFetched(i) - taskMetrics.shuffleReadMetrics.incLocalBlocksFetched(i) - taskMetrics.shuffleReadMetrics.incFetchWaitTime(i) - taskMetrics.shuffleReadMetrics.incRemoteBytesRead(i) - taskMetrics.shuffleReadMetrics.incRemoteBytesReadToDisk(i) - taskMetrics.shuffleReadMetrics.incLocalBytesRead(i) - taskMetrics.shuffleReadMetrics.incRecordsRead(i) - taskMetrics.shuffleWriteMetrics.incBytesWritten(i) - taskMetrics.shuffleWriteMetrics.incWriteTime(i) - taskMetrics.shuffleWriteMetrics.incRecordsWritten(i) + taskMetrics.setExecutorDeserializeTime(nextInt()) + taskMetrics.setExecutorDeserializeCpuTime(nextInt()) + taskMetrics.setExecutorRunTime(nextInt()) + taskMetrics.setExecutorCpuTime(nextInt()) + taskMetrics.setResultSize(nextInt()) + taskMetrics.setJvmGCTime(nextInt()) + taskMetrics.setResultSerializationTime(nextInt()) + taskMetrics.incMemoryBytesSpilled(nextInt()) + taskMetrics.incDiskBytesSpilled(nextInt()) + taskMetrics.incPeakExecutionMemory(nextInt()) + taskMetrics.inputMetrics.incBytesRead(nextInt()) + taskMetrics.inputMetrics.incRecordsRead(nextInt()) + taskMetrics.outputMetrics.setBytesWritten(nextInt()) + taskMetrics.outputMetrics.setRecordsWritten(nextInt()) + taskMetrics.shuffleReadMetrics.incRemoteBlocksFetched(nextInt()) + taskMetrics.shuffleReadMetrics.incLocalBlocksFetched(nextInt()) + taskMetrics.shuffleReadMetrics.incFetchWaitTime(nextInt()) + taskMetrics.shuffleReadMetrics.incRemoteBytesRead(nextInt()) + taskMetrics.shuffleReadMetrics.incRemoteBytesReadToDisk(nextInt()) + taskMetrics.shuffleReadMetrics.incLocalBytesRead(nextInt()) + taskMetrics.shuffleReadMetrics.incRecordsRead(nextInt()) + taskMetrics.shuffleWriteMetrics.incBytesWritten(nextInt()) + taskMetrics.shuffleWriteMetrics.incWriteTime(nextInt()) + taskMetrics.shuffleWriteMetrics.incRecordsWritten(nextInt()) taskMetrics } From 7715538a8743817f110dbaac8aa010dee6bd730b Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Wed, 23 Feb 2022 17:22:06 -0800 Subject: [PATCH 2/2] Trigger Build