core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -21,7 +21,10 @@ import java.util.concurrent.TimeUnit

import scala.collection.mutable

import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.scheduler._
import org.apache.spark.metrics.source.Source
import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}

/**
@@ -144,6 +147,9 @@ private[spark] class ExecutorAllocationManager(
  private val executor =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")

  // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
  val executorAllocationManagerSource = new ExecutorAllocationManagerSource

  /**
   * Verify that the settings specified through the config are valid.
   * If not, throw an appropriate exception.
@@ -579,6 +585,29 @@ private[spark] class ExecutorAllocationManager(
    }
  }

  /**
   * Metric source for ExecutorAllocationManager to expose its internal executor allocation
   * status to MetricsSystem.
   * Note: these metrics heavily rely on the internal implementation of
   * ExecutorAllocationManager; both the metrics and their values may change when the
   * internal implementation changes, so they are not stable across Spark versions.
   */
  private[spark] class ExecutorAllocationManagerSource extends Source {
    val sourceName = "ExecutorAllocationManager"
    val metricRegistry = new MetricRegistry()

    // `value` is a by-name parameter, so each gauge re-evaluates the manager's
    // current state every time it is polled rather than caching one snapshot.
    private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = {
      metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] {
        override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) }
      })
    }

    registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0)
    registerGauge("numberExecutorsPendingToRemove", executorsPendingToRemove.size, 0)
    registerGauge("numberAllExecutors", executorIds.size, 0)
    registerGauge("numberTargetExecutors", numExecutorsTarget, 0)
    registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0)
  }
}

private object ExecutorAllocationManager {
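The registerGauge helper above takes its value argument by name (`value: => T`), so each poll of a gauge re-reads the manager's current state instead of freezing whatever value existed at registration time. A minimal standalone sketch of that mechanism, assuming only the codahale-metrics dependency; GaugeSketch and numExecutors are illustrative names, not part of the PR:

import com.codahale.metrics.{Gauge, MetricRegistry}

object GaugeSketch {
  // Stand-in for the manager's mutable internal state.
  private var numExecutors = 0
  private val registry = new MetricRegistry()

  // Same shape as the PR's helper: `value` is by-name, so getValue
  // re-evaluates it on every poll instead of caching one snapshot.
  private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = {
    registry.register(MetricRegistry.name("executors", name), new Gauge[T] {
      override def getValue: T = Option(value).getOrElse(defaultValue)
    })
  }

  def main(args: Array[String]): Unit = {
    registerGauge("numberAllExecutors", numExecutors, 0)
    numExecutors = 5
    // Prints 5, not 0: the gauge reads the state at poll time.
    println(registry.getGauges.get("executors.numberAllExecutors").getValue)
  }
}

The Option(...).getOrElse(defaultValue) guard in the PR additionally keeps a gauge from reporting null if the underlying field has not been initialized yet.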
core/src/main/scala/org/apache/spark/SparkContext.scala (3 additions, 0 deletions)
@@ -537,6 +537,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    _taskScheduler.postStartHook()
    _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
    _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
    _executorAllocationManager.foreach { e =>
      _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
    }

    // Make sure the context is stopped if the user forgets about it. This avoids leaving
    // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM

Review thread on the registration above:

Contributor:
The model for the other metric sources seems to be that the source wraps the object. What's the reason for having the source inside of the ExecutorAllocationManager in this case?

Contributor Author:
Hi @sryza, thanks a lot for your comments. I put this metric source inside ExecutorAllocationManager because I didn't want to change ExecutorAllocationManager much just to expose its internal status to a metric source, so I wrote the metric source as an inner class. Do you have a specific concern about this implementation?
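For comparison, the pattern the reviewer describes would have a standalone source wrap the manager, the way DAGSchedulerSource wraps the DAGScheduler in the registration code above. A hypothetical sketch of that alternative, not the PR's code; the class name is invented, and it assumes fields such as numExecutorsToAdd are made visible to the source, which the real manager does not expose:

import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.metrics.source.Source

// Hypothetical alternative, NOT the PR's code: a standalone source that
// wraps the manager instead of living inside it.
private[spark] class WrappingAllocationManagerSource(
    allocationManager: ExecutorAllocationManager) extends Source {
  val sourceName = "ExecutorAllocationManager"
  val metricRegistry = new MetricRegistry()

  // Each gauge reads through the wrapped manager, so fields like
  // numExecutorsToAdd would need private[spark] (or accessor) visibility.
  metricRegistry.register(
    MetricRegistry.name("executors", "numberExecutorsToAdd"),
    new Gauge[Int] {
      override def getValue: Int = allocationManager.numExecutorsToAdd
    })
}

The trade-off is visibility: the inner-class approach in this PR reads private state directly, while the wrapper approach keeps the source separate at the cost of widening access to the manager's internals.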
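Once registered with the MetricsSystem, these gauges surface through whatever sinks are configured for the driver. As an illustration (this sink configuration is standard Spark and not part of this PR), a console sink in conf/metrics.properties would print them every 10 seconds:

# Illustrative metrics.properties: report all sources, including
# ExecutorAllocationManager, to stdout every 10 seconds.
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds

With a sink attached, the gauges appear under the ExecutorAllocationManager source as executors.numberExecutorsToAdd, executors.numberTargetExecutors, and so on, prefixed with the application's metric namespace (typically the application ID and the driver instance).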