Skip to content

Commit 9fed6ab

Browse files
committed
[SPARK-8344] Add message processing time metric to DAGScheduler
This commit adds a new metric, `messageProcessingTime`, to the DAGScheduler metrics source. This metrics tracks the time taken to process messages in the scheduler's event processing loop, which is a helpful debugging aid for diagnosing performance issues in the scheduler (such as SPARK-4961). In order to do this, I moved the creation of the DAGSchedulerSource metrics source into DAGScheduler itself, similar to how MasterSource is created and registered in Master. Author: Josh Rosen <[email protected]> Closes #7002 from JoshRosen/SPARK-8344 and squashes the following commits: 57f914b [Josh Rosen] Fix import ordering 7d6bb83 [Josh Rosen] Add message processing time metrics to DAGScheduler
1 parent 1a79f0e commit 9fed6ab

File tree

3 files changed

+22
-5
lines changed

3 files changed

+22
-5
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
545545

546546
// Post init
547547
_taskScheduler.postStartHook()
548-
_env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
549548
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
550549
_executorAllocationManager.foreach { e =>
551550
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ class DAGScheduler(
8181

8282
def this(sc: SparkContext) = this(sc, sc.taskScheduler)
8383

84+
private[scheduler] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)
85+
8486
private[scheduler] val nextJobId = new AtomicInteger(0)
8587
private[scheduler] def numTotalJobs: Int = nextJobId.get()
8688
private val nextStageId = new AtomicInteger(0)
@@ -1438,17 +1440,29 @@ class DAGScheduler(
14381440
taskScheduler.stop()
14391441
}
14401442

1441-
// Start the event thread at the end of the constructor
1443+
// Start the event thread and register the metrics source at the end of the constructor
1444+
env.metricsSystem.registerSource(metricsSource)
14421445
eventProcessLoop.start()
14431446
}
14441447

14451448
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
14461449
extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
14471450

1451+
private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer
1452+
14481453
/**
14491454
* The main event loop of the DAG scheduler.
14501455
*/
1451-
override def onReceive(event: DAGSchedulerEvent): Unit = event match {
1456+
override def onReceive(event: DAGSchedulerEvent): Unit = {
1457+
val timerContext = timer.time()
1458+
try {
1459+
doOnReceive(event)
1460+
} finally {
1461+
timerContext.stop()
1462+
}
1463+
}
1464+
1465+
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
14521466
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
14531467
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
14541468
listener, properties)

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717

1818
package org.apache.spark.scheduler
1919

20-
import com.codahale.metrics.{Gauge, MetricRegistry}
20+
import com.codahale.metrics.{Gauge, MetricRegistry, Timer}
2121

2222
import org.apache.spark.metrics.source.Source
2323

24-
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
24+
private[scheduler] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
2525
extends Source {
2626
override val metricRegistry = new MetricRegistry()
2727
override val sourceName = "DAGScheduler"
@@ -45,4 +45,8 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
4545
metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
4646
override def getValue: Int = dagScheduler.activeJobs.size
4747
})
48+
49+
/** Timer that tracks the time to process messages in the DAGScheduler's event loop */
50+
val messageProcessingTimer: Timer =
51+
metricRegistry.timer(MetricRegistry.name("messageProcessingTime"))
4852
}

0 commit comments

Comments
 (0)