diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index db3f2266cf33..2fdbd4f77aae 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -42,7 +42,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.deploy.StandaloneResourceUtils._
-import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.executor.{ExecutorMetrics, ExecutorMetricsSource}
 import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -551,9 +551,16 @@ class SparkContext(config: SparkConf) extends Logging {
     _dagScheduler = new DAGScheduler(this)
     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
 
+    val _executorMetricsSource =
+      if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
+        Some(new ExecutorMetricsSource)
+      } else {
+        None
+      }
+
     // create and start the heartbeater for collecting memory metrics
     _heartbeater = new Heartbeater(
-      () => SparkContext.this.reportHeartBeat(),
+      () => SparkContext.this.reportHeartBeat(_executorMetricsSource),
       "driver-heartbeater",
       conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
     _heartbeater.start()
@@ -622,6 +629,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
     _env.metricsSystem.registerSource(new JVMCPUSource())
+    _executorMetricsSource.foreach(_.register(_env.metricsSystem))
     _executorAllocationManager.foreach { e =>
       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
     }
@@ -2473,8 +2481,10 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /** Reports heartbeat metrics for the driver. */
-  private def reportHeartBeat(): Unit = {
+  private def reportHeartBeat(executorMetricsSource: Option[ExecutorMetricsSource]): Unit = {
     val currentMetrics = ExecutorMetrics.getCurrentMetrics(env.memoryManager)
+    executorMetricsSource.foreach(_.updateMetricsSnapshot(currentMetrics))
+
     val driverUpdates = new HashMap[(Int, Int), ExecutorMetrics]
     // In the driver, we do not track per-stage metrics, so use a dummy stage for the key
     driverUpdates.put(EventLoggingListener.DRIVER_STAGE_KEY, new ExecutorMetrics(currentMetrics))
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0f595d095a22..41332d05978e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -113,10 +113,18 @@ private[spark] class Executor(
   // create. The map key is a task id.
   private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, TaskReaper]()
 
+  val executorMetricsSource =
+    if (conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
+      Some(new ExecutorMetricsSource)
+    } else {
+      None
+    }
+
   if (!isLocal) {
     env.blockManager.initialize(conf.getAppId)
     env.metricsSystem.registerSource(executorSource)
     env.metricsSystem.registerSource(new JVMCPUSource())
+    executorMetricsSource.foreach(_.register(env.metricsSystem))
     env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
   }
 
@@ -210,7 +218,8 @@ private[spark] class Executor(
   // Poller for the memory metrics. Visible for testing.
   private[executor] val metricsPoller = new ExecutorMetricsPoller(
     env.memoryManager,
-    METRICS_POLLING_INTERVAL_MS)
+    METRICS_POLLING_INTERVAL_MS,
+    executorMetricsSource)
 
   // Executor for the heartbeat task.
   private val heartbeater = new Heartbeater(
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala
index 805b0f729b12..1c1a1ca8035d 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala
@@ -48,7 +48,8 @@ import org.apache.spark.util.{ThreadUtils, Utils}
  */
 private[spark] class ExecutorMetricsPoller(
     memoryManager: MemoryManager,
-    pollingInterval: Long) extends Logging {
+    pollingInterval: Long,
+    executorMetricsSource: Option[ExecutorMetricsSource]) extends Logging {
 
   type StageKey = (Int, Int)
   // Task Count and Metric Peaks
@@ -79,6 +80,7 @@ private[spark] class ExecutorMetricsPoller(
 
     // get the latest values for the metrics
     val latestMetrics = ExecutorMetrics.getCurrentMetrics(memoryManager)
+    executorMetricsSource.foreach(_.updateMetricsSnapshot(latestMetrics))
 
     def updatePeaks(metrics: AtomicLongArray): Unit = {
       (0 until metrics.length).foreach { i =>
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala
new file mode 100644
index 000000000000..e47452fe7291
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem}
+import org.apache.spark.metrics.source.Source
+
+/**
+ * Expose executor metrics from [[ExecutorMetricType]] using the Dropwizard metrics system.
+ *
+ * Metrics related to the memory system can be expensive to gather, therefore
+ * we implement some optimizations:
+ * (1) Metrics values are cached, updated at each heartbeat (default period is 10 seconds).
+ * An alternative, faster polling mechanism can be activated by setting
+ * spark.executor.metrics.pollingInterval=<interval in ms>.
+ * (2) Procfs metrics are gathered all in one go and only conditionally:
+ * if the /proc filesystem exists
+ * and spark.eventLog.logStageExecutorProcessTreeMetrics.enabled=true
+ * and spark.eventLog.logStageExecutorMetrics.enabled=true.
+ */
+private[spark] class ExecutorMetricsSource extends Source {
+
+  override val metricRegistry = new MetricRegistry()
+  override val sourceName = "ExecutorMetrics"
+  @volatile var metricsSnapshot: Array[Long] = Array.fill(ExecutorMetricType.numMetrics)(0L)
+
+  // Called by ExecutorMetricsPoller on executors and by the driver heartbeat (reportHeartBeat).
+  def updateMetricsSnapshot(metricsUpdates: Array[Long]): Unit = {
+    metricsSnapshot = metricsUpdates
+  }
+
+  class ExecutorMetricGauge(idx: Int) extends Gauge[Long] {
+    def getValue: Long = metricsSnapshot(idx)
+  }
+
+  def register(metricsSystem: MetricsSystem): Unit = {
+    val gauges: IndexedSeq[ExecutorMetricGauge] = (0 until ExecutorMetricType.numMetrics).map {
+      idx => new ExecutorMetricGauge(idx)
+    }.toIndexedSeq
+
+    ExecutorMetricType.metricToOffset.foreach {
+      case (name, idx) =>
+        metricRegistry.register(MetricRegistry.name(name), gauges(idx))
+    }
+
+    metricsSystem.registerSource(this)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 00acb1ff115f..9f66f5c9a2f1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -615,6 +615,12 @@ package object config {
       .stringConf
       .createOptional
 
+  private[spark] val METRICS_EXECUTORMETRICS_SOURCE_ENABLED =
+    ConfigBuilder("spark.metrics.executorMetricsSource.enabled")
+      .doc("Whether to register the ExecutorMetrics source with the metrics system.")
+      .booleanConf
+      .createWithDefault(true)
+
   private[spark] val METRICS_STATIC_SOURCES_ENABLED =
     ConfigBuilder("spark.metrics.static.sources.enabled")
       .doc("Whether to register static sources with the metrics system.")
diff --git a/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
index 76c568056aee..8f5ab7419d4f 100644
--- a/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.metrics.source
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.internal.config.METRICS_STATIC_SOURCES_ENABLED
+import org.apache.spark.internal.config.{METRICS_EXECUTORMETRICS_SOURCE_ENABLED, METRICS_STATIC_SOURCES_ENABLED}
 
 class SourceConfigSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -52,4 +52,32 @@ class SourceConfigSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("Test configuration for adding ExecutorMetrics source registration") {
+    val conf = new SparkConf()
+    conf.set(METRICS_EXECUTORMETRICS_SOURCE_ENABLED, true)
+    val sc = new SparkContext("local", "test", conf)
+    try {
+      val metricsSystem = sc.env.metricsSystem
+
+      // ExecutorMetrics source should be registered
+      assert(metricsSystem.getSourcesByName("ExecutorMetrics").nonEmpty)
+    } finally {
+      sc.stop()
+    }
+  }
+
+  test("Test configuration for skipping ExecutorMetrics source registration") {
+    val conf = new SparkConf()
+    conf.set(METRICS_EXECUTORMETRICS_SOURCE_ENABLED, false)
+    val sc = new SparkContext("local", "test", conf)
+    try {
+      val metricsSystem = sc.env.metricsSystem
+
+      // ExecutorMetrics source should not be registered
+      assert(metricsSystem.getSourcesByName("ExecutorMetrics").isEmpty)
+    } finally {
+      sc.stop()
+    }
+  }
+
 }
diff --git a/docs/monitoring.md b/docs/monitoring.md
index f094d0ba0dfc..88055b68e651 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -995,6 +995,12 @@ This is the component with the largest amount of instrumented metrics
 - namespace=JVMCPU
   - jvmCpuTime
 
+- namespace=ExecutorMetrics
+  - **note:** these metrics are conditional on a configuration parameter:
+    `spark.metrics.executorMetricsSource.enabled` (default is true)
+  - This source contains memory-related metrics. A full list of available metrics in this
+    namespace can be found in the corresponding entry for the Executor component instance.
+
 - namespace=plugin.\<Plugin Class Name>
   - Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and
   configured using the Spark plugin API. See "Advanced Instrumentation" below for how to load
@@ -1046,6 +1052,41 @@ when running in local mode.
   - threadpool.maxPool_size
   - threadpool.startedTasks
 
+- namespace=ExecutorMetrics
+  - **notes:**
+    - These metrics are conditional on a configuration parameter:
+      `spark.metrics.executorMetricsSource.enabled` (default value is true)
+    - ExecutorMetrics are updated as part of the heartbeat processes scheduled
+      for the executors and for the driver at regular intervals: `spark.executor.heartbeatInterval` (default value is 10 seconds)
+    - An optional faster polling mechanism is available for executor memory metrics;
+      it can be activated by setting a polling interval (in milliseconds) using the configuration parameter `spark.executor.metrics.pollingInterval`
+  - JVMHeapMemory
+  - JVMOffHeapMemory
+  - OnHeapExecutionMemory
+  - OnHeapStorageMemory
+  - OnHeapUnifiedMemory
+  - OffHeapExecutionMemory
+  - OffHeapStorageMemory
+  - OffHeapUnifiedMemory
+  - DirectPoolMemory
+  - MappedPoolMemory
+  - MinorGCCount
+  - MinorGCTime
+  - MajorGCCount
+  - MajorGCTime
+  - "ProcessTree*" metric counters:
+    - ProcessTreeJVMVMemory
+    - ProcessTreeJVMRSSMemory
+    - ProcessTreePythonVMemory
+    - ProcessTreePythonRSSMemory
+    - ProcessTreeOtherVMemory
+    - ProcessTreeOtherRSSMemory
+    - **note:** "ProcessTree*" metrics are collected only under certain conditions.
+      The conditions are the logical AND of the following: `/proc` filesystem exists,
+      `spark.eventLog.logStageExecutorProcessTreeMetrics.enabled=true`,
+      `spark.eventLog.logStageExecutorMetrics.enabled=true`.
+      "ProcessTree*" metrics report 0 when those conditions are not met.
+
 - namespace=JVMCPU
   - jvmCpuTime
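
For context, a minimal usage sketch (not part of the patch) of how the new gauges can be read back from the driver's MetricsSystem once the source is registered. A few assumptions: the demo object name is hypothetical, the snippet has to live in an `org.apache.spark` subpackage (like the test suite above) because `SparkContext.env` and `MetricsSystem` are `private[spark]`, and the gauges report 0 until the first driver heartbeat (`spark.executor.heartbeatInterval`, 10 seconds by default) refreshes the cached snapshot.

```scala
package org.apache.spark.metrics.source

import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object, for illustration only.
object ExecutorMetricsSourceDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("executor-metrics-source-demo")
      // The default is already true; set explicitly here to show the new flag.
      .set("spark.metrics.executorMetricsSource.enabled", "true")
    val sc = new SparkContext(conf)
    try {
      // Do some work so the memory metrics move away from their initial values.
      sc.parallelize(1 to 1000000).map(_ * 2).count()

      // The gauges are backed by a snapshot that the driver heartbeat refreshes,
      // so wait at least one heartbeat interval before reading them.
      Thread.sleep(11000)

      sc.env.metricsSystem.getSourcesByName("ExecutorMetrics").foreach { source =>
        source.metricRegistry.getGauges.asScala.foreach { case (name, gauge) =>
          println(s"$name = ${gauge.getValue}") // prints lines like: JVMHeapMemory = <bytes>
        }
      }
    } finally {
      sc.stop()
    }
  }
}
```

In a real deployment the same values are reported through whatever sink is configured for the metrics system (for example a Graphite sink configured in `conf/metrics.properties`), under the `ExecutorMetrics` namespace of the driver and executor instances, as described in the `docs/monitoring.md` changes above.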