
Commit bb5f2b8

addressed review comment

1 parent 4ab39cd

File tree: 4 files changed, +17 -31 lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 11 deletions

```diff
@@ -629,11 +629,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
     _env.metricsSystem.registerSource(new JVMCPUSource())
-    _executorMetricsSource match {
-      case Some(executorMetricsSource: ExecutorMetricsSource) =>
-        executorMetricsSource.register(_env.metricsSystem)
-      case None => None
-    }
+    _executorMetricsSource.foreach(_.register(_env.metricsSystem))
     _executorAllocationManager.foreach { e =>
       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
     }
@@ -2487,12 +2483,7 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Reports heartbeat metrics for the driver. */
   private def reportHeartBeat(executorMetricsSource: Option[ExecutorMetricsSource]): Unit = {
     val currentMetrics = ExecutorMetrics.getCurrentMetrics(env.memoryManager)
-
-    executorMetricsSource match {
-      case Some(executorMetricsSource: ExecutorMetricsSource) =>
-        executorMetricsSource.updateMetricsSnapshot(currentMetrics)
-      case None => None
-    }
+    executorMetricsSource.foreach(_.updateMetricsSnapshot(currentMetrics))
 
     val driverUpdates = new HashMap[(Int, Int), ExecutorMetrics]
     // In the driver, we do not track per-stage metrics, so use a dummy stage for the key
```
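All four files apply the same cleanup: an `Option` that was pattern-matched with a dead `case None => None` arm is collapsed to `Option.foreach`, which runs the side effect only when a value is present. A minimal self-contained sketch of the before/after (the trait and names below are illustrative stand-ins, not Spark's actual types):

```scala
// Illustrative stand-ins for the metrics system; not Spark's actual API.
trait Source { def name: String }
class MetricsSystem {
  def registerSource(s: Source): Unit = println(s"registered ${s.name}")
}

def register(maybeSource: Option[Source], system: MetricsSystem): Unit = {
  // Before: verbose, and the `None => None` arm returns a value nobody uses.
  maybeSource match {
    case Some(s) => system.registerSource(s)
    case None    => ()
  }
  // After: same effect in one line, with no meaningless result value.
  maybeSource.foreach(system.registerSource)
}
```

The `foreach` form also avoids re-binding (and shadowing) the outer `executorMetricsSource` name inside the match, which the original code did.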

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 5 deletions

```diff
@@ -124,11 +124,7 @@ private[spark] class Executor(
     env.blockManager.initialize(conf.getAppId)
     env.metricsSystem.registerSource(executorSource)
     env.metricsSystem.registerSource(new JVMCPUSource())
-    executorMetricsSource match {
-      case Some(executorMetricsSource: ExecutorMetricsSource) =>
-        executorMetricsSource.register(env.metricsSystem)
-      case None => None
-    }
+    executorMetricsSource.foreach(_.register(env.metricsSystem))
     env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
   }
 
```

core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala

Lines changed: 1 addition & 6 deletions

```diff
@@ -80,12 +80,7 @@ private[spark] class ExecutorMetricsPoller(
 
     // get the latest values for the metrics
     val latestMetrics = ExecutorMetrics.getCurrentMetrics(memoryManager)
-
-    executorMetricsSource match {
-      case Some(executorMetricsSource: ExecutorMetricsSource) =>
-        executorMetricsSource.updateMetricsSnapshot(latestMetrics)
-      case None => None
-    }
+    executorMetricsSource.foreach(_.updateMetricsSnapshot(latestMetrics))
 
     def updatePeaks(metrics: AtomicLongArray): Unit = {
       (0 until metrics.length).foreach { i =>
```
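The hunk's trailing context shows `updatePeaks`, which folds the latest readings into an `AtomicLongArray` of per-metric peaks. A sketch of one lock-free way to do that with compare-and-set (a plausible reading of the pattern, not necessarily Spark's exact implementation):

```scala
import java.util.concurrent.atomic.AtomicLongArray

// Record new per-metric peaks without locking: retry a compare-and-set
// until either we store the larger value or another thread already has.
def updatePeaks(peaks: AtomicLongArray, latest: Array[Long]): Unit = {
  for (i <- latest.indices) {
    var done = false
    while (!done) {
      val current = peaks.get(i)
      done = latest(i) <= current || peaks.compareAndSet(i, current, latest(i))
    }
  }
}
```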

core/src/main/scala/org/apache/spark/executor/ExecutorMetricsSource.scala

Lines changed: 13 additions & 9 deletions

```diff
@@ -22,15 +22,19 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem}
 import org.apache.spark.metrics.source.Source
 
-// Expose executor metrics from [[ExecutorMetricsType]] using the Dropwizard metrics system
-// Metrics related the memory system can be expensive to gather, we implement some optimizations:
-// (1) Metrics values are cached, updated at each heartbeat (default period is 10 seconds)
-// An alternative faster polling mechanism is used only if activated, by setting
-// spark.executor.metrics.pollingInterval=<interval in ms>
-// (2) procfs metrics are gathered all in one-go and only conditionally:
-// if the /proc filesystem exists
-// and spark.eventLog.logStageExecutorProcessTreeMetrics.enabled=true
-// and spark.eventLog.logStageExecutorMetrics.enabled=true
+/**
+ * Expose executor metrics from [[ExecutorMetricsType]] using the Dropwizard metrics system.
+ *
+ * Metrics related to the memory system can be expensive to gather, therefore
+ * we implement some optimizations:
+ * (1) Metrics values are cached, updated at each heartbeat (default period is 10 seconds).
+ * An alternative faster polling mechanism is used, only if activated, by setting
+ * spark.executor.metrics.pollingInterval=<interval in ms>.
+ * (2) Procfs metrics are gathered all in one-go and only conditionally:
+ * if the /proc filesystem exists
+ * and spark.eventLog.logStageExecutorProcessTreeMetrics.enabled=true
+ * and spark.eventLog.logStageExecutorMetrics.enabled=true.
+ */
 private[spark] class ExecutorMetricsSource extends Source {
 
   override val metricRegistry = new MetricRegistry()
```
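For context on what the new Scaladoc describes: the source registers Dropwizard gauges that serve values from a cached snapshot, and `updateMetricsSnapshot` (called from the heartbeat, or from the optional faster poller) refreshes that cache so gauge reads stay cheap. A minimal sketch of this caching-gauge pattern against the real Dropwizard API, with illustrative class and field names rather than Spark's:

```scala
import com.codahale.metrics.{Gauge, MetricRegistry}

// A metrics source whose gauges serve values from a cached snapshot.
// Gathering the real values is expensive, so a periodic updater refreshes
// the snapshot and each gauge read is just a map lookup.
class CachedMetricsSource(metricNames: Seq[String]) {
  val metricRegistry = new MetricRegistry()

  @volatile private var snapshot: Map[String, Long] =
    metricNames.map(_ -> 0L).toMap

  metricNames.foreach { name =>
    metricRegistry.register(name, new Gauge[Long] {
      // Reads the cache; never triggers an expensive collection.
      override def getValue: Long = snapshot.getOrElse(name, 0L)
    })
  }

  // Called from the heartbeat / polling thread with freshly gathered values.
  def updateMetricsSnapshot(latest: Map[String, Long]): Unit =
    snapshot = latest
}
```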
