From deea685c4d4d95e6889f5bafa86938ea4293af33 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 22 Jun 2017 16:53:30 +0100 Subject: [PATCH 1/3] Merge existing registry with default one or configure default metric registry --- .../apache/spark/metrics/MetricsSystem.scala | 72 ++++++++++++++++--- .../spark/metrics/MetricsSystemSuite.scala | 22 +++++- 2 files changed, 83 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 57dcbe501c6dd..612213025cf13 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable -import com.codahale.metrics.{Metric, MetricRegistry} +import com.codahale.metrics._ import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{SecurityManager, SparkConf} @@ -70,14 +70,15 @@ import org.apache.spark.util.Utils private[spark] class MetricsSystem private ( val instance: String, conf: SparkConf, - securityMgr: SecurityManager) + securityMgr: SecurityManager, + registry: MetricRegistry) extends Logging { private[this] val metricsConfig = new MetricsConfig(conf) private val sinks = new mutable.ArrayBuffer[Sink] - private val sources = new mutable.ArrayBuffer[Source] - private val registry = new MetricRegistry() + private val sourceToListeners = new mutable.HashMap[Source, MetricRegistryListener] + private val defaultListener = new MetricsSystemListener("") private var running: Boolean = false @@ -103,6 +104,7 @@ private[spark] class MetricsSystem private ( StaticSources.allSources.foreach(registerSource) registerSources() } + SharedMetricRegistries.getDefault.addListener(defaultListener) registerSinks() sinks.foreach(_.start) } @@ -111,6 +113,8 @@ private[spark] class MetricsSystem private ( if (running) { sinks.foreach(_.stop) registry.removeMatching((_: String, _: Metric) => true) + sourceToListeners.keySet.foreach(removeSource) + SharedMetricRegistries.getDefault.removeListener(defaultListener) } else { logWarning("Stopping a MetricsSystem that is not running") } @@ -156,22 +160,23 @@ private[spark] class MetricsSystem private ( } def getSourcesByName(sourceName: String): Seq[Source] = - sources.filter(_.sourceName == sourceName) + sourceToListeners.keySet.filter(_.sourceName == sourceName).toSeq def registerSource(source: Source): Unit = { - sources += source try { - val regName = buildRegistryName(source) - registry.register(regName, source.metricRegistry) + val listener = new MetricsSystemListener(buildRegistryName(source)) + source.metricRegistry.addListener(listener) + sourceToListeners += source -> listener } catch { case e: IllegalArgumentException => logInfo("Metrics already registered", e) } } def removeSource(source: Source): Unit = { - sources -= source val regName = buildRegistryName(source) registry.removeMatching((name: String, _: Metric) => name.startsWith(regName)) + sourceToListeners.get(source).foreach(source.metricRegistry.removeListener) + sourceToListeners.remove(source) } private def registerSources(): Unit = { @@ -225,6 +230,41 @@ private[spark] class MetricsSystem private ( } } } + + private[spark] class MetricsSystemListener(prefix: String) + extends MetricRegistryListener { + def metricName(name: String): String = MetricRegistry.name(prefix, name) + + override def onHistogramAdded(name: String, histogram: Histogram): Unit = + registry.register(metricName(name), histogram) + + override def onCounterAdded(name: String, counter: Counter): Unit = + registry.register(metricName(name), counter) + + override def onHistogramRemoved(name: String): Unit = + registry.remove(metricName(name)) + + override def onGaugeRemoved(name: String): Unit = + registry.remove(metricName(name)) + + override def onMeterRemoved(name: String): Unit = + registry.remove(metricName(name)) + + override def onTimerAdded(name: String, timer: Timer): Unit = + registry.register(metricName(name), timer) + + override def onCounterRemoved(name: String): Unit = + registry.remove(metricName(name)) + + override def onGaugeAdded(name: String, gauge: Gauge[_]): Unit = + registry.register(metricName(name), gauge) + + override def onTimerRemoved(name: String): Unit = + registry.remove(metricName(name)) + + override def onMeterAdded(name: String, meter: Meter): Unit = + registry.register(metricName(name), meter) + } } private[spark] object MetricsSystem { @@ -234,6 +274,10 @@ private[spark] object MetricsSystem { private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS private[this] val MINIMAL_POLL_PERIOD = 1 + scala.util.control.Exception.ignoring(classOf[IllegalStateException]) { + SharedMetricRegistries.setDefault("spark", new MetricRegistry()) + } + def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int): Unit = { val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit) if (period < MINIMAL_POLL_PERIOD) { @@ -244,7 +288,15 @@ private[spark] object MetricsSystem { def createMetricsSystem( instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = { - new MetricsSystem(instance, conf, securityMgr) + new MetricsSystem(instance, conf, securityMgr, new MetricRegistry()) + } + + def createMetricsSystem( + instance: String, + conf: SparkConf, + securityMgr: SecurityManager, + registry: MetricRegistry): MetricsSystem = { + new MetricsSystem(instance, conf, securityMgr, registry) } } diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 70b6c9a112142..003adbabc3042 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.metrics import scala.collection.mutable.ArrayBuffer -import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.{Gauge, MetricRegistry} import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} @@ -269,4 +269,24 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM assert(metricName === source.sourceName) } + test("MetricsSystem registers dynamically added metrics") { + val registry = new MetricRegistry() + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val instanceName = "testInstance" + val metricsSystem = MetricsSystem.createMetricsSystem( + instanceName, conf, securityMgr, registry) + metricsSystem.registerSource(source) + assert(!registry.getNames.contains("dummySource.newMetric"), "Metric shouldn't be registered") + + source.metricRegistry.register("newMetric", new Gauge[Integer] { + override def getValue: Integer = 1 + }) + assert(registry.getNames.contains("dummySource.newMetric"), + "Metric should have been registered") + } + } From ecbd70b39c08eae166742987d187424e6787637a Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Tue, 27 Jun 2017 22:50:37 +0100 Subject: [PATCH 2/3] fix invoke private usage --- .../org/apache/spark/metrics/MetricsSystem.scala | 5 +++++ .../spark/metrics/MetricsSystemSuite.scala | 16 +++++----------- .../spark/streaming/StreamingContextSuite.scala | 8 ++------ 3 files changed, 12 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 612213025cf13..7e84eb0461be1 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -159,6 +159,11 @@ private[spark] class MetricsSystem private ( } else { defaultName } } + def getSources: Seq[Source] = + sourceToListeners.keySet.to[collection.immutable.Seq] + + def getSinks: Seq[Sink] = sinks.to[collection.immutable.Seq] + def getSourcesByName(sourceName: String): Seq[Source] = sourceToListeners.keySet.filter(_.sourceName == sourceName).toSeq diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 003adbabc3042..d7d01bdd8c738 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.metrics -import scala.collection.mutable.ArrayBuffer - import com.codahale.metrics.{Gauge, MetricRegistry} import org.scalatest.{BeforeAndAfter, PrivateMethodTester} @@ -42,27 +40,23 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM test("MetricsSystem with default config") { val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, securityMgr) metricsSystem.start() - val sources = PrivateMethod[ArrayBuffer[Source]](Symbol("sources")) - val sinks = PrivateMethod[ArrayBuffer[Sink]](Symbol("sinks")) - assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length) - assert(metricsSystem.invokePrivate(sinks()).length === 0) + assert(metricsSystem.getSources.length === StaticSources.allSources.length) + assert(metricsSystem.getSinks.length === 0) assert(metricsSystem.getServletHandlers.nonEmpty) } test("MetricsSystem with sources add") { val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, securityMgr) metricsSystem.start() - val sources = PrivateMethod[ArrayBuffer[Source]](Symbol("sources")) - val sinks = PrivateMethod[ArrayBuffer[Sink]](Symbol("sinks")) - assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length) - assert(metricsSystem.invokePrivate(sinks()).length === 1) + assert(metricsSystem.getSources.length === StaticSources.allSources.length) + assert(metricsSystem.getSinks.length === 1) assert(metricsSystem.getServletHandlers.nonEmpty) val source = new MasterSource(null) metricsSystem.registerSource(source) - assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length + 1) + assert(metricsSystem.getSources.length === StaticSources.allSources.length + 1) } test("MetricsSystem with Driver instance") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 48d278dd1c7b8..c3cc8816d858a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -374,13 +374,13 @@ class StreamingContextSuite addInputStream(ssc).register() ssc.start() - val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem) + val sources = ssc.env.metricsSystem.getSources val streamingSource = StreamingContextSuite.getStreamingSource(ssc) assert(sources.contains(streamingSource)) assert(ssc.getState() === StreamingContextState.ACTIVE) ssc.stop() - val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem) + val sourcesAfterStop = ssc.env.metricsSystem.getSources val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc) assert(ssc.getState() === StreamingContextState.STOPPED) assert(!sourcesAfterStop.contains(streamingSourceAfterStop)) @@ -1026,10 +1026,6 @@ package object testPackage extends Assertions { * This includes methods to access private methods and fields in StreamingContext and MetricsSystem */ private object StreamingContextSuite extends PrivateMethodTester { - private val _sources = PrivateMethod[ArrayBuffer[Source]](Symbol("sources")) - private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = { - metricsSystem.invokePrivate(_sources()) - } private val _streamingSource = PrivateMethod[StreamingSource](Symbol("streamingSource")) private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = { streamingContext.invokePrivate(_streamingSource()) From 8fcaafbb04fb4f0f134969645dfc9c55996d12ed Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 6 Dec 2019 15:42:07 +0200 Subject: [PATCH 3/3] don't set default --- .../scala/org/apache/spark/metrics/MetricsSystem.scala | 7 ------- 1 file changed, 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 7e84eb0461be1..0b3a981158a37 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -78,7 +78,6 @@ private[spark] class MetricsSystem private ( private val sinks = new mutable.ArrayBuffer[Sink] private val sourceToListeners = new mutable.HashMap[Source, MetricRegistryListener] - private val defaultListener = new MetricsSystemListener("") private var running: Boolean = false @@ -104,7 +103,6 @@ private[spark] class MetricsSystem private ( StaticSources.allSources.foreach(registerSource) registerSources() } - SharedMetricRegistries.getDefault.addListener(defaultListener) registerSinks() sinks.foreach(_.start) } @@ -114,7 +112,6 @@ private[spark] class MetricsSystem private ( sinks.foreach(_.stop) registry.removeMatching((_: String, _: Metric) => true) sourceToListeners.keySet.foreach(removeSource) - SharedMetricRegistries.getDefault.removeListener(defaultListener) } else { logWarning("Stopping a MetricsSystem that is not running") } @@ -279,10 +276,6 @@ private[spark] object MetricsSystem { private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS private[this] val MINIMAL_POLL_PERIOD = 1 - scala.util.control.Exception.ignoring(classOf[IllegalStateException]) { - SharedMetricRegistries.setDefault("spark", new MetricRegistry()) - } - def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int): Unit = { val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit) if (period < MINIMAL_POLL_PERIOD) {