Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.deploy.StandaloneResourceUtils._
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.executor.{ExecutorMetrics, ExecutorMetricsSource}
import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
Expand Down Expand Up @@ -551,9 +551,16 @@ class SparkContext(config: SparkConf) extends Logging {
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

val _executorMetricsSource =
if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
Some(new ExecutorMetricsSource)
} else {
None
}

// create and start the heartbeater for collecting memory metrics
_heartbeater = new Heartbeater(
() => SparkContext.this.reportHeartBeat(),
() => SparkContext.this.reportHeartBeat(_executorMetricsSource),
"driver-heartbeater",
conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
_heartbeater.start()
Expand Down Expand Up @@ -622,6 +629,7 @@ class SparkContext(config: SparkConf) extends Logging {
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_env.metricsSystem.registerSource(new JVMCPUSource())
_executorMetricsSource.foreach(_.register(_env.metricsSystem))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
Expand Down Expand Up @@ -2473,8 +2481,10 @@ class SparkContext(config: SparkConf) extends Logging {
}

/** Reports heartbeat metrics for the driver. */
private def reportHeartBeat(): Unit = {
private def reportHeartBeat(executorMetricsSource: Option[ExecutorMetricsSource]): Unit = {
val currentMetrics = ExecutorMetrics.getCurrentMetrics(env.memoryManager)
executorMetricsSource.foreach(_.updateMetricsSnapshot(currentMetrics))

val driverUpdates = new HashMap[(Int, Int), ExecutorMetrics]
// In the driver, we do not track per-stage metrics, so use a dummy stage for the key
driverUpdates.put(EventLoggingListener.DRIVER_STAGE_KEY, new ExecutorMetrics(currentMetrics))
Expand Down
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,18 @@ private[spark] class Executor(
// create. The map key is a task id.
private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, TaskReaper]()

val executorMetricsSource =
if (conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
Some(new ExecutorMetricsSource)
} else {
None
}

if (!isLocal) {
env.blockManager.initialize(conf.getAppId)
env.metricsSystem.registerSource(executorSource)
env.metricsSystem.registerSource(new JVMCPUSource())
executorMetricsSource.foreach(_.register(env.metricsSystem))
env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
}

Expand Down Expand Up @@ -210,7 +218,8 @@ private[spark] class Executor(
// Poller for the memory metrics. Visible for testing.
private[executor] val metricsPoller = new ExecutorMetricsPoller(
env.memoryManager,
METRICS_POLLING_INTERVAL_MS)
METRICS_POLLING_INTERVAL_MS,
executorMetricsSource)

// Executor for the heartbeat task.
private val heartbeater = new Heartbeater(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ import org.apache.spark.util.{ThreadUtils, Utils}
*/
private[spark] class ExecutorMetricsPoller(
memoryManager: MemoryManager,
pollingInterval: Long) extends Logging {
pollingInterval: Long,
executorMetricsSource: Option[ExecutorMetricsSource]) extends Logging {

type StageKey = (Int, Int)
// Task Count and Metric Peaks
Expand Down Expand Up @@ -79,6 +80,7 @@ private[spark] class ExecutorMetricsPoller(

// get the latest values for the metrics
val latestMetrics = ExecutorMetrics.getCurrentMetrics(memoryManager)
executorMetricsSource.foreach(_.updateMetricsSnapshot(latestMetrics))

def updatePeaks(metrics: AtomicLongArray): Unit = {
(0 until metrics.length).foreach { i =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem}
import org.apache.spark.metrics.source.Source

/**
* Expose executor metrics from [[ExecutorMetricsType]] using the Dropwizard metrics system.
*
* Metrics related to the memory system can be expensive to gather, therefore
* we implement some optimizations:
* (1) Metrics values are cached, updated at each heartbeat (default period is 10 seconds).
* An alternative faster polling mechanism is used, only if activated, by setting
* spark.executor.metrics.pollingInterval=<interval in ms>.
* (2) Procfs metrics are gathered all in one-go and only conditionally:
* if the /proc filesystem exists
* and spark.eventLog.logStageExecutorProcessTreeMetrics.enabled=true
* and spark.eventLog.logStageExecutorMetrics.enabled=true.
*/
private[spark] class ExecutorMetricsSource extends Source {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd include some comments here explaining why this is setup this way (eg. we're exposing metrics that are a little expensive to check, so a couple of optimizations; (1) the procfs metrics are gathered all in one-go; (2) the values are not checked too often; (3) we re-use the cached values for spark internal stage-level metrics.


override val metricRegistry = new MetricRegistry()
override val sourceName = "ExecutorMetrics"
@volatile var metricsSnapshot: Array[Long] = Array.fill(ExecutorMetricType.numMetrics)(0L)

// called by ExecutorMetricsPoller
def updateMetricsSnapshot(metricsUpdates: Array[Long]): Unit = {
metricsSnapshot = metricsUpdates
}

class ExecutorMetricGauge(idx: Int) extends Gauge[Long] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this class private when I merged it

def getValue: Long = metricsSnapshot(idx)
}

def register(metricsSystem: MetricsSystem): Unit = {
val gauges: IndexedSeq[ExecutorMetricGauge] = (0 until ExecutorMetricType.numMetrics).map {
idx => new ExecutorMetricGauge(idx)
}.toIndexedSeq

ExecutorMetricType.metricToOffset.foreach {
case (name, idx) =>
metricRegistry.register(MetricRegistry.name(name), gauges(idx))
}

metricsSystem.registerSource(this)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,12 @@ package object config {
.stringConf
.createOptional

private[spark] val METRICS_EXECUTORMETRICS_SOURCE_ENABLED =
ConfigBuilder("spark.metrics.executorMetricsSource.enabled")
.doc("Whether to register the ExecutorMetrics source with the metrics system.")
.booleanConf
.createWithDefault(true)

private[spark] val METRICS_STATIC_SOURCES_ENABLED =
ConfigBuilder("spark.metrics.static.sources.enabled")
.doc("Whether to register static sources with the metrics system.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.metrics.source

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.config.METRICS_STATIC_SOURCES_ENABLED
import org.apache.spark.internal.config.{METRICS_EXECUTORMETRICS_SOURCE_ENABLED, METRICS_STATIC_SOURCES_ENABLED}

class SourceConfigSuite extends SparkFunSuite with LocalSparkContext {

Expand Down Expand Up @@ -52,4 +52,32 @@ class SourceConfigSuite extends SparkFunSuite with LocalSparkContext {
}
}

test("Test configuration for adding ExecutorMetrics source registration") {
val conf = new SparkConf()
conf.set(METRICS_EXECUTORMETRICS_SOURCE_ENABLED, true)
val sc = new SparkContext("local", "test", conf)
try {
val metricsSystem = sc.env.metricsSystem

// ExecutorMetrics source should be registered
assert (metricsSystem.getSourcesByName("ExecutorMetrics").nonEmpty)
} finally {
sc.stop()
}
}

test("Test configuration for skipping ExecutorMetrics source registration") {
val conf = new SparkConf()
conf.set(METRICS_EXECUTORMETRICS_SOURCE_ENABLED, false)
val sc = new SparkContext("local", "test", conf)
try {
val metricsSystem = sc.env.metricsSystem

// ExecutorMetrics source should not be registered
assert (metricsSystem.getSourcesByName("ExecutorMetrics").isEmpty)
} finally {
sc.stop()
}
}

}
41 changes: 41 additions & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,12 @@ This is the component with the largest amount of instrumented metrics
- namespace=JVMCPU
- jvmCpuTime

- namespace=ExecutorMetrics
- **note:** these metrics are conditional to a configuration parameter:
`spark.metrics.executorMetricsSource.enabled` (default is true)
- This source contains memory-related metrics. A full list of available metrics in this
namespace can be found in the corresponding entry for the Executor component instance.

- namespace=plugin.\<Plugin Class Name>
- Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and
configured using the Spark plugin API. See "Advanced Instrumentation" below for how to load
Expand Down Expand Up @@ -1046,6 +1052,41 @@ when running in local mode.
- threadpool.maxPool_size
- threadpool.startedTasks

- namespace=ExecutorMetrics
- **notes:**
- These metrics are conditional to a configuration parameter:
`spark.metrics.executorMetricsSource.enabled` (default value is true)
- ExecutorMetrics are updated as part of heartbeat processes scheduled
for the executors and for the driver at regular intervals: `spark.executor.heartbeatInterval` (default value is 10 seconds)
- An optional faster polling mechanism is available for executor memory metrics,
it can be activated by setting a polling interval (in milliseconds) using the configuration parameter `spark.executor.metrics.pollingInterval`
- JVMHeapMemory
- JVMOffHeapMemory
- OnHeapExecutionMemory
- OnHeapStorageMemory
- OnHeapUnifiedMemory
- OffHeapExecutionMemory
- OffHeapStorageMemory
- OffHeapUnifiedMemory
- DirectPoolMemory
- MappedPoolMemory
- MinorGCCount
- MinorGCTime
- MajorGCCount
- MajorGCTime
- "ProcessTree*" metric counters:
- ProcessTreeJVMVMemory
- ProcessTreeJVMRSSMemory
- ProcessTreePythonVMemory
- ProcessTreePythonRSSMemory
- ProcessTreeOtherVMemory
- ProcessTreeOtherRSSMemory
- **note:** "ProcessTree*" metrics are collected only under certain conditions.
The conditions are the logical AND of the following: `/proc` filesystem exists,
`spark.eventLog.logStageExecutorProcessTreeMetrics.enabled=true`,
`spark.eventLog.logStageExecutorMetrics.enabled=true`.
"ProcessTree*" metrics report 0 when those conditions are not met.

- namespace=JVMCPU
- jvmCpuTime

Expand Down