
Commit 756e849

Executor Metrics integration with the Spark metrics system re-implemented after #23767
1 parent 8a4378c commit 756e849

5 files changed: 109 additions, 8 deletions

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 6 deletions
@@ -27,22 +27,20 @@ import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.mutable.HashMap
 import scala.language.implicitConversions
-import scala.reflect.{classTag, ClassTag}
+import scala.reflect.{ClassTag, classTag}
 import scala.util.control.NonFatal
-
 import com.google.common.collect.MapMaker
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.deploy.StandaloneResourceUtils._
-import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.executor.{ExecutorMetrics, ExecutorMetricsSource}
 import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -546,9 +544,12 @@ class SparkContext(config: SparkConf) extends Logging {
     _dagScheduler = new DAGScheduler(this)
     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
 
+    val executorMetricsSource = new ExecutorMetricsSource
+    executorMetricsSource.register
+
     // create and start the heartbeater for collecting memory metrics
     _heartbeater = new Heartbeater(
-      () => SparkContext.this.reportHeartBeat(),
+      () => SparkContext.this.reportHeartBeat(executorMetricsSource),
       "driver-heartbeater",
       conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
     _heartbeater.start()
@@ -617,6 +618,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
     _env.metricsSystem.registerSource(new JVMCPUSource())
+    _env.metricsSystem.registerSource(executorMetricsSource)
     _executorAllocationManager.foreach { e =>
       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
     }
@@ -2464,8 +2466,10 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /** Reports heartbeat metrics for the driver. */
-  private def reportHeartBeat(): Unit = {
+  private def reportHeartBeat(executorMetricsSource: ExecutorMetricsSource): Unit = {
     val currentMetrics = ExecutorMetrics.getCurrentMetrics(env.memoryManager)
+    executorMetricsSource.updateMetricsSnapshot(currentMetrics)
+
     val driverUpdates = new HashMap[(Int, Int), ExecutorMetrics]
     // In the driver, we do not track per-stage metrics, so use a dummy stage for the key
     driverUpdates.put(EventLoggingListener.DRIVER_STAGE_KEY, new ExecutorMetrics(currentMetrics))

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 6 additions & 1 deletion
@@ -112,10 +112,14 @@ private[spark] class Executor(
   // create. The map key is a task id.
   private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, TaskReaper]()
 
+  val executorMetricsSource = new ExecutorMetricsSource
+
   if (!isLocal) {
     env.blockManager.initialize(conf.getAppId)
     env.metricsSystem.registerSource(executorSource)
     env.metricsSystem.registerSource(new JVMCPUSource())
+    executorMetricsSource.register
+    env.metricsSystem.registerSource(executorMetricsSource)
     env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
   }
 
@@ -204,7 +208,8 @@ private[spark] class Executor(
   // Poller for the memory metrics. Visible for testing.
   private[executor] val metricsPoller = new ExecutorMetricsPoller(
     env.memoryManager,
-    METRICS_POLLING_INTERVAL_MS)
+    METRICS_POLLING_INTERVAL_MS,
+    executorMetricsSource)
 
   // Executor for the heartbeat task.
   private val heartbeater = new Heartbeater(

core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala

Lines changed: 4 additions & 1 deletion
@@ -48,7 +48,8 @@ import org.apache.spark.util.{ThreadUtils, Utils}
  */
 private[spark] class ExecutorMetricsPoller(
     memoryManager: MemoryManager,
-    pollingInterval: Long) extends Logging {
+    pollingInterval: Long,
+    executorMetricsSource: ExecutorMetricsSource) extends Logging {
 
   type StageKey = (Int, Int)
   // Task Count and Metric Peaks
@@ -80,6 +81,8 @@ private[spark] class ExecutorMetricsPoller(
     // get the latest values for the metrics
     val latestMetrics = ExecutorMetrics.getCurrentMetrics(memoryManager)
 
+    executorMetricsSource.updateMetricsSnapshot(latestMetrics)
+
     def updatePeaks(metrics: AtomicLongArray): Unit = {
       (0 until metrics.length).foreach { i =>
         metrics.getAndAccumulate(i, latestMetrics(i), math.max)
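
A side note on the updatePeaks pattern above: AtomicLongArray.getAndAccumulate with math.max keeps a lock-free running maximum per metric slot. The following standalone sketch (illustrative only, not part of the commit) shows the same pattern in isolation:

import java.util.concurrent.atomic.AtomicLongArray

// Standalone sketch of the peak-tracking pattern used in updatePeaks:
// getAndAccumulate with math.max records the running maximum of each
// metric slot without any explicit locking.
object PeakTrackingDemo {
  def main(args: Array[String]): Unit = {
    val peaks = new AtomicLongArray(3)
    // Two simulated polling rounds of metric values.
    val samples = Seq(Array(5L, 2L, 9L), Array(3L, 7L, 1L))
    samples.foreach { latest =>
      (0 until peaks.length).foreach { i =>
        peaks.getAndAccumulate(i, latest(i), math.max)
      }
    }
    // Prints "5, 7, 9": the element-wise peaks across both samples.
    println((0 until peaks.length).map(peaks.get).mkString(", "))
  }
}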
core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala

Lines changed: 53 additions & 0 deletions

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.ExecutorMetricType
+import org.apache.spark.metrics.source.Source
+
+private[spark]
+class ExecutorMetricsSource extends Source {
+
+  override val metricRegistry = new MetricRegistry()
+  override val sourceName = "ExecutorMetrics"
+  @volatile var metricsSnapshot: Array[Long] = Array.fill(ExecutorMetricType.numMetrics)(0L)
+
+  // called by ExecutorMetricsPoller
+  def updateMetricsSnapshot(metricsUpdates: Array[Long]): Unit = {
+    metricsSnapshot = metricsUpdates
+  }
+
+  class ExecutorMetricGauge(idx: Int) extends Gauge[Long] {
+    def getValue: Long = metricsSnapshot(idx)
+  }
+
+  def register: Unit = {
+    // This looks like a bunch of independent gauges as far as the metric system
+    // is concerned, but actually they're all using one shared snapshot.
+    val gauges: IndexedSeq[ExecutorMetricGauge] = (0 until ExecutorMetricType.numMetrics).map {
+      idx => new ExecutorMetricGauge(idx)
+    }.toIndexedSeq
+
+    ExecutorMetricType.metricToOffset.foreach {
+      case (name, idx) =>
+        metricRegistry.register(MetricRegistry.name(name), gauges(idx))
+    }
+  }
+}
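
To see the shared-snapshot gauge design of ExecutorMetricsSource in isolation, here is a minimal, self-contained sketch (not part of the commit; the registry wiring is simplified and the values are invented) built directly on the Dropwizard metrics API: every gauge reads its own slot of one volatile array that a poller replaces wholesale.

import com.codahale.metrics.{Gauge, MetricRegistry}

object SnapshotGaugeDemo {
  // One volatile array shared by all gauges; a poller thread replaces it
  // atomically, so readers always see a consistent snapshot.
  @volatile private var snapshot: Array[Long] = Array.fill(2)(0L)

  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry()
    // Register one gauge per metric slot, mirroring ExecutorMetricsSource.register.
    Seq("JVMHeapMemory", "JVMOffHeapMemory").zipWithIndex.foreach { case (name, idx) =>
      registry.register(name, new Gauge[Long] {
        override def getValue: Long = snapshot(idx)
      })
    }
    // In Spark, ExecutorMetricsPoller (or the driver heartbeat in this commit)
    // calls updateMetricsSnapshot; here we assign the new snapshot directly.
    snapshot = Array(1024L, 512L)
    println(registry.getGauges.get("JVMHeapMemory").getValue) // prints 1024
  }
}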

docs/monitoring.md

Lines changed: 36 additions & 0 deletions
@@ -991,6 +991,10 @@ This is the component with the largest amount of instrumented metrics
 - namespace=JVMCPU
   - jvmCpuTime
 
+- namespace=ExecutorMetrics
+  - This contains memory-related metrics. A full list of the available metrics in this
+    namespace can be found in the corresponding entry for the Executor component instance.
+
 ### Component instance = Executor
 These metrics are exposed by Spark executors. Note, currently they are not available
 when running in local mode.
@@ -1037,6 +1041,38 @@ when running in local mode.
   - threadpool.maxPool_size
   - threadpool.startedTasks
 
+- namespace=ExecutorMetrics
+  - **note:** ExecutorMetrics are updated as part of the heartbeat processes scheduled
+    for the executors and for the driver at regular intervals: `spark.executor.heartbeatInterval` (default: 10 seconds).
+    An optional faster polling mechanism is available for executor memory metrics;
+    it can be activated by setting a polling interval (in milliseconds) with the configuration parameter `spark.executor.metrics.pollingInterval`.
+  - JVMHeapMemory
+  - JVMOffHeapMemory
+  - OnHeapExecutionMemory
+  - OnHeapStorageMemory
+  - OnHeapUnifiedMemory
+  - OffHeapExecutionMemory
+  - OffHeapStorageMemory
+  - OffHeapUnifiedMemory
+  - DirectPoolMemory
+  - MappedPoolMemory
+  - MinorGCCount
+  - MinorGCTime
+  - MajorGCCount
+  - MajorGCTime
+  - "ProcessTree*" metric counters:
+    - ProcessTreeJVMVMemory
+    - ProcessTreeJVMRSSMemory
+    - ProcessTreePythonVMemory
+    - ProcessTreePythonRSSMemory
+    - ProcessTreeOtherVMemory
+    - ProcessTreeOtherRSSMemory
+  - **note:** "ProcessTree*" metrics are collected only under certain conditions.
+    The conditions are the logical AND of the following: the `/proc` filesystem exists,
+    `spark.eventLog.logStageExecutorProcessTreeMetrics.enabled=true`, and
+    `spark.eventLog.logStageExecutorMetrics.enabled=true`.
+    "ProcessTree*" metrics report 0 when those conditions are not met.
+
 - namespace=JVMCPU
   - jvmCpuTime
