From c1c57fbb8593346410995578668cc7bff79f77e0 Mon Sep 17 00:00:00 2001 From: Anthony Truchet Date: Thu, 1 Sep 2016 11:37:37 +0200 Subject: [PATCH] [SPARK-5847][CORE][BRANCH-1.6] Allow for configuring MetricsSystem's use of app ID to namespace all metrics This is a backport of #14270. Because the spark.internal.config system does not exists in branch 1.6, a simpler substitution scheme for ${} in the spark.metrics.namespace value, using only Spark configuration had to be added to preserve the behaviour discussed in the tickets and tested. This backport is contributed by Criteo SA under the Apache v2 licence. Adding a new property to SparkConf called spark.metrics.namespace that allows users to set a custom namespace for executor and driver metrics in the metrics systems. By default, the root namespace used for driver or executor metrics is the value of `spark.app.id`. However, often times, users want to be able to track the metrics across apps for driver and executor metrics, which is hard to do with application ID (i.e. `spark.app.id`) since it changes with every invocation of the app. For such use cases, users can set the `spark.metrics.namespace` property to any given value or to another spark configuration key reference like `${spark.app.name}` which is then used to populate the root namespace of the metrics system (with the app name in our example). `spark.metrics.namespace` property can be set to any arbitrary spark property key, whose value would be used to set the root namespace of the metrics system. Non driver and executor metrics are never prefixed with `spark.app.id`, nor does the `spark.metrics.namespace` property have any such affect on such metrics. Added new unit tests, modified existing unit tests. --- .../scala/org/apache/spark/SparkConf.scala | 27 ++++- .../apache/spark/metrics/MetricsConfig.scala | 51 +++++++-- .../apache/spark/metrics/MetricsSystem.scala | 47 ++++---- .../spark/metrics/MetricsConfigSuite.scala | 2 +- .../spark/metrics/MetricsSystemSuite.scala | 104 ++++++++++++++++++ docs/monitoring.md | 13 +++ 6 files changed, 213 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index d3384fb29773..837118beb7c3 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -21,12 +21,12 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable.LinkedHashSet - -import org.apache.avro.{SchemaNormalization, Schema} - +import org.apache.avro.{Schema, SchemaNormalization} import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.Utils +import scala.util.matching.Regex + /** * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs. * @@ -312,6 +312,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { Option(settings.get(key)).orElse(getDeprecatedConfig(key, this)) } + /** Get a parameter as an Option and substitute for parameter references */ + def getOptionSubstituted(key: String): Option[String] = { + getOption(key).map( v => substitute(v, Set[String]()) ) + } + /** Get all parameters as a list of pairs */ def getAll: Array[(String, String)] = { settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toArray @@ -524,6 +529,22 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n") } + private val REF_RE = """\$\{(\S+?)\}""".r + private def substitute(input: String, usedRefs: Set[String]): String = { + if (input != null) { + REF_RE.replaceAllIn(input, { m => + val name = m.group(1) + require(!usedRefs.contains(name), s"Circular reference in $input: $name") + + val replacement = this.getOption(name) + .map { v => substitute(v, usedRefs + name) } + .getOrElse(m.matched) + Regex.quoteReplacement(replacement) + }) + } else { + input + } + } } private[spark] object SparkConf extends Logging { diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index dd2d325d8703..d7510faae73a 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -34,7 +34,7 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties" private[metrics] val properties = new Properties() - private[metrics] var propertyCategories: mutable.HashMap[String, Properties] = null + private[metrics] var perInstanceSubProperties: mutable.HashMap[String, Properties] = null private def setDefaultProperties(prop: Properties) { prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet") @@ -43,6 +43,10 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json") } + /** + * Load properties from various places, based on precedence + * If the same property is set again latter on in the method, it overwrites the previous value + */ def initialize() { // Add default properties in case there's no properties file setDefaultProperties(properties) @@ -57,16 +61,47 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { case _ => } - propertyCategories = subProperties(properties, INSTANCE_REGEX) - if (propertyCategories.contains(DEFAULT_PREFIX)) { - val defaultProperty = propertyCategories(DEFAULT_PREFIX).asScala - for((inst, prop) <- propertyCategories if (inst != DEFAULT_PREFIX); - (k, v) <- defaultProperty if (prop.get(k) == null)) { + // Now, let's populate a list of sub-properties per instance, instance being the prefix that + // appears before the first dot in the property name. + // Add to the sub-properties per instance, the default properties (those with prefix "*"), if + // they don't have that exact same sub-property already defined. + // + // For example, if properties has ("*.class"->"default_class", "*.path"->"default_path, + // "driver.path"->"driver_path"), for driver specific sub-properties, we'd like the output to be + // ("driver"->Map("path"->"driver_path", "class"->"default_class") + // Note how class got added to based on the default property, but path remained the same + // since "driver.path" already existed and took precedence over "*.path" + // + perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX) + if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) { + val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala + for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX); + (k, v) <- defaultSubProperties if (prop.get(k) == null)) { prop.put(k, v) } } } + /** + * Take a simple set of properties and a regex that the instance names (part before the first dot) + * have to conform to. And, return a map of the first order prefix (before the first dot) to the + * sub-properties under that prefix. + * + * For example, if the properties sent were Properties("*.sink.servlet.class"->"class1", + * "*.sink.servlet.path"->"path1"), the returned map would be + * Map("*" -> Properties("sink.servlet.class" -> "class1", "sink.servlet.path" -> "path1")) + * Note in the subProperties (value of the returned Map), only the suffixes are used as property + * keys. + * If, in the passed properties, there is only one property with a given prefix, it is still + * "unflattened". For example, if the input was Properties("*.sink.servlet.class" -> "class1" + * the returned Map would contain one key-value pair + * Map("*" -> Properties("sink.servlet.class" -> "class1")) + * Any passed in properties, not complying with the regex are ignored. + * + * @param prop the flat list of properties to "unflatten" based on prefixes + * @param regex the regex that the prefix has to comply with + * @return an unflatted map, mapping prefix with sub-properties under that prefix + */ def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = { val subProperties = new mutable.HashMap[String, Properties] prop.asScala.foreach { kv => @@ -79,9 +114,9 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { } def getInstance(inst: String): Properties = { - propertyCategories.get(inst) match { + perInstanceSubProperties.get(inst) match { case Some(s) => s - case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties) + case None => perInstanceSubProperties.getOrElse(DEFAULT_PREFIX, new Properties) } } diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index fdf76d312db3..d64aff857a07 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -20,8 +20,6 @@ package org.apache.spark.metrics import java.util.Properties import java.util.concurrent.TimeUnit -import org.apache.spark.util.Utils - import scala.collection.mutable import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry} @@ -30,26 +28,27 @@ import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.metrics.sink.{MetricsServlet, Sink} import org.apache.spark.metrics.source.Source +import org.apache.spark.util.Utils /** - * Spark Metrics System, created by specific "instance", combined by source, - * sink, periodically poll source metrics data to sink destinations. + * Spark Metrics System, created by a specific "instance", combined by source, + * sink, periodically polls source metrics data to sink destinations. * - * "instance" specify "who" (the role) use metrics system. In spark there are several roles - * like master, worker, executor, client driver, these roles will create metrics system - * for monitoring. So instance represents these roles. Currently in Spark, several instances + * "instance" specifies "who" (the role) uses the metrics system. In Spark, there are several roles + * like master, worker, executor, client driver. These roles will create metrics system + * for monitoring. So, "instance" represents these roles. Currently in Spark, several instances * have already implemented: master, worker, executor, driver, applications. * - * "source" specify "where" (source) to collect metrics data. In metrics system, there exists + * "source" specifies "where" (source) to collect metrics data from. In metrics system, there exists * two kinds of source: * 1. Spark internal source, like MasterSource, WorkerSource, etc, which will collect * Spark component's internal state, these sources are related to instance and will be - * added after specific metrics system is created. + * added after a specific metrics system is created. * 2. Common source, like JvmSource, which will collect low level state, is configured by * configuration and loaded through reflection. * - * "sink" specify "where" (destination) to output metrics data to. Several sinks can be - * coexisted and flush metrics to all these sinks. + * "sink" specifies "where" (destination) to output metrics data to. Several sinks can + * coexist and metrics can be flushed to all these sinks. * * Metrics configuration format is like below: * [instance].[sink|source].[name].[options] = xxxx @@ -62,9 +61,9 @@ import org.apache.spark.metrics.source.Source * [sink|source] means this property belongs to source or sink. This field can only be * source or sink. * - * [name] specify the name of sink or source, it is custom defined. + * [name] specify the name of sink or source, if it is custom defined. * - * [options] is the specific property of this source or sink. + * [options] represent the specific property of this source or sink. */ private[spark] class MetricsSystem private ( val instance: String, @@ -124,19 +123,27 @@ private[spark] class MetricsSystem private ( * application, executor/driver and metric source. */ private[spark] def buildRegistryName(source: Source): String = { - val appId = conf.getOption("spark.app.id") + val metricsNamespace = conf + .getOptionSubstituted(MetricsSystem.METRICS_NAMESPACE_CONFIG_NAME) + .orElse(conf.getOption("spark.app.id")) + val executorId = conf.getOption("spark.executor.id") val defaultName = MetricRegistry.name(source.sourceName) if (instance == "driver" || instance == "executor") { - if (appId.isDefined && executorId.isDefined) { - MetricRegistry.name(appId.get, executorId.get, source.sourceName) + if (metricsNamespace.isDefined && executorId.isDefined) { + MetricRegistry.name(metricsNamespace.get, executorId.get, source.sourceName) } else { // Only Driver and Executor set spark.app.id and spark.executor.id. // Other instance types, e.g. Master and Worker, are not related to a specific application. - val warningMsg = s"Using default name $defaultName for source because %s is not set." - if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) } - if (executorId.isEmpty) { logWarning(warningMsg.format("spark.executor.id")) } + if (metricsNamespace.isEmpty) { + logWarning(s"Using default name $defaultName for source because neither " + + s"${MetricsSystem.METRICS_NAMESPACE_CONFIG_NAME} nor spark.app.id is set.") + } + if (executorId.isEmpty) { + logWarning(s"Using default name $defaultName for source because spark.executor.id is " + + s"not set.") + } defaultName } } else { defaultName } @@ -210,6 +217,8 @@ private[spark] object MetricsSystem { val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r + val METRICS_NAMESPACE_CONFIG_NAME = "spark.metrics.namespace" + private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS private[this] val MINIMAL_POLL_PERIOD = 1 diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala index 41f2ff725a17..d5ea6fabc850 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala @@ -140,7 +140,7 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter { val conf = new MetricsConfig(sparkConf) conf.initialize() - val propCategories = conf.propertyCategories + val propCategories = conf.perInstanceSubProperties assert(propCategories.size === 3) val masterProp = conf.getInstance("master") diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 9c389c76bf3b..b8a995369dd4 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -184,4 +184,108 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM assert(metricName != s"$appId.$executorId.${source.sourceName}") assert(metricName === source.sourceName) } + + test("MetricsSystem with Executor instance, with litteral custom namespace") { + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val appId = "testId" + val metricsNamespace = "metricsNamespace" + val executorId = "1" + conf.set("spark.app.id", appId) + conf.set("spark.executor.id", executorId) + conf.set(MetricsSystem.METRICS_NAMESPACE_CONFIG_NAME, metricsNamespace) + + val instanceName = "executor" + val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) + + val metricName = driverMetricsSystem.buildRegistryName(source) + assert(metricName === s"$metricsNamespace.$executorId.${source.sourceName}") + } + + test("MetricsSystem with Executor instance, with reference to config key as custom namespace ") { + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val appId = "testId" + val appName = "testName" + val executorId = "1" + conf.set("spark.app.id", appId) + conf.set("spark.app.name", appName) + conf.set("spark.executor.id", executorId) + conf.set(MetricsSystem.METRICS_NAMESPACE_CONFIG_NAME, "${spark.app.name}") + + val instanceName = "executor" + val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) + + val metricName = driverMetricsSystem.buildRegistryName(source) + assert(metricName === s"$appName.$executorId.${source.sourceName}") + } + + test("MetricsSystem with Executor instance, custom namespace which is not set") { + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val executorId = "1" + val namespaceToResolve = "${spark.doesnotexist}" + conf.set("spark.executor.id", executorId) + conf.set(MetricsSystem.METRICS_NAMESPACE_CONFIG_NAME, namespaceToResolve) + + val instanceName = "executor" + val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) + + val metricName = driverMetricsSystem.buildRegistryName(source) + // If the user set the spark.metrics.namespace property to an expansion of another property + // (say ${spark.doesnotexist}, the unresolved name (i.e. literally ${spark.doesnotexist}) + // is used as the root logger name. + assert(metricName === s"$namespaceToResolve.$executorId.${source.sourceName}") + } + + test("MetricsSystem with Executor instance, custom namespace, spark.executor.id not set") { + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val appId = "testId" + conf.set("spark.app.name", appId) + conf.set(MetricsSystem.METRICS_NAMESPACE_CONFIG_NAME, "${spark.app.name}") + + val instanceName = "executor" + val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) + + val metricName = driverMetricsSystem.buildRegistryName(source) + assert(metricName === source.sourceName) + } + + test("MetricsSystem with non-driver, non-executor instance with custom namespace") { + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val appId = "testId" + val appName = "testName" + val executorId = "dummyExecutorId" + conf.set("spark.app.id", appId) + conf.set("spark.app.name", appName) + conf.set(MetricsSystem.METRICS_NAMESPACE_CONFIG_NAME, "${spark.app.name}") + conf.set("spark.executor.id", executorId) + + val instanceName = "testInstance" + val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) + + val metricName = driverMetricsSystem.buildRegistryName(source) + + // Even if spark.app.id and spark.executor.id are set, they are not used for the metric name. + assert(metricName != s"$appId.$executorId.${source.sourceName}") + assert(metricName === source.sourceName) + } + } diff --git a/docs/monitoring.md b/docs/monitoring.md index cedceb295802..141810cd40d7 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -264,6 +264,19 @@ This allows users to report Spark metrics to a variety of sinks including HTTP, files. The metrics system is configured via a configuration file that Spark expects to be present at `$SPARK_HOME/conf/metrics.properties`. A custom file location can be specified via the `spark.metrics.conf` [configuration property](configuration.html#spark-properties). +By default, the root namespace used for driver or executor metrics is +the value of `spark.app.id`. However, often times, users want to be able to track the metrics +across apps for driver and executors, which is hard to do with application ID +(i.e. `spark.app.id`) since it changes with every invocation of the app. For such use cases, +a custom namespace can be specified for metrics reporting using `spark.metrics.namespace` +configuration property. +If, say, users wanted to set the metrics namespace to the name of the application, they +can set the `spark.metrics.namespace` property to a value like `${spark.app.name}`. This value is +then expanded the Spark configuration properties only (in contrast with the substitution system in +version 2.x) and is used as the root namespace of the metrics system. +Non driver and executor metrics are never prefixed with `spark.app.id`, nor does the +`spark.metrics.namespace` property have any such affect on such metrics. + Spark's metrics are decoupled into different _instances_ corresponding to Spark components. Within each instance, you can configure a set of sinks to which metrics are reported. The following instances are currently supported: