
Commit f52eed8

More optimisation..
1 parent 34420ea commit f52eed8

File tree: 3 files changed, +68 -85 lines

core/src/main/scala/org/apache/spark/status/LiveEntity.scala

Lines changed: 32 additions & 38 deletions
@@ -21,7 +21,6 @@ import java.util.Date
 import java.util.concurrent.atomic.AtomicInteger

 import scala.collection.immutable.{HashSet, TreeSet}
-import scala.collection.mutable
 import scala.collection.mutable.HashMap

 import com.google.common.collect.Interners
@@ -30,7 +29,6 @@ import org.apache.spark.JobExecutionStatus
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
-import org.apache.spark.status.TaskIndexNames._
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.{RDDInfo, StorageLevel}
 import org.apache.spark.ui.SparkUI
@@ -187,15 +185,14 @@ private class LiveTask(
     }

     val hasMetrics = metrics.executorDeserializeTime >= 0
-    val handleZeros = mutable.HashSet[String]()

     /**
      * SPARK-26260: For non successful tasks, store the metrics as negative to avoid
      * the calculation in the task summary. `toApi` method in the `TaskDataWrapper` will make
      * it actual value.
      */
     val taskMetrics: v1.TaskMetrics = if (hasMetrics && !info.successful) {
-      makeNegative(metrics, handleZeros)
+      makeNegative(metrics)
     } else {
       metrics
     }
@@ -216,7 +213,6 @@ private class LiveTask(
       errorMessage,

       hasMetrics,
-      handleZeros,
       taskMetrics.executorDeserializeTime,
       taskMetrics.executorDeserializeCpuTime,
       taskMetrics.executorRunTime,
@@ -732,42 +728,40 @@ private object LiveEntityHelpers {
    * Convert all the metric values to negative as well as handle zero values.
    * This method assumes that all the metric values are greater than or equal to zero
    */
-  def makeNegative(m: v1.TaskMetrics, handleZeros: mutable.HashSet[String]): v1.TaskMetrics = {
-    // If the metric value is 0, then make -1 and update the metric index in handleZeros.
-    def updateMetricValue(metric: Long, index: String): Long = {
-      if (metric == 0L) {
-        handleZeros.add(index)
-        -1L
-      } else {
-        metric * -1L
-      }
+  def makeNegative(m: v1.TaskMetrics): v1.TaskMetrics = {
+    // To handle 0 metric value, add 1 and make the metric negative.
+    // To recover actual value do `math.abs(metric + 1)`
+    // Eg: if the metric values are (5, 3, 0, 1) => Updated metrics value will be (-6, -4, -1, -2)
+    // To get actual metric value math.abs(metric +1) => (5, 3, 0, 1)
+    def updateMetricValue(metric: Long): Long = {
+      metric * -1L - 1L
     }

     createMetrics(
-      updateMetricValue(m.executorDeserializeTime, DESER_TIME),
-      updateMetricValue(m.executorDeserializeCpuTime, DESER_CPU_TIME),
-      updateMetricValue(m.executorRunTime, EXEC_RUN_TIME),
-      updateMetricValue(m.executorCpuTime, EXEC_CPU_TIME),
-      updateMetricValue(m.resultSize, RESULT_SIZE),
-      updateMetricValue(m.jvmGcTime, GC_TIME),
-      updateMetricValue(m.resultSerializationTime, SER_TIME),
-      updateMetricValue(m.memoryBytesSpilled, MEM_SPILL),
-      updateMetricValue(m.diskBytesSpilled, DISK_SPILL),
-      updateMetricValue(m.peakExecutionMemory, PEAK_MEM),
-      updateMetricValue(m.inputMetrics.bytesRead, INPUT_SIZE),
-      updateMetricValue(m.inputMetrics.recordsRead, INPUT_RECORDS),
-      updateMetricValue(m.outputMetrics.bytesWritten, OUTPUT_SIZE),
-      updateMetricValue(m.outputMetrics.recordsWritten, OUTPUT_RECORDS),
-      updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched, SHUFFLE_REMOTE_BLOCKS),
-      updateMetricValue(m.shuffleReadMetrics.localBlocksFetched, SHUFFLE_LOCAL_BLOCKS),
-      updateMetricValue(m.shuffleReadMetrics.fetchWaitTime, SHUFFLE_READ_TIME),
-      updateMetricValue(m.shuffleReadMetrics.remoteBytesRead, SHUFFLE_REMOTE_READS),
-      updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk, SHUFFLE_REMOTE_READS_TO_DISK),
-      updateMetricValue(m.shuffleReadMetrics.localBytesRead, SHUFFLE_LOCAL_READ),
-      updateMetricValue(m.shuffleReadMetrics.recordsRead, SHUFFLE_READ_RECORDS),
-      updateMetricValue(m.shuffleWriteMetrics.bytesWritten, SHUFFLE_WRITE_SIZE),
-      updateMetricValue(m.shuffleWriteMetrics.writeTime, SHUFFLE_WRITE_TIME),
-      updateMetricValue(m.shuffleWriteMetrics.recordsWritten, SHUFFLE_WRITE_RECORDS))
+      updateMetricValue(m.executorDeserializeTime),
+      updateMetricValue(m.executorDeserializeCpuTime),
+      updateMetricValue(m.executorRunTime),
+      updateMetricValue(m.executorCpuTime),
+      updateMetricValue(m.resultSize),
+      updateMetricValue(m.jvmGcTime),
+      updateMetricValue(m.resultSerializationTime),
+      updateMetricValue(m.memoryBytesSpilled),
+      updateMetricValue(m.diskBytesSpilled),
+      updateMetricValue(m.peakExecutionMemory),
+      updateMetricValue(m.inputMetrics.bytesRead),
+      updateMetricValue(m.inputMetrics.recordsRead),
+      updateMetricValue(m.outputMetrics.bytesWritten),
+      updateMetricValue(m.outputMetrics.recordsWritten),
+      updateMetricValue(m.shuffleReadMetrics.remoteBlocksFetched),
+      updateMetricValue(m.shuffleReadMetrics.localBlocksFetched),
+      updateMetricValue(m.shuffleReadMetrics.fetchWaitTime),
+      updateMetricValue(m.shuffleReadMetrics.remoteBytesRead),
+      updateMetricValue(m.shuffleReadMetrics.remoteBytesReadToDisk),
+      updateMetricValue(m.shuffleReadMetrics.localBytesRead),
+      updateMetricValue(m.shuffleReadMetrics.recordsRead),
+      updateMetricValue(m.shuffleWriteMetrics.bytesWritten),
+      updateMetricValue(m.shuffleWriteMetrics.writeTime),
+      updateMetricValue(m.shuffleWriteMetrics.recordsWritten))
   }

   private def addMetrics(m1: v1.TaskMetrics, m2: v1.TaskMetrics, mult: Int): v1.TaskMetrics = {
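The rewritten `makeNegative` replaces the per-task `handleZeros` set with arithmetic alone: a non-negative metric m is stored as -(m + 1), so even a zero becomes negative and no extra bookkeeping is needed to recover it. Below is a minimal standalone sketch of that round trip; it is not part of this commit, and `encode`/`decode` are illustrative names mirroring `updateMetricValue` and `getMetricValue`.

```scala
object MetricEncodingSketch {
  // Mirrors updateMetricValue in makeNegative: store a non-negative metric as -(m + 1).
  def encode(metric: Long): Long = metric * -1L - 1L

  // Mirrors getMetricValue in TaskDataWrapper: recover the original value.
  def decode(stored: Long): Long = math.abs(stored + 1)

  def main(args: Array[String]): Unit = {
    val samples = Seq(5L, 3L, 0L, 1L)
    val encoded = samples.map(encode)   // Seq(-6, -4, -1, -2), as in the comment above
    val decoded = encoded.map(decode)   // Seq(5, 3, 0, 1)
    assert(decoded == samples)
    println(s"encoded = $encoded, decoded = $decoded")
  }
}
```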

core/src/main/scala/org/apache/spark/status/storeTypes.scala

Lines changed: 35 additions & 42 deletions
@@ -20,8 +20,6 @@ package org.apache.spark.status
 import java.lang.{Long => JLong}
 import java.util.Date

-import scala.collection.mutable.HashSet
-
 import com.fasterxml.jackson.annotation.JsonIgnore
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize

@@ -180,12 +178,10 @@ private[spark] class TaskDataWrapper(
     val accumulatorUpdates: Seq[AccumulableInfo],
     val errorMessage: Option[String],

-    val hasMetrics: Boolean,
     // Non successful metrics now will have negative values in `TaskDataWrapper`.
     // `TaskData` will have actual metric values. To recover the actual metric value
-    // from `TaskDataWrapper`, need use `getMetricValue` method. parameter `handleZero` is to
-    // check whether the index has zero metric value, which is used in the `getMetricValue`.
-    val handleZero: HashSet[String],
+    // from `TaskDataWrapper`, need use `getMetricValue` method.
+    val hasMetrics: Boolean,
     // The following is an exploded view of a TaskMetrics API object. This saves 5 objects
     // (= 80 bytes of Java object overhead) per instance of this wrapper. If the first value
     // (executorDeserializeTime) is -1L, it means the metrics for this task have not been
@@ -243,46 +239,45 @@ private[spark] class TaskDataWrapper(
     val stageAttemptId: Int) {

   // SPARK-26260: To handle non successful tasks metrics (Running, Failed, Killed).
-  private def getMetricValue(metric: Long, index: String): Long = {
-    if (status != "SUCCESS" && handleZero(index)) {
-      0L
+  private def getMetricValue(metric: Long): Long = {
+    if (status != "SUCCESS") {
+      math.abs(metric + 1)
     } else {
-      math.abs(metric)
+      metric
     }
   }

   def toApi: TaskData = {
     val metrics = if (hasMetrics) {
       Some(new TaskMetrics(
-        getMetricValue(executorDeserializeTime, TaskIndexNames.DESER_TIME),
-        getMetricValue(executorDeserializeCpuTime, TaskIndexNames.DESER_CPU_TIME),
-        getMetricValue(executorRunTime, TaskIndexNames.EXEC_RUN_TIME),
-        getMetricValue(executorCpuTime, TaskIndexNames.EXEC_CPU_TIME),
-        getMetricValue(resultSize, TaskIndexNames.RESULT_SIZE),
-        getMetricValue(jvmGcTime, TaskIndexNames.GC_TIME),
-        getMetricValue(resultSerializationTime, TaskIndexNames.SER_TIME),
-        getMetricValue(memoryBytesSpilled, TaskIndexNames.MEM_SPILL),
-        getMetricValue(diskBytesSpilled, TaskIndexNames.DISK_SPILL),
-        getMetricValue(peakExecutionMemory, TaskIndexNames.PEAK_MEM),
+        getMetricValue(executorDeserializeTime),
+        getMetricValue(executorDeserializeCpuTime),
+        getMetricValue(executorRunTime),
+        getMetricValue(executorCpuTime),
+        getMetricValue(resultSize),
+        getMetricValue(jvmGcTime),
+        getMetricValue(resultSerializationTime),
+        getMetricValue(memoryBytesSpilled),
+        getMetricValue(diskBytesSpilled),
+        getMetricValue(peakExecutionMemory),
         new InputMetrics(
-          getMetricValue(inputBytesRead, TaskIndexNames.INPUT_SIZE),
-          getMetricValue(inputRecordsRead, TaskIndexNames.INPUT_RECORDS)),
+          getMetricValue(inputBytesRead),
+          getMetricValue(inputRecordsRead)),
         new OutputMetrics(
-          getMetricValue(outputBytesWritten, TaskIndexNames.OUTPUT_SIZE),
-          getMetricValue(outputRecordsWritten, TaskIndexNames.OUTPUT_RECORDS)),
+          getMetricValue(outputBytesWritten),
+          getMetricValue(outputRecordsWritten)),
         new ShuffleReadMetrics(
-          getMetricValue(shuffleRemoteBlocksFetched, TaskIndexNames.SHUFFLE_REMOTE_BLOCKS),
-          getMetricValue(shuffleLocalBlocksFetched, TaskIndexNames.SHUFFLE_LOCAL_BLOCKS),
-          getMetricValue(shuffleFetchWaitTime, TaskIndexNames.SHUFFLE_READ_TIME),
-          getMetricValue(shuffleRemoteBytesRead, TaskIndexNames.SHUFFLE_REMOTE_READS),
-          getMetricValue(shuffleRemoteBytesReadToDisk,
-            TaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK),
-          getMetricValue(shuffleLocalBytesRead, TaskIndexNames.SHUFFLE_LOCAL_READ),
-          getMetricValue(shuffleRecordsRead, TaskIndexNames.SHUFFLE_READ_RECORDS)),
+          getMetricValue(shuffleRemoteBlocksFetched),
+          getMetricValue(shuffleLocalBlocksFetched),
+          getMetricValue(shuffleFetchWaitTime),
+          getMetricValue(shuffleRemoteBytesRead),
+          getMetricValue(shuffleRemoteBytesReadToDisk),
+          getMetricValue(shuffleLocalBytesRead),
+          getMetricValue(shuffleRecordsRead)),
         new ShuffleWriteMetrics(
-          getMetricValue(shuffleBytesWritten, TaskIndexNames.SHUFFLE_WRITE_SIZE),
-          getMetricValue(shuffleWriteTime, TaskIndexNames.SHUFFLE_WRITE_TIME),
-          getMetricValue(shuffleRecordsWritten, TaskIndexNames.SHUFFLE_WRITE_RECORDS))))
+          getMetricValue(shuffleBytesWritten),
+          getMetricValue(shuffleWriteTime),
+          getMetricValue(shuffleRecordsWritten))))
     } else {
       None
     }
@@ -314,9 +309,9 @@ private[spark] class TaskDataWrapper(
   def schedulerDelay: Long = {
     if (hasMetrics) {
       AppStatusUtils.schedulerDelay(launchTime, resultFetchStart, duration,
-        getMetricValue(executorDeserializeTime, TaskIndexNames.DESER_TIME),
-        getMetricValue(resultSerializationTime, TaskIndexNames.SER_TIME),
-        getMetricValue(executorRunTime, TaskIndexNames.EXEC_RUN_TIME))
+        getMetricValue(executorDeserializeTime),
+        getMetricValue(resultSerializationTime),
+        getMetricValue(executorRunTime))
     } else {
       -1L
     }
@@ -349,8 +344,7 @@ private[spark] class TaskDataWrapper(
   @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_READS, parent = TaskIndexNames.STAGE)
   private def shuffleTotalReads: Long = {
     if (hasMetrics) {
-      getMetricValue(shuffleLocalBytesRead, TaskIndexNames.SHUFFLE_LOCAL_READ) +
-        getMetricValue(shuffleRemoteBytesRead, TaskIndexNames.SHUFFLE_REMOTE_READS)
+      getMetricValue(shuffleLocalBytesRead) + getMetricValue(shuffleRemoteBytesRead)
     } else {
       -1L
     }
@@ -359,8 +353,7 @@ private[spark] class TaskDataWrapper(
   @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_BLOCKS, parent = TaskIndexNames.STAGE)
   private def shuffleTotalBlocks: Long = {
     if (hasMetrics) {
-      getMetricValue(shuffleLocalBlocksFetched, TaskIndexNames.SHUFFLE_LOCAL_BLOCKS) +
-        getMetricValue(shuffleRemoteBlocksFetched, TaskIndexNames.SHUFFLE_REMOTE_BLOCKS)
+      getMetricValue(shuffleLocalBlocksFetched) + getMetricValue(shuffleRemoteBlocksFetched)
     } else {
       -1L
     }
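On the read side, the decode only applies to non-successful tasks, since successful tasks keep their metrics unshifted. A hedged sketch of that status-dependent lookup follows; the `readMetric` helper and the hard-coded statuses are illustrative stand-ins, not API introduced by this commit.

```scala
object StatusAwareReadSketch {
  // Illustrative stand-in for TaskDataWrapper.getMetricValue: only metrics of
  // non-successful tasks were stored shifted and negated, so only they are decoded.
  def readMetric(stored: Long, status: String): Long =
    if (status != "SUCCESS") math.abs(stored + 1) else stored

  def main(args: Array[String]): Unit = {
    assert(readMetric(-1L, "FAILED") == 0L)    // a failed task's 0 was stored as -1
    assert(readMetric(-43L, "RUNNING") == 42L) // a running task's 42 was stored as -43
    assert(readMetric(0L, "SUCCESS") == 0L)    // successful tasks are stored verbatim
    println("status-aware decode behaves as expected")
  }
}
```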

core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala

Lines changed: 1 addition & 5 deletions
@@ -17,8 +17,6 @@

 package org.apache.spark.status

-import scala.collection.mutable.HashSet
-
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.status.LiveEntityHelpers.makeNegative
 import org.apache.spark.status.api.v1
@@ -194,18 +192,16 @@ class AppStatusStoreSuite extends SparkFunSuite {
       new ShuffleWriteMetrics(i, i, i))

     val hasMetrics = i >= 0
-    val handleZero = HashSet[String]()

     val taskMetrics: v1.TaskMetrics = if (hasMetrics && status != "SUCCESS") {
-      makeNegative(metrics, handleZero)
+      makeNegative(metrics)
     } else {
       metrics
     }

     new TaskDataWrapper(
       i.toLong, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None,
       hasMetrics,
-      handleZero,
       taskMetrics.executorDeserializeTime,
       taskMetrics.executorDeserializeCpuTime,
       taskMetrics.executorRunTime,
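In the spirit of this suite, a property-style check can confirm that the shifted encoding is lossless for every non-negative metric value. The sketch below is an assumption-level illustration of that round trip, not a test added by the commit.

```scala
object RoundTripCheck {
  def main(args: Array[String]): Unit = {
    (0L to 10000L).foreach { m =>
      val stored = m * -1L - 1L            // what makeNegative writes for a non-successful task
      val restored = math.abs(stored + 1)  // what getMetricValue reads back in toApi
      assert(restored == m, s"round trip failed for $m")
    }
    println("round trip holds for metric values 0 to 10000")
  }
}
```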
