From c715096657c36beedd43c64104f56a78b2eb268d Mon Sep 17 00:00:00 2001 From: LucaCanali Date: Fri, 24 Aug 2018 14:05:59 +0200 Subject: [PATCH 1/7] Add Executor CPU Time metric --- .../scala/org/apache/spark/executor/ExecutorSource.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index 669ce63325d0e..37ab7648cf557 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -17,11 +17,13 @@ package org.apache.spark.executor +import java.lang.management.ManagementFactory import java.util.concurrent.ThreadPoolExecutor import scala.collection.JavaConverters._ import com.codahale.metrics.{Gauge, MetricRegistry} +import com.sun.management.OperatingSystemMXBean import org.apache.hadoop.fs.FileSystem import org.apache.spark.metrics.source.Source @@ -73,6 +75,13 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0) } + // Dropwizard metrics gauge measuring the executor's process (JVM) CPU time. + // The value is returned in nanoseconds, the method return -1 if this operation is not supported. + val osMXBean = ManagementFactory.getOperatingSystemMXBean.asInstanceOf[OperatingSystemMXBean] + metricRegistry.register(MetricRegistry.name("executorCPUTime" ), new Gauge[Long] { + override def getValue: Long = osMXBean.getProcessCpuTime() + }) + // Expose executor task metrics using the Dropwizard metrics system. 
// The list is taken from TaskMetrics.scala val METRIC_CPU_TIME = metricRegistry.counter(MetricRegistry.name("cpuTime")) From 438bf90bc3d89fd302bac9686473951aad4fecb8 Mon Sep 17 00:00:00 2001 From: LucaCanali Date: Fri, 31 Aug 2018 17:30:28 +0200 Subject: [PATCH 2/7] Add executor CPU Time using ProcessCpuTime method in OperatingSystem MX Bean if available --- .../spark/executor/ExecutorSource.scala | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index 37ab7648cf557..d3c0a22fde4cf 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -19,11 +19,11 @@ package org.apache.spark.executor import java.lang.management.ManagementFactory import java.util.concurrent.ThreadPoolExecutor +import javax.management.{MBeanServer, ObjectName} import scala.collection.JavaConverters._ import com.codahale.metrics.{Gauge, MetricRegistry} -import com.sun.management.OperatingSystemMXBean import org.apache.hadoop.fs.FileSystem import org.apache.spark.metrics.source.Source @@ -75,11 +75,29 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0) } + // will try to get JVM Process CPU time or return -1 otherwise + // will use proprietary extensions as com.sun.management.OperatingSystemMXBean or + // com.ibm.lang.management.OperatingSystemMXBean if available + def tryToGetJVMProcessPCUTime() : Long = { + val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer + try { + val name = new ObjectName("java.lang", "type", "OperatingSystem") + val attribute = mBean.getAttribute(name, "ProcessCpuTime") + if (attribute != null) { + attribute.asInstanceOf[Long] + } + else { + -1L + } + } catch { + case _ : Exception => -1L + } + } + 
// Dropwizard metrics gauge measuring the executor's process (JVM) CPU time. // The value is returned in nanoseconds, the method return -1 if this operation is not supported. - val osMXBean = ManagementFactory.getOperatingSystemMXBean.asInstanceOf[OperatingSystemMXBean] metricRegistry.register(MetricRegistry.name("executorCPUTime" ), new Gauge[Long] { - override def getValue: Long = osMXBean.getProcessCpuTime() + override def getValue: Long = tryToGetJVMProcessPCUTime() }) // Expose executor task metrics using the Dropwizard metrics system. From 807119b61f3e60163b80bd22802a6d12a5e1a50e Mon Sep 17 00:00:00 2001 From: LucaCanali Date: Fri, 31 Aug 2018 21:28:18 +0200 Subject: [PATCH 3/7] Add executor CPU Time using ProcessCpuTime method in OperatingSystem MX Bean, if available --- .../spark/executor/ExecutorSource.scala | 40 +++++++++---------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index d3c0a22fde4cf..19d954edbb6ee 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -75,29 +75,27 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0) } - // will try to get JVM Process CPU time or return -1 otherwise - // will use proprietary extensions as com.sun.management.OperatingSystemMXBean or - // com.ibm.lang.management.OperatingSystemMXBean if available - def tryToGetJVMProcessPCUTime() : Long = { - val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer - try { - val name = new ObjectName("java.lang", "type", "OperatingSystem") - val attribute = mBean.getAttribute(name, "ProcessCpuTime") - if (attribute != null) { - attribute.asInstanceOf[Long] - } - else { - -1L + /** Dropwizard metrics gauge measuring the 
executor's process CPU time. + * This code will try to get JVM Process CPU time or return -1 otherwise. + * The CPU time value is returned in nanoseconds. + * It will use proprietary extensions as com.sun.management.OperatingSystemMXBean or + * com.ibm.lang.management.OperatingSystemMXBean if available + */ + val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer + val name = new ObjectName("java.lang", "type", "OperatingSystem") + metricRegistry.register(MetricRegistry.name("executorCPUTime" ), new Gauge[Long] { + override def getValue: Long = { + try { + val attribute = mBean.getAttribute(name, "ProcessCpuTime") + if (attribute != null) { + attribute.asInstanceOf[Long] + } else { + -1L + } + } catch { + case _ : Exception => -1L } - } catch { - case _ : Exception => -1L } - } - - // Dropwizard metrics gauge measuring the executor's process (JVM) CPU time. - // The value is returned in nanoseconds, the method return -1 if this operation is not supported. - metricRegistry.register(MetricRegistry.name("executorCPUTime" ), new Gauge[Long] { - override def getValue: Long = tryToGetJVMProcessPCUTime() }) // Expose executor task metrics using the Dropwizard metrics system. From d522fa2dc759a98f9915f4d16562f52c32259280 Mon Sep 17 00:00:00 2001 From: LucaCanali Date: Sat, 1 Sep 2018 21:50:15 +0200 Subject: [PATCH 4/7] Add executor CPU Time metric (jvmCpuTime) using ProcessCpuTime method in OperatingSystem MX Bean, if available. 
--- .../spark/executor/ExecutorSource.scala | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index 19d954edbb6ee..da32ee50a3939 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -22,12 +22,15 @@ import java.util.concurrent.ThreadPoolExecutor import javax.management.{MBeanServer, ObjectName} import scala.collection.JavaConverters._ +import scala.util.control.NonFatal import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.hadoop.fs.FileSystem import org.apache.spark.metrics.source.Source + + private[spark] class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source { @@ -75,16 +78,15 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0) } - /** Dropwizard metrics gauge measuring the executor's process CPU time. - * This code will try to get JVM Process CPU time or return -1 otherwise. - * The CPU time value is returned in nanoseconds. - * It will use proprietary extensions as com.sun.management.OperatingSystemMXBean or - * com.ibm.lang.management.OperatingSystemMXBean if available - */ - val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer - val name = new ObjectName("java.lang", "type", "OperatingSystem") - metricRegistry.register(MetricRegistry.name("executorCPUTime" ), new Gauge[Long] { + // Dropwizard metrics gauge measuring the executor's process CPU time. + // This Gauge will try to get and return the JVM Process CPU time or return -1 otherwise. + // The CPU time value is returned in nanoseconds. 
+ // It will use proprietary extensions such as com.sun.management.OperatingSystemMXBean or + // com.ibm.lang.management.OperatingSystemMXBean, if available. + metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] { override def getValue: Long = { + val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer + val name = new ObjectName("java.lang", "type", "OperatingSystem") try { val attribute = mBean.getAttribute(name, "ProcessCpuTime") if (attribute != null) { @@ -93,7 +95,7 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends -1L } } catch { - case _ : Exception => -1L + case NonFatal(_) => -1L } } }) From b7fdec21cd4cf0f528ad1450cfbaf67043a1f17b Mon Sep 17 00:00:00 2001 From: LucaCanali Date: Sat, 1 Sep 2018 21:53:40 +0200 Subject: [PATCH 5/7] Add executor CPU Time metric (jvmCpuTime) using ProcessCpuTime method in OperatingSystem MX Bean, if available. --- .../main/scala/org/apache/spark/executor/ExecutorSource.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index da32ee50a3939..fbadd63f94c93 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -29,8 +29,6 @@ import org.apache.hadoop.fs.FileSystem import org.apache.spark.metrics.source.Source - - private[spark] class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source { From 95d31f69cac02af4eee63f8a1a364c485190951c Mon Sep 17 00:00:00 2001 From: LucaCanali Date: Sat, 1 Sep 2018 22:24:07 +0200 Subject: [PATCH 6/7] Add executor CPU Time metric (jvmCpuTime) using ProcessCpuTime method in OperatingSystem MX Bean, if available. 
--- .../main/scala/org/apache/spark/executor/ExecutorSource.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index fbadd63f94c93..200b6d60a0374 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -82,9 +82,9 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends // It will use proprietary extensions such as com.sun.management.OperatingSystemMXBean or // com.ibm.lang.management.OperatingSystemMXBean, if available. metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] { + val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer + val name = new ObjectName("java.lang", "type", "OperatingSystem") override def getValue: Long = { - val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer - val name = new ObjectName("java.lang", "type", "OperatingSystem") try { val attribute = mBean.getAttribute(name, "ProcessCpuTime") if (attribute != null) { From e72966e38dc50be7b501387a9f719f85017a7aa8 Mon Sep 17 00:00:00 2001 From: LucaCanali Date: Mon, 3 Sep 2018 10:08:33 +0200 Subject: [PATCH 7/7] removed check for null value. 
--- .../scala/org/apache/spark/executor/ExecutorSource.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index 200b6d60a0374..a8264022a0aff 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -86,12 +86,8 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends val name = new ObjectName("java.lang", "type", "OperatingSystem") override def getValue: Long = { try { - val attribute = mBean.getAttribute(name, "ProcessCpuTime") - if (attribute != null) { - attribute.asInstanceOf[Long] - } else { - -1L - } + // return JVM process CPU time if the ProcessCpuTime attribute is available + mBean.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long] } catch { case NonFatal(_) => -1L }