@@ -147,25 +147,8 @@ private[spark] class ExecutorAllocationManager(
147147 private val executor =
148148 ThreadUtils .newDaemonSingleThreadScheduledExecutor(" spark-dynamic-executor-allocation" )
149149
150- // Metric source for ExecutorAllocationManager to expose the its internal executor allocation
151- // status to MetricsSystem.
152- private [spark] val executorAllocationManagerSource = new Source {
153- val sourceName = " ExecutorAllocationManager"
154- val metricRegistry = new MetricRegistry ()
155-
156- private def registerGauge [T ](name : String , value : => T , defaultValue : T ): Unit = {
157- metricRegistry.register(MetricRegistry .name(" executors" , name), new Gauge [T ] {
158- override def getValue : T = synchronized { Option (value).getOrElse(defaultValue) }
159- })
160- }
161-
162- registerGauge(" numberExecutorsToAdd" , numExecutorsToAdd, 0 )
163- registerGauge(" numberExecutorsPending" , numExecutorsPending, 0 )
164- registerGauge(" numberExecutorsPendingToRemove" , executorsPendingToRemove.size, 0 )
165- registerGauge(" numberAllExecutors" , executorIds.size, 0 )
166- registerGauge(" numberTargetExecutors" , targetNumExecutors(), 0 )
167- registerGauge(" numberMaxNeededExecutors" , maxNumExecutorsNeeded(), 0 )
168- }
150+ // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
151+ private [spark] val executorAllocationManagerSource = new ExecutorAllocationManagerSource
169152
170153 /**
171154 * Verify that the settings specified through the config are valid.
@@ -602,6 +585,26 @@ private[spark] class ExecutorAllocationManager(
602585 }
603586 }
604587
588+ // Metric source for ExecutorAllocationManager to expose the its internal executor allocation
589+ // status to MetricsSystem.
590+ // Note: these metrics may not be stable across Spark version.
591+ private [spark] class ExecutorAllocationManagerSource extends Source {
592+ val sourceName = " ExecutorAllocationManager"
593+ val metricRegistry = new MetricRegistry ()
594+
595+ private def registerGauge [T ](name : String , value : => T , defaultValue : T ): Unit = {
596+ metricRegistry.register(MetricRegistry .name(" executors" , name), new Gauge [T ] {
597+ override def getValue : T = synchronized { Option (value).getOrElse(defaultValue) }
598+ })
599+ }
600+
601+ registerGauge(" numberExecutorsToAdd" , numExecutorsToAdd, 0 )
602+ registerGauge(" numberExecutorsPending" , numExecutorsPending, 0 )
603+ registerGauge(" numberExecutorsPendingToRemove" , executorsPendingToRemove.size, 0 )
604+ registerGauge(" numberAllExecutors" , executorIds.size, 0 )
605+ registerGauge(" numberTargetExecutors" , targetNumExecutors(), 0 )
606+ registerGauge(" numberMaxNeededExecutors" , maxNumExecutorsNeeded(), 0 )
607+ }
605608}
606609
607610private object ExecutorAllocationManager {
0 commit comments