
Commit 90a30f4

jisookim0513 authored and Marcelo Vanzin committed
[SPARK-12221] add cpu time to metrics
Currently task metrics don't support executor CPU time, so there's no way to calculate how much CPU time a stage/task took from History Server metrics. This PR enables reporting CPU time.

Author: jisookim <[email protected]>

Closes #10212 from jisookim0513/add-cpu-time-metric.
1 parent 988c714 commit 90a30f4

30 files changed, +492 −21 lines changed
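For readers skimming the diff, the core mechanism is the JDK's per-thread CPU clock. Below is a minimal standalone sketch (not part of the commit; the object and helper names are made up for illustration) of the pattern this change repeats in Executor, ResultTask, and ShuffleMapTask: read the current thread's CPU time in nanoseconds before and after a block of work, falling back to 0 when the JVM does not support per-thread CPU timing.

import java.lang.management.ManagementFactory

// Hypothetical helper, illustration only: measure CPU time (nanoseconds) spent
// in a block of work on the current thread, with the same support guard the
// diff uses.
object CpuTimeSketch {
  def measureCpuNanos[T](work: => T): (T, Long) = {
    val bean = ManagementFactory.getThreadMXBean
    def now(): Long =
      if (bean.isCurrentThreadCpuTimeSupported) bean.getCurrentThreadCpuTime else 0L
    val start = now()
    val result = work
    (result, now() - start)
  }

  def main(args: Array[String]): Unit = {
    val (_, cpuNanos) = measureCpuNanos((1 to 1000000).map(math.sqrt(_)).sum)
    println(s"CPU time: ${cpuNanos / 1e6} ms")
  }
}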

core/src/main/scala/org/apache/spark/InternalAccumulator.scala

Lines changed: 2 additions & 0 deletions
@@ -31,7 +31,9 @@ private[spark] object InternalAccumulator {
 
   // Names of internal task level metrics
   val EXECUTOR_DESERIALIZE_TIME = METRICS_PREFIX + "executorDeserializeTime"
+  val EXECUTOR_DESERIALIZE_CPU_TIME = METRICS_PREFIX + "executorDeserializeCpuTime"
   val EXECUTOR_RUN_TIME = METRICS_PREFIX + "executorRunTime"
+  val EXECUTOR_CPU_TIME = METRICS_PREFIX + "executorCpuTime"
   val RESULT_SIZE = METRICS_PREFIX + "resultSize"
   val JVM_GC_TIME = METRICS_PREFIX + "jvmGCTime"
   val RESULT_SERIALIZATION_TIME = METRICS_PREFIX + "resultSerializationTime"

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 15 additions & 0 deletions
@@ -232,13 +232,18 @@ private[spark] class Executor(
     }
 
     override def run(): Unit = {
+      val threadMXBean = ManagementFactory.getThreadMXBean
       val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
       val deserializeStartTime = System.currentTimeMillis()
+      val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+        threadMXBean.getCurrentThreadCpuTime
+      } else 0L
       Thread.currentThread.setContextClassLoader(replClassLoader)
       val ser = env.closureSerializer.newInstance()
       logInfo(s"Running $taskName (TID $taskId)")
       execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
       var taskStart: Long = 0
+      var taskStartCpu: Long = 0
       startGCTime = computeTotalGcTime()
 
       try {
@@ -269,6 +274,9 @@ private[spark] class Executor(
 
         // Run the actual task and measure its runtime.
         taskStart = System.currentTimeMillis()
+        taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+          threadMXBean.getCurrentThreadCpuTime
+        } else 0L
         var threwException = true
         val value = try {
           val res = task.run(
@@ -302,6 +310,9 @@ private[spark] class Executor(
           }
         }
         val taskFinish = System.currentTimeMillis()
+        val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+          threadMXBean.getCurrentThreadCpuTime
+        } else 0L
 
         // If the task has been killed, let's fail it.
         if (task.killed) {
@@ -317,8 +328,12 @@ private[spark] class Executor(
         // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
         task.metrics.setExecutorDeserializeTime(
           (taskStart - deserializeStartTime) + task.executorDeserializeTime)
+        task.metrics.setExecutorDeserializeCpuTime(
+          (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
         // We need to subtract Task.run()'s deserialization time to avoid double-counting
         task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
+        task.metrics.setExecutorCpuTime(
+          (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
         task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
         task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)
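Note the units visible in this hunk: executorRunTime is derived from System.currentTimeMillis() and stays in milliseconds, while the new executorCpuTime and executorDeserializeCpuTime come from ThreadMXBean.getCurrentThreadCpuTime and are in nanoseconds. A small sketch (illustrative only, not part of the commit) of how a consumer might combine the two values once this lands:

// Fraction of a task's wall-clock run time spent on CPU.
// executorRunTime is in milliseconds, executorCpuTime in nanoseconds,
// so convert the CPU value to milliseconds before dividing.
def cpuUtilization(executorRunTimeMs: Long, executorCpuTimeNs: Long): Double = {
  if (executorRunTimeMs <= 0) 0.0
  else (executorCpuTimeNs / 1e6) / executorRunTimeMs
}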

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 18 additions & 0 deletions
@@ -47,7 +47,9 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, Accumulat
 class TaskMetrics private[spark] () extends Serializable {
   // Each metric is internally represented as an accumulator
   private val _executorDeserializeTime = new LongAccumulator
+  private val _executorDeserializeCpuTime = new LongAccumulator
   private val _executorRunTime = new LongAccumulator
+  private val _executorCpuTime = new LongAccumulator
   private val _resultSize = new LongAccumulator
   private val _jvmGCTime = new LongAccumulator
   private val _resultSerializationTime = new LongAccumulator
@@ -61,11 +63,22 @@ class TaskMetrics private[spark] () extends Serializable {
    */
   def executorDeserializeTime: Long = _executorDeserializeTime.sum
 
+  /**
+   * CPU Time taken on the executor to deserialize this task in nanoseconds.
+   */
+  def executorDeserializeCpuTime: Long = _executorDeserializeCpuTime.sum
+
   /**
    * Time the executor spends actually running the task (including fetching shuffle data).
    */
   def executorRunTime: Long = _executorRunTime.sum
 
+  /**
+   * CPU Time the executor spends actually running the task
+   * (including fetching shuffle data) in nanoseconds.
+   */
+  def executorCpuTime: Long = _executorCpuTime.sum
+
   /**
    * The number of bytes this task transmitted back to the driver as the TaskResult.
    */
@@ -111,7 +124,10 @@ class TaskMetrics private[spark] () extends Serializable {
   // Setters and increment-ers
   private[spark] def setExecutorDeserializeTime(v: Long): Unit =
     _executorDeserializeTime.setValue(v)
+  private[spark] def setExecutorDeserializeCpuTime(v: Long): Unit =
+    _executorDeserializeCpuTime.setValue(v)
   private[spark] def setExecutorRunTime(v: Long): Unit = _executorRunTime.setValue(v)
+  private[spark] def setExecutorCpuTime(v: Long): Unit = _executorCpuTime.setValue(v)
   private[spark] def setResultSize(v: Long): Unit = _resultSize.setValue(v)
   private[spark] def setJvmGCTime(v: Long): Unit = _jvmGCTime.setValue(v)
   private[spark] def setResultSerializationTime(v: Long): Unit =
@@ -188,7 +204,9 @@ class TaskMetrics private[spark] () extends Serializable {
   import InternalAccumulator._
   @transient private[spark] lazy val nameToAccums = LinkedHashMap(
     EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
+    EXECUTOR_DESERIALIZE_CPU_TIME -> _executorDeserializeCpuTime,
     EXECUTOR_RUN_TIME -> _executorRunTime,
+    EXECUTOR_CPU_TIME -> _executorCpuTime,
     RESULT_SIZE -> _resultSize,
     JVM_GC_TIME -> _jvmGCTime,
     RESULT_SERIALIZATION_TIME -> _resultSerializationTime,
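Because the new values are registered as internal accumulators alongside the existing time metrics, they flow back to the driver like any other task metric. A hedged sketch of one way application code might read them through a SparkListener (assumes the listener is registered via SparkContext#addSparkListener; not part of this commit):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative listener: logs the new CPU metrics next to the existing run time.
class CpuTimeListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val m = taskEnd.taskMetrics
    if (m != null) {
      println(s"stage ${taskEnd.stageId} task ${taskEnd.taskInfo.taskId}: " +
        s"runTime=${m.executorRunTime} ms, cpuTime=${m.executorCpuTime} ns, " +
        s"deserializeCpuTime=${m.executorDeserializeCpuTime} ns")
    }
  }
}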

core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala

Lines changed: 8 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.io._
+import java.lang.management.ManagementFactory
 import java.nio.ByteBuffer
 import java.util.Properties
 
@@ -61,11 +62,18 @@ private[spark] class ResultTask[T, U](
 
   override def runTask(context: TaskContext): U = {
     // Deserialize the RDD and the func using the broadcast variables.
+    val threadMXBean = ManagementFactory.getThreadMXBean
     val deserializeStartTime = System.currentTimeMillis()
+    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+      threadMXBean.getCurrentThreadCpuTime
+    } else 0L
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
     _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
+    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
+    } else 0L
 
     func(context, rdd.iterator(partition, context))
   }

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala

Lines changed: 8 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler
 
+import java.lang.management.ManagementFactory
 import java.nio.ByteBuffer
 import java.util.Properties
 
@@ -66,11 +67,18 @@ private[spark] class ShuffleMapTask(
 
   override def runTask(context: TaskContext): MapStatus = {
     // Deserialize the RDD using the broadcast variable.
+    val threadMXBean = ManagementFactory.getThreadMXBean
     val deserializeStartTime = System.currentTimeMillis()
+    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+      threadMXBean.getCurrentThreadCpuTime
+    } else 0L
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
     _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
+    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
+      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
+    } else 0L
 
     var writer: ShuffleWriter[Any, Any] = null
     try {

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 2 additions & 0 deletions
@@ -139,6 +139,7 @@ private[spark] abstract class Task[T](
   @volatile @transient private var _killed = false
 
   protected var _executorDeserializeTime: Long = 0
+  protected var _executorDeserializeCpuTime: Long = 0
 
   /**
    * Whether the task has been killed.
@@ -149,6 +150,7 @@ private[spark] abstract class Task[T](
    * Returns the amount of time spent deserializing the RDD and function to be run.
    */
   def executorDeserializeTime: Long = _executorDeserializeTime
+  def executorDeserializeCpuTime: Long = _executorDeserializeCpuTime
 
   /**
    * Collect the latest values of accumulators used in this task. If the task failed,

core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala

Lines changed: 5 additions & 0 deletions
@@ -101,6 +101,7 @@ private[v1] object AllStagesResource {
       numCompleteTasks = stageUiData.numCompleteTasks,
       numFailedTasks = stageUiData.numFailedTasks,
       executorRunTime = stageUiData.executorRunTime,
+      executorCpuTime = stageUiData.executorCpuTime,
       submissionTime = stageInfo.submissionTime.map(new Date(_)),
       firstTaskLaunchedTime,
       completionTime = stageInfo.completionTime.map(new Date(_)),
@@ -220,7 +221,9 @@ private[v1] object AllStagesResource {
     new TaskMetricDistributions(
       quantiles = quantiles,
       executorDeserializeTime = metricQuantiles(_.executorDeserializeTime),
+      executorDeserializeCpuTime = metricQuantiles(_.executorDeserializeCpuTime),
       executorRunTime = metricQuantiles(_.executorRunTime),
+      executorCpuTime = metricQuantiles(_.executorCpuTime),
       resultSize = metricQuantiles(_.resultSize),
       jvmGcTime = metricQuantiles(_.jvmGCTime),
       resultSerializationTime = metricQuantiles(_.resultSerializationTime),
@@ -241,7 +244,9 @@ private[v1] object AllStagesResource {
   def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = {
     new TaskMetrics(
       executorDeserializeTime = internal.executorDeserializeTime,
+      executorDeserializeCpuTime = internal.executorDeserializeCpuTime,
       executorRunTime = internal.executorRunTime,
+      executorCpuTime = internal.executorCpuTime,
       resultSize = internal.resultSize,
       jvmGcTime = internal.jvmGCTime,
       resultSerializationTime = internal.resultSerializationTime,

core/src/main/scala/org/apache/spark/status/api/v1/api.scala

Lines changed: 5 additions & 0 deletions
@@ -128,6 +128,7 @@ class StageData private[spark](
     val numFailedTasks: Int,
 
     val executorRunTime: Long,
+    val executorCpuTime: Long,
     val submissionTime: Option[Date],
     val firstTaskLaunchedTime: Option[Date],
     val completionTime: Option[Date],
@@ -166,7 +167,9 @@ class TaskData private[spark](
 
 class TaskMetrics private[spark](
     val executorDeserializeTime: Long,
+    val executorDeserializeCpuTime: Long,
     val executorRunTime: Long,
+    val executorCpuTime: Long,
     val resultSize: Long,
     val jvmGcTime: Long,
     val resultSerializationTime: Long,
@@ -202,7 +205,9 @@ class TaskMetricDistributions private[spark](
     val quantiles: IndexedSeq[Double],
 
     val executorDeserializeTime: IndexedSeq[Double],
+    val executorDeserializeCpuTime: IndexedSeq[Double],
     val executorRunTime: IndexedSeq[Double],
+    val executorCpuTime: IndexedSeq[Double],
     val resultSize: IndexedSeq[Double],
     val jvmGcTime: IndexedSeq[Double],
     val resultSerializationTime: IndexedSeq[Double],

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 4 additions & 0 deletions
@@ -503,6 +503,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     val timeDelta =
       taskMetrics.executorRunTime - oldMetrics.map(_.executorRunTime).getOrElse(0L)
     stageData.executorRunTime += timeDelta
+
+    val cpuTimeDelta =
+      taskMetrics.executorCpuTime - oldMetrics.map(_.executorCpuTime).getOrElse(0L)
+    stageData.executorCpuTime += cpuTimeDelta
   }
 
   override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) {
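The CPU time reuses the delta pattern the listener already applies to run time: since metric updates for a running task arrive repeatedly, only the increase over the previously recorded value is folded into the stage total, so stage-level executorCpuTime is not double-counted. A tiny sketch of that pattern in isolation (names are illustrative, not Spark API):

// Fold a task's latest cumulative CPU time into a stage total without
// double-counting the portion already recorded from an earlier update.
def addCpuDelta(stageTotal: Long, latestTaskCpu: Long, previousTaskCpu: Option[Long]): Long =
  stageTotal + (latestTaskCpu - previousTaskCpu.getOrElse(0L))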

core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala

Lines changed: 5 additions & 0 deletions
@@ -80,6 +80,7 @@ private[spark] object UIData {
     var numKilledTasks: Int = _
 
     var executorRunTime: Long = _
+    var executorCpuTime: Long = _
 
     var inputBytes: Long = _
     var inputRecords: Long = _
@@ -137,7 +138,9 @@ private[spark] object UIData {
       metrics.map { m =>
         TaskMetricsUIData(
           executorDeserializeTime = m.executorDeserializeTime,
+          executorDeserializeCpuTime = m.executorDeserializeCpuTime,
           executorRunTime = m.executorRunTime,
+          executorCpuTime = m.executorCpuTime,
           resultSize = m.resultSize,
           jvmGCTime = m.jvmGCTime,
           resultSerializationTime = m.resultSerializationTime,
@@ -179,7 +182,9 @@ private[spark] object UIData {
 
   case class TaskMetricsUIData(
       executorDeserializeTime: Long,
+      executorDeserializeCpuTime: Long,
       executorRunTime: Long,
+      executorCpuTime: Long,
       resultSize: Long,
       jvmGCTime: Long,
       resultSerializationTime: Long,
