diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index 669ce63325d0e..a8264022a0aff 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.executor
 
+import java.lang.management.ManagementFactory
 import java.util.concurrent.ThreadPoolExecutor
+import javax.management.{MBeanServer, ObjectName}
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
 import org.apache.hadoop.fs.FileSystem
@@ -73,6 +76,24 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends
     registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
   }
 
+  // Dropwizard metrics gauge measuring the executor's process CPU time.
+  // This gauge will try to get and return the JVM process CPU time, or return -1 otherwise.
+  // The CPU time value is reported in nanoseconds.
+  // It relies on proprietary extensions such as com.sun.management.OperatingSystemMXBean or
+  // com.ibm.lang.management.OperatingSystemMXBean, if available.
+  metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] {
+    val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
+    val name = new ObjectName("java.lang", "type", "OperatingSystem")
+    override def getValue: Long = {
+      try {
+        // return JVM process CPU time if the ProcessCpuTime attribute is available
+        mBean.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long]
+      } catch {
+        case NonFatal(_) => -1L
+      }
+    }
+  })
+
   // Expose executor task metrics using the Dropwizard metrics system.
   // The list is taken from TaskMetrics.scala
   val METRIC_CPU_TIME = metricRegistry.counter(MetricRegistry.name("cpuTime"))
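For context, the JMX lookup the new gauge performs can be exercised outside Spark. Below is a minimal, self-contained Scala sketch (the object name `CpuTimeProbe`, its method names, and the sampling loop are illustrative and not part of this patch) that reads the same `java.lang:type=OperatingSystem` MBean attribute and falls back to -1 the same way the gauge does:

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName

import scala.util.control.NonFatal

// Illustrative probe, not part of the patch: queries the platform
// MBeanServer for the "java.lang:type=OperatingSystem" MBean and reads
// its ProcessCpuTime attribute, exactly as the jvmCpuTime gauge does.
object CpuTimeProbe {
  private val mBeanServer = ManagementFactory.getPlatformMBeanServer
  private val osName = new ObjectName("java.lang", "type", "OperatingSystem")

  // Returns the JVM process CPU time in nanoseconds, or -1 when the
  // attribute is not exposed by this JVM (mirrors the gauge's fallback).
  def processCpuTimeNanos(): Long = {
    try {
      mBeanServer.getAttribute(osName, "ProcessCpuTime").asInstanceOf[Long]
    } catch {
      case NonFatal(_) => -1L
    }
  }

  def main(args: Array[String]): Unit = {
    val before = processCpuTimeNanos()
    // Burn some CPU so the cumulative counter visibly advances.
    var acc = 0L
    var i = 0
    while (i < 50000000) { acc += i; i += 1 }
    val after = processCpuTimeNanos()
    println(s"ProcessCpuTime delta: ${after - before} ns (acc=$acc)")
  }
}
```

With the patch applied, the value should surface through Spark's Dropwizard metrics system under the executor source's namespace as `jvmCpuTime`, alongside the existing per-executor metrics registered in ExecutorSource.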