Skip to content

Commit 9f1f9b1

Browse files
jerryshaoAndrew Or
authored andcommitted
[SPARK-7007] [CORE] Add a metric source for ExecutorAllocationManager
Add a metric source to expose the internal status of ExecutorAllocationManager to better monitoring the resource usage of executors when dynamic allocation is enable. Please help to review, thanks a lot. Author: jerryshao <[email protected]> Closes apache#5589 from jerryshao/dynamic-allocation-source and squashes the following commits: 104d155 [jerryshao] rebase and address the comments c501a2c [jerryshao] Address the comments d237ba5 [jerryshao] Address the comments 2c3540f [jerryshao] Add a metric source for ExecutorAllocationManager
1 parent 57e9f29 commit 9f1f9b1

File tree

2 files changed

+32
-0
lines changed

2 files changed

+32
-0
lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ import java.util.concurrent.TimeUnit
2121

2222
import scala.collection.mutable
2323

24+
import com.codahale.metrics.{Gauge, MetricRegistry}
25+
2426
import org.apache.spark.scheduler._
27+
import org.apache.spark.metrics.source.Source
2528
import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
2629

2730
/**
@@ -144,6 +147,9 @@ private[spark] class ExecutorAllocationManager(
144147
private val executor =
145148
ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
146149

150+
// Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
151+
val executorAllocationManagerSource = new ExecutorAllocationManagerSource
152+
147153
/**
148154
* Verify that the settings specified through the config are valid.
149155
* If not, throw an appropriate exception.
@@ -579,6 +585,29 @@ private[spark] class ExecutorAllocationManager(
579585
}
580586
}
581587

588+
/**
589+
* Metric source for ExecutorAllocationManager to expose its internal executor allocation
590+
* status to MetricsSystem.
591+
* Note: These metrics heavily rely on the internal implementation of
592+
* ExecutorAllocationManager, metrics or value of metrics will be changed when internal
593+
* implementation is changed, so these metrics are not stable across Spark version.
594+
*/
595+
private[spark] class ExecutorAllocationManagerSource extends Source {
596+
val sourceName = "ExecutorAllocationManager"
597+
val metricRegistry = new MetricRegistry()
598+
599+
private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = {
600+
metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] {
601+
override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) }
602+
})
603+
}
604+
605+
registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0)
606+
registerGauge("numberExecutorsPendingToRemove", executorsPendingToRemove.size, 0)
607+
registerGauge("numberAllExecutors", executorIds.size, 0)
608+
registerGauge("numberTargetExecutors", numExecutorsTarget, 0)
609+
registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0)
610+
}
582611
}
583612

584613
private object ExecutorAllocationManager {

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
537537
_taskScheduler.postStartHook()
538538
_env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
539539
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
540+
_executorAllocationManager.foreach { e =>
541+
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
542+
}
540543

541544
// Make sure the context is stopped if the user forgets about it. This avoids leaving
542545
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM

0 commit comments

Comments
 (0)