From 005f95e5b823f5ac6e20dd630413790e9299ecbc Mon Sep 17 00:00:00 2001 From: Luca Canali Date: Fri, 1 Nov 2019 20:06:47 +0100 Subject: [PATCH 1/6] Executor Metrics integration with the Spark metrics system re-implemeted after #23767 --- .../scala/org/apache/spark/SparkContext.scala | 12 +++-- .../org/apache/spark/executor/Executor.scala | 7 ++- .../executor/ExecutorMetricsPoller.scala | 5 +- .../executor/ExecutorMetricsSource.scala | 52 +++++++++++++++++++ docs/monitoring.md | 36 +++++++++++++ 5 files changed, 107 insertions(+), 5 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index db3f2266cf33..ac051d212c4f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -42,7 +42,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.deploy.StandaloneResourceUtils._ -import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.executor.{ExecutorMetrics, ExecutorMetricsSource} import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -551,9 +551,12 @@ class SparkContext(config: SparkConf) extends Logging { _dagScheduler = new DAGScheduler(this) _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet) + val executorMetricsSource = new ExecutorMetricsSource + executorMetricsSource.register + // create and start the heartbeater for collecting memory metrics _heartbeater = new Heartbeater( - () => SparkContext.this.reportHeartBeat(), + () => SparkContext.this.reportHeartBeat(executorMetricsSource), "driver-heartbeater", conf.get(EXECUTOR_HEARTBEAT_INTERVAL)) _heartbeater.start() @@ -622,6 +625,7 @@ class SparkContext(config: SparkConf) extends Logging { _env.metricsSystem.registerSource(_dagScheduler.metricsSource) _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager)) _env.metricsSystem.registerSource(new JVMCPUSource()) + env.metricsSystem.registerSource(executorMetricsSource) _executorAllocationManager.foreach { e => _env.metricsSystem.registerSource(e.executorAllocationManagerSource) } @@ -2473,8 +2477,10 @@ class SparkContext(config: SparkConf) extends Logging { } /** Reports heartbeat metrics for the driver. */ - private def reportHeartBeat(): Unit = { + private def reportHeartBeat(executorMetricsSource: ExecutorMetricsSource): Unit = { val currentMetrics = ExecutorMetrics.getCurrentMetrics(env.memoryManager) + executorMetricsSource.updateMetricsSnapshot(currentMetrics) + val driverUpdates = new HashMap[(Int, Int), ExecutorMetrics] // In the driver, we do not track per-stage metrics, so use a dummy stage for the key driverUpdates.put(EventLoggingListener.DRIVER_STAGE_KEY, new ExecutorMetrics(currentMetrics)) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 0f595d095a22..5c1a2dc3572f 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -113,10 +113,14 @@ private[spark] class Executor( // create. The map key is a task id. 
private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, TaskReaper]() + val executorMetricsSource = new ExecutorMetricsSource + if (!isLocal) { env.blockManager.initialize(conf.getAppId) env.metricsSystem.registerSource(executorSource) env.metricsSystem.registerSource(new JVMCPUSource()) + executorMetricsSource.register + env.metricsSystem.registerSource(executorMetricsSource) env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource) } @@ -210,7 +214,8 @@ private[spark] class Executor( // Poller for the memory metrics. Visible for testing. private[executor] val metricsPoller = new ExecutorMetricsPoller( env.memoryManager, - METRICS_POLLING_INTERVAL_MS) + METRICS_POLLING_INTERVAL_MS, + executorMetricsSource) // Executor for the heartbeat task. private val heartbeater = new Heartbeater( diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala index 805b0f729b12..37424eea0c7f 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala @@ -48,7 +48,8 @@ import org.apache.spark.util.{ThreadUtils, Utils} */ private[spark] class ExecutorMetricsPoller( memoryManager: MemoryManager, - pollingInterval: Long) extends Logging { + pollingInterval: Long, + executorMetricsSource: ExecutorMetricsSource) extends Logging { type StageKey = (Int, Int) // Task Count and Metric Peaks @@ -80,6 +81,8 @@ private[spark] class ExecutorMetricsPoller( // get the latest values for the metrics val latestMetrics = ExecutorMetrics.getCurrentMetrics(memoryManager) + executorMetricsSource.updateMetricsSnapshot(latestMetrics) + def updatePeaks(metrics: AtomicLongArray): Unit = { (0 until metrics.length).foreach { i => metrics.getAndAccumulate(i, latestMetrics(i), math.max) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala new file mode 100644 index 000000000000..bcaea7c4975e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.executor + +import com.codahale.metrics.{Gauge, MetricRegistry} + +import org.apache.spark.metrics.ExecutorMetricType +import org.apache.spark.metrics.source.Source + +private[spark] class ExecutorMetricsSource extends Source { + + override val metricRegistry = new MetricRegistry() + override val sourceName = "ExecutorMetrics" + @volatile var metricsSnapshot: Array[Long] = Array.fill(ExecutorMetricType.numMetrics)(0L) + + // called by ExecutorMetricsPoller + def updateMetricsSnapshot(metricsUpdates: Array[Long]): Unit = { + metricsSnapshot = metricsUpdates + } + + class ExecutorMetricGauge(idx: Int) extends Gauge[Long] { + def getValue: Long = metricsSnapshot(idx) + } + + def register: Unit = { + // This looks like a bunch of independent gauges as far the metric system + // is concerned, but actually they're all using one shared snapshot. + val gauges: IndexedSeq[ExecutorMetricGauge] = (0 until ExecutorMetricType.numMetrics).map { + idx => new ExecutorMetricGauge(idx) + }.toIndexedSeq + + ExecutorMetricType.metricToOffset.foreach { + case (name, idx) => + metricRegistry.register(MetricRegistry.name(name), gauges(idx)) + } + } +} diff --git a/docs/monitoring.md b/docs/monitoring.md index f094d0ba0dfc..c8135bc5e831 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -995,6 +995,10 @@ This is the component with the largest amount of instrumented metrics - namespace=JVMCPU - jvmCpuTime +- namespace=ExecutorMetrics + - This contains memory-related metrics. A full list of available metrics in this namespace can be + found in the corresponding entry for the Executor component instance. + - namespace=plugin.\ - Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and configured using the Spark plugin API. See "Advanced Instrumentation" below for how to load @@ -1046,6 +1050,38 @@ when running in local mode. - threadpool.maxPool_size - threadpool.startedTasks +- namespace=ExecutorMetrics + - **note:** ExecutorMetrics are updated as part of heartbeat processes scheduled + for the executors and for the driver at regular intervals: `spark.executor.heartbeatInterval`, default 10 seconds + An optional faster polling mechanism is available for executor memory metrics, + it can be activated by setting a polling interval (in milliseconds) using the configuration parameter `spark.executor.metrics.pollingInterval` + - JVMHeapMemory + - JVMOffHeapMemory + - OnHeapExecutionMemory + - OnHeapStorageMemory + - OnHeapUnifiedMemory + - OffHeapExecutionMemory + - OffHeapStorageMemory + - OffHeapUnifiedMemory + - DirectPoolMemory + - MappedPoolMemory + - MinorGCCount + - MinorGCTime + - MajorGCCount + - MajorGCTime + - "ProcessTree*" metric counters: + - ProcessTreeJVMVMemory + - ProcessTreeJVMRSSMemory + - ProcessTreePythonVMemory + - ProcessTreePythonRSSMemory + - ProcessTreeOtherVMemory + - ProcessTreeOtherRSSMemory + - **note:** "ProcessTree*" metrics are collected only under certain conditions. + The conditions are the logical AND of the following: `/proc` filesystem exists, + `spark.eventLog.logStageExecutorProcessTreeMetrics.enabled=true`, + `spark.eventLog.logStageExecutorMetrics.enabled=true`. + "ProcessTree*" metrics report 0 when those conditions are not met. 
+ - namespace=JVMCPU - jvmCpuTime From 3f2e8ff2928af35b8b9c56e803dc914e9943eac1 Mon Sep 17 00:00:00 2001 From: Luca Canali Date: Mon, 11 Nov 2019 15:28:55 +0100 Subject: [PATCH 2/6] Added configuration parameter and tests --- .../scala/org/apache/spark/SparkContext.scala | 13 ++++++-- .../org/apache/spark/executor/Executor.scala | 13 ++++++-- .../executor/ExecutorMetricsPoller.scala | 4 ++- .../spark/internal/config/package.scala | 6 ++++ .../metrics/source/SourceConfigSuite.scala | 30 ++++++++++++++++++- docs/monitoring.md | 15 ++++++---- 6 files changed, 68 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ac051d212c4f..2156226e3857 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -551,8 +551,12 @@ class SparkContext(config: SparkConf) extends Logging { _dagScheduler = new DAGScheduler(this) _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet) - val executorMetricsSource = new ExecutorMetricsSource - executorMetricsSource.register + val executorMetricsSource = + if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) { + new ExecutorMetricsSource + } else { + null + } // create and start the heartbeater for collecting memory metrics _heartbeater = new Heartbeater( @@ -625,7 +629,10 @@ class SparkContext(config: SparkConf) extends Logging { _env.metricsSystem.registerSource(_dagScheduler.metricsSource) _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager)) _env.metricsSystem.registerSource(new JVMCPUSource()) - env.metricsSystem.registerSource(executorMetricsSource) + if (executorMetricsSource != null) { + executorMetricsSource.register + env.metricsSystem.registerSource(executorMetricsSource) + } _executorAllocationManager.foreach { e => _env.metricsSystem.registerSource(e.executorAllocationManagerSource) } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 5c1a2dc3572f..d6725fc86016 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -113,14 +113,21 @@ private[spark] class Executor( // create. The map key is a task id. 
private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, TaskReaper]() - val executorMetricsSource = new ExecutorMetricsSource + val executorMetricsSource = + if (conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) { + new ExecutorMetricsSource + } else { + null + } if (!isLocal) { env.blockManager.initialize(conf.getAppId) env.metricsSystem.registerSource(executorSource) env.metricsSystem.registerSource(new JVMCPUSource()) - executorMetricsSource.register - env.metricsSystem.registerSource(executorMetricsSource) + if (conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) { + executorMetricsSource.register + env.metricsSystem.registerSource(executorMetricsSource) + } env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource) } diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala index 37424eea0c7f..d2a6ed134f2b 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala @@ -81,7 +81,9 @@ private[spark] class ExecutorMetricsPoller( // get the latest values for the metrics val latestMetrics = ExecutorMetrics.getCurrentMetrics(memoryManager) - executorMetricsSource.updateMetricsSnapshot(latestMetrics) + if (executorMetricsSource != null) { + executorMetricsSource.updateMetricsSnapshot(latestMetrics) + } def updatePeaks(metrics: AtomicLongArray): Unit = { (0 until metrics.length).foreach { i => diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 00acb1ff115f..14213662a0ba 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -615,6 +615,12 @@ package object config { .stringConf .createOptional + private[spark] val METRICS_EXECUTORMETRICS_SOURCE_ENABLED = + ConfigBuilder("spark.metrics.executormetrics.source.enabled") + .doc("Whether to register the ExecutorMetrics source with the metrics system.") + .booleanConf + .createWithDefault(true) + private[spark] val METRICS_STATIC_SOURCES_ENABLED = ConfigBuilder("spark.metrics.static.sources.enabled") .doc("Whether to register static sources with the metrics system.") diff --git a/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala index 76c568056aee..8f5ab7419d4f 100644 --- a/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.metrics.source import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} -import org.apache.spark.internal.config.METRICS_STATIC_SOURCES_ENABLED +import org.apache.spark.internal.config.{METRICS_EXECUTORMETRICS_SOURCE_ENABLED, METRICS_STATIC_SOURCES_ENABLED} class SourceConfigSuite extends SparkFunSuite with LocalSparkContext { @@ -52,4 +52,32 @@ class SourceConfigSuite extends SparkFunSuite with LocalSparkContext { } } + test("Test configuration for adding ExecutorMetrics source registration") { + val conf = new SparkConf() + conf.set(METRICS_EXECUTORMETRICS_SOURCE_ENABLED, true) + val sc = new SparkContext("local", "test", conf) + try { + val metricsSystem = sc.env.metricsSystem + + // ExecutorMetrics source should 
be registered + assert (metricsSystem.getSourcesByName("ExecutorMetrics").nonEmpty) + } finally { + sc.stop() + } + } + + test("Test configuration for skipping ExecutorMetrics source registration") { + val conf = new SparkConf() + conf.set(METRICS_EXECUTORMETRICS_SOURCE_ENABLED, false) + val sc = new SparkContext("local", "test", conf) + try { + val metricsSystem = sc.env.metricsSystem + + // ExecutorMetrics source should not be registered + assert (metricsSystem.getSourcesByName("ExecutorMetrics").isEmpty) + } finally { + sc.stop() + } + } + } diff --git a/docs/monitoring.md b/docs/monitoring.md index c8135bc5e831..5cc1028c4d24 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -996,9 +996,11 @@ This is the component with the largest amount of instrumented metrics - jvmCpuTime - namespace=ExecutorMetrics - - This contains memory-related metrics. A full list of available metrics in this namespace can be - found in the corresponding entry for the Executor component instance. - + - **note:** these metrics are conditional to a configuration parameter: + `spark.metrics.executormetrics.source.enabled` (default is true) + - This source contains memory-related metrics. A full list of available metrics in this + namespace can be found in the corresponding entry for the Executor component instance. + - namespace=plugin.\ - Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and configured using the Spark plugin API. See "Advanced Instrumentation" below for how to load @@ -1051,9 +1053,12 @@ when running in local mode. - threadpool.startedTasks - namespace=ExecutorMetrics - - **note:** ExecutorMetrics are updated as part of heartbeat processes scheduled + - **notes:** + - These metrics are conditional to a configuration parameter: + `spark.metrics.executormetrics.source.enabled` (default is true) + - ExecutorMetrics are updated as part of heartbeat processes scheduled for the executors and for the driver at regular intervals: `spark.executor.heartbeatInterval`, default 10 seconds - An optional faster polling mechanism is available for executor memory metrics, + - An optional faster polling mechanism is available for executor memory metrics, it can be activated by setting a polling interval (in milliseconds) using the configuration parameter `spark.executor.metrics.pollingInterval` - JVMHeapMemory - JVMOffHeapMemory From 01c592522999748ca92b92bfe23cd4fbe0050acc Mon Sep 17 00:00:00 2001 From: Luca Canali Date: Mon, 25 Nov 2019 11:36:27 +0100 Subject: [PATCH 3/6] Updates following review comments. 
--- .../scala/org/apache/spark/SparkContext.scala | 22 +++++++++++-------- .../org/apache/spark/executor/Executor.scala | 10 ++++----- .../executor/ExecutorMetricsPoller.scala | 7 +++--- .../executor/ExecutorMetricsSource.scala | 17 ++++++++++---- docs/monitoring.md | 4 ++-- 5 files changed, 37 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 2156226e3857..41f438fbeb4c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -551,16 +551,16 @@ class SparkContext(config: SparkConf) extends Logging { _dagScheduler = new DAGScheduler(this) _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet) - val executorMetricsSource = + val _executorMetricsSource = if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) { - new ExecutorMetricsSource + Some(new ExecutorMetricsSource) } else { - null + None } // create and start the heartbeater for collecting memory metrics _heartbeater = new Heartbeater( - () => SparkContext.this.reportHeartBeat(executorMetricsSource), + () => SparkContext.this.reportHeartBeat(_executorMetricsSource), "driver-heartbeater", conf.get(EXECUTOR_HEARTBEAT_INTERVAL)) _heartbeater.start() @@ -629,9 +629,9 @@ class SparkContext(config: SparkConf) extends Logging { _env.metricsSystem.registerSource(_dagScheduler.metricsSource) _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager)) _env.metricsSystem.registerSource(new JVMCPUSource()) - if (executorMetricsSource != null) { - executorMetricsSource.register - env.metricsSystem.registerSource(executorMetricsSource) + _executorMetricsSource match { + case Some(executorMetricsSource: ExecutorMetricsSource) => + executorMetricsSource.register(_env.metricsSystem) } _executorAllocationManager.foreach { e => _env.metricsSystem.registerSource(e.executorAllocationManagerSource) @@ -2484,9 +2484,13 @@ class SparkContext(config: SparkConf) extends Logging { } /** Reports heartbeat metrics for the driver. 
*/ - private def reportHeartBeat(executorMetricsSource: ExecutorMetricsSource): Unit = { + private def reportHeartBeat(executorMetricsSource: Option[ExecutorMetricsSource]): Unit = { val currentMetrics = ExecutorMetrics.getCurrentMetrics(env.memoryManager) - executorMetricsSource.updateMetricsSnapshot(currentMetrics) + + executorMetricsSource match { + case Some(executorMetricsSource: ExecutorMetricsSource) => + executorMetricsSource.updateMetricsSnapshot(currentMetrics) + } val driverUpdates = new HashMap[(Int, Int), ExecutorMetrics] // In the driver, we do not track per-stage metrics, so use a dummy stage for the key diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index d6725fc86016..96ebeaae018c 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -115,18 +115,18 @@ private[spark] class Executor( val executorMetricsSource = if (conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) { - new ExecutorMetricsSource + Some(new ExecutorMetricsSource) } else { - null + None } if (!isLocal) { env.blockManager.initialize(conf.getAppId) env.metricsSystem.registerSource(executorSource) env.metricsSystem.registerSource(new JVMCPUSource()) - if (conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) { - executorMetricsSource.register - env.metricsSystem.registerSource(executorMetricsSource) + executorMetricsSource match { + case Some(executorMetricsSource: ExecutorMetricsSource) => + executorMetricsSource.register(env.metricsSystem) } env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource) } diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala index d2a6ed134f2b..e247dd546e14 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala @@ -49,7 +49,7 @@ import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class ExecutorMetricsPoller( memoryManager: MemoryManager, pollingInterval: Long, - executorMetricsSource: ExecutorMetricsSource) extends Logging { + executorMetricsSource: Option[ExecutorMetricsSource]) extends Logging { type StageKey = (Int, Int) // Task Count and Metric Peaks @@ -81,8 +81,9 @@ private[spark] class ExecutorMetricsPoller( // get the latest values for the metrics val latestMetrics = ExecutorMetrics.getCurrentMetrics(memoryManager) - if (executorMetricsSource != null) { - executorMetricsSource.updateMetricsSnapshot(latestMetrics) + executorMetricsSource match { + case Some(executorMetricsSource: ExecutorMetricsSource) => + executorMetricsSource.updateMetricsSnapshot(latestMetrics) } def updatePeaks(metrics: AtomicLongArray): Unit = { diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala index bcaea7c4975e..48e2311e4189 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala @@ -19,9 +19,18 @@ package org.apache.spark.executor import com.codahale.metrics.{Gauge, MetricRegistry} -import org.apache.spark.metrics.ExecutorMetricType +import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem} import org.apache.spark.metrics.source.Source +// Expose executor 
metrics from [[ExecutorMetricsType]] using the Dropwizard metrics system +// Metrics related the memory system can be expensive to gather, we implement some optimizations: +// (1) Metrics values are cached, updated at each heartbeat (default period is 10 seconds) +// An alternative faster polling mechanism is used only if activated, by setting +// spark.executor.metrics.pollingInterval= +// (2) procfs metrics are gathered all in one-go and only conditionally: +// if the /proc filesystem exists +// and spark.eventLog.logStageExecutorProcessTreeMetrics.enabled=true +// and spark.eventLog.logStageExecutorMetrics.enabled=true private[spark] class ExecutorMetricsSource extends Source { override val metricRegistry = new MetricRegistry() @@ -37,9 +46,7 @@ private[spark] class ExecutorMetricsSource extends Source { def getValue: Long = metricsSnapshot(idx) } - def register: Unit = { - // This looks like a bunch of independent gauges as far the metric system - // is concerned, but actually they're all using one shared snapshot. + def register(metricsSystem: MetricsSystem): Unit = { val gauges: IndexedSeq[ExecutorMetricGauge] = (0 until ExecutorMetricType.numMetrics).map { idx => new ExecutorMetricGauge(idx) }.toIndexedSeq @@ -48,5 +55,7 @@ private[spark] class ExecutorMetricsSource extends Source { case (name, idx) => metricRegistry.register(MetricRegistry.name(name), gauges(idx)) } + + metricsSystem.registerSource(this) } } diff --git a/docs/monitoring.md b/docs/monitoring.md index 5cc1028c4d24..f8c142ad24d2 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -1055,9 +1055,9 @@ when running in local mode. - namespace=ExecutorMetrics - **notes:** - These metrics are conditional to a configuration parameter: - `spark.metrics.executormetrics.source.enabled` (default is true) + `spark.metrics.executormetrics.source.enabled` (default value is true) - ExecutorMetrics are updated as part of heartbeat processes scheduled - for the executors and for the driver at regular intervals: `spark.executor.heartbeatInterval`, default 10 seconds + for the executors and for the driver at regular intervals: `spark.executor.heartbeatInterval` (default value is 10 seconds) - An optional faster polling mechanism is available for executor memory metrics, it can be activated by setting a polling interval (in milliseconds) using the configuration parameter `spark.executor.metrics.pollingInterval` - JVMHeapMemory From 1a0a3e9439df7490f7fb3a1f3451a463ad67be5b Mon Sep 17 00:00:00 2001 From: Luca Canali Date: Fri, 29 Nov 2019 09:30:30 +0100 Subject: [PATCH 4/6] Implement naming convention for spark metrics configuration parameters --- .../main/scala/org/apache/spark/internal/config/package.scala | 2 +- docs/monitoring.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 14213662a0ba..9f66f5c9a2f1 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -616,7 +616,7 @@ package object config { .createOptional private[spark] val METRICS_EXECUTORMETRICS_SOURCE_ENABLED = - ConfigBuilder("spark.metrics.executormetrics.source.enabled") + ConfigBuilder("spark.metrics.executorMetricsSource.enabled") .doc("Whether to register the ExecutorMetrics source with the metrics system.") .booleanConf .createWithDefault(true) diff --git a/docs/monitoring.md b/docs/monitoring.md 
index f8c142ad24d2..88055b68e651 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -997,7 +997,7 @@ This is the component with the largest amount of instrumented metrics - namespace=ExecutorMetrics - **note:** these metrics are conditional to a configuration parameter: - `spark.metrics.executormetrics.source.enabled` (default is true) + `spark.metrics.executorMetricsSource.enabled` (default is true) - This source contains memory-related metrics. A full list of available metrics in this namespace can be found in the corresponding entry for the Executor component instance. @@ -1055,7 +1055,7 @@ when running in local mode. - namespace=ExecutorMetrics - **notes:** - These metrics are conditional to a configuration parameter: - `spark.metrics.executormetrics.source.enabled` (default value is true) + `spark.metrics.executorMetricsSource.enabled` (default value is true) - ExecutorMetrics are updated as part of heartbeat processes scheduled for the executors and for the driver at regular intervals: `spark.executor.heartbeatInterval` (default value is 10 seconds) - An optional faster polling mechanism is available for executor memory metrics, From 4ab39cd21de28d8caa8a755ed6832a69e0d9fc8f Mon Sep 17 00:00:00 2001 From: Luca Canali Date: Mon, 2 Dec 2019 21:21:35 +0100 Subject: [PATCH 5/6] Added case None --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 ++ core/src/main/scala/org/apache/spark/executor/Executor.scala | 1 + .../scala/org/apache/spark/executor/ExecutorMetricsPoller.scala | 1 + 3 files changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 41f438fbeb4c..9939e7ace994 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -632,6 +632,7 @@ class SparkContext(config: SparkConf) extends Logging { _executorMetricsSource match { case Some(executorMetricsSource: ExecutorMetricsSource) => executorMetricsSource.register(_env.metricsSystem) + case None => None } _executorAllocationManager.foreach { e => _env.metricsSystem.registerSource(e.executorAllocationManagerSource) @@ -2490,6 +2491,7 @@ class SparkContext(config: SparkConf) extends Logging { executorMetricsSource match { case Some(executorMetricsSource: ExecutorMetricsSource) => executorMetricsSource.updateMetricsSnapshot(currentMetrics) + case None => None } val driverUpdates = new HashMap[(Int, Int), ExecutorMetrics] diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 96ebeaae018c..4b01ee11c822 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -127,6 +127,7 @@ private[spark] class Executor( executorMetricsSource match { case Some(executorMetricsSource: ExecutorMetricsSource) => executorMetricsSource.register(env.metricsSystem) + case None => None } env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource) } diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala index e247dd546e14..5de93c11f6c1 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala @@ -84,6 +84,7 @@ private[spark] class ExecutorMetricsPoller( 
executorMetricsSource match { case Some(executorMetricsSource: ExecutorMetricsSource) => executorMetricsSource.updateMetricsSnapshot(latestMetrics) + case None => None } def updatePeaks(metrics: AtomicLongArray): Unit = { From bb5f2b8e5bc00223c8b45d358f49b550c72a84a0 Mon Sep 17 00:00:00 2001 From: Luca Canali Date: Thu, 5 Dec 2019 08:57:56 +0100 Subject: [PATCH 6/6] addressed review comment --- .../scala/org/apache/spark/SparkContext.scala | 13 ++--------- .../org/apache/spark/executor/Executor.scala | 6 +---- .../executor/ExecutorMetricsPoller.scala | 7 +----- .../executor/ExecutorMetricsSource.scala | 22 +++++++++++-------- 4 files changed, 17 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 9939e7ace994..2fdbd4f77aae 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -629,11 +629,7 @@ class SparkContext(config: SparkConf) extends Logging { _env.metricsSystem.registerSource(_dagScheduler.metricsSource) _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager)) _env.metricsSystem.registerSource(new JVMCPUSource()) - _executorMetricsSource match { - case Some(executorMetricsSource: ExecutorMetricsSource) => - executorMetricsSource.register(_env.metricsSystem) - case None => None - } + _executorMetricsSource.foreach(_.register(_env.metricsSystem)) _executorAllocationManager.foreach { e => _env.metricsSystem.registerSource(e.executorAllocationManagerSource) } @@ -2487,12 +2483,7 @@ class SparkContext(config: SparkConf) extends Logging { /** Reports heartbeat metrics for the driver. */ private def reportHeartBeat(executorMetricsSource: Option[ExecutorMetricsSource]): Unit = { val currentMetrics = ExecutorMetrics.getCurrentMetrics(env.memoryManager) - - executorMetricsSource match { - case Some(executorMetricsSource: ExecutorMetricsSource) => - executorMetricsSource.updateMetricsSnapshot(currentMetrics) - case None => None - } + executorMetricsSource.foreach(_.updateMetricsSnapshot(currentMetrics)) val driverUpdates = new HashMap[(Int, Int), ExecutorMetrics] // In the driver, we do not track per-stage metrics, so use a dummy stage for the key diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 4b01ee11c822..41332d05978e 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -124,11 +124,7 @@ private[spark] class Executor( env.blockManager.initialize(conf.getAppId) env.metricsSystem.registerSource(executorSource) env.metricsSystem.registerSource(new JVMCPUSource()) - executorMetricsSource match { - case Some(executorMetricsSource: ExecutorMetricsSource) => - executorMetricsSource.register(env.metricsSystem) - case None => None - } + executorMetricsSource.foreach(_.register(env.metricsSystem)) env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource) } diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala index 5de93c11f6c1..1c1a1ca8035d 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala @@ -80,12 +80,7 @@ private[spark] class ExecutorMetricsPoller( // get the latest 
values for the metrics val latestMetrics = ExecutorMetrics.getCurrentMetrics(memoryManager) - - executorMetricsSource match { - case Some(executorMetricsSource: ExecutorMetricsSource) => - executorMetricsSource.updateMetricsSnapshot(latestMetrics) - case None => None - } + executorMetricsSource.foreach(_.updateMetricsSnapshot(latestMetrics)) def updatePeaks(metrics: AtomicLongArray): Unit = { (0 until metrics.length).foreach { i => diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala index 48e2311e4189..e47452fe7291 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala @@ -22,15 +22,19 @@ import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem} import org.apache.spark.metrics.source.Source -// Expose executor metrics from [[ExecutorMetricsType]] using the Dropwizard metrics system -// Metrics related the memory system can be expensive to gather, we implement some optimizations: -// (1) Metrics values are cached, updated at each heartbeat (default period is 10 seconds) -// An alternative faster polling mechanism is used only if activated, by setting -// spark.executor.metrics.pollingInterval= -// (2) procfs metrics are gathered all in one-go and only conditionally: -// if the /proc filesystem exists -// and spark.eventLog.logStageExecutorProcessTreeMetrics.enabled=true -// and spark.eventLog.logStageExecutorMetrics.enabled=true +/** + * Expose executor metrics from [[ExecutorMetricsType]] using the Dropwizard metrics system. + * + * Metrics related to the memory system can be expensive to gather, therefore + * we implement some optimizations: + * (1) Metrics values are cached, updated at each heartbeat (default period is 10 seconds). + * An alternative faster polling mechanism is used, only if activated, by setting + * spark.executor.metrics.pollingInterval=. + * (2) Procfs metrics are gathered all in one-go and only conditionally: + * if the /proc filesystem exists + * and spark.eventLog.logStageExecutorProcessTreeMetrics.enabled=true + * and spark.eventLog.logStageExecutorMetrics.enabled=true. + */ private[spark] class ExecutorMetricsSource extends Source { override val metricRegistry = new MetricRegistry()
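
A minimal usage sketch, not part of the patches above: assuming the `spark.metrics.executorMetricsSource.enabled` and `spark.executor.metrics.pollingInterval` settings introduced/documented in this series, plus Spark's standard console sink, the new ExecutorMetrics source could be exercised locally roughly as follows. The sink choice, reporting period, app name, and the 100 ms polling value are illustrative assumptions, not values taken from the patches.

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative configuration only; sink settings and polling value are assumptions.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("executor-metrics-demo")
      // Flag added by this patch series (default true): register the ExecutorMetrics source.
      .set("spark.metrics.executorMetricsSource.enabled", "true")
      // Optional faster polling of executor memory metrics, in milliseconds (per docs/monitoring.md).
      .set("spark.executor.metrics.pollingInterval", "100")
      // Report all registered sources to the console sink every 10 seconds.
      .set("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink")
      .set("spark.metrics.conf.*.sink.console.period", "10")
      .set("spark.metrics.conf.*.sink.console.unit", "seconds")

    val sc = new SparkContext(conf)
    // Run a trivial job so the heartbeat/poller threads have something to measure.
    sc.parallelize(1 to 1000).count()
    // Gauges should now appear under the ExecutorMetrics namespace,
    // e.g. <app-id>.driver.ExecutorMetrics.JVMHeapMemory in the sink output.
    sc.stop()

The patch's own SourceConfigSuite checks the same behaviour programmatically via `metricsSystem.getSourcesByName("ExecutorMetrics")`, which is accessible from test code inside the org.apache.spark package.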