diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index d3384fb29773..837118beb7c3 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -21,12 +21,12 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import scala.collection.mutable.LinkedHashSet - -import org.apache.avro.{SchemaNormalization, Schema} - +import org.apache.avro.{Schema, SchemaNormalization} import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.Utils +import scala.util.matching.Regex + /** * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs. * @@ -312,6 +312,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { Option(settings.get(key)).orElse(getDeprecatedConfig(key, this)) } + /** Get a parameter as an Option and substitute for parameter references */ + def getOptionSubstituted(key: String): Option[String] = { + getOption(key).map( v => substitute(v, Set[String]()) ) + } + /** Get all parameters as a list of pairs */ def getAll: Array[(String, String)] = { settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toArray @@ -524,6 +529,22 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n") } + private val REF_RE = """\$\{(\S+?)\}""".r + private def substitute(input: String, usedRefs: Set[String]): String = { + if (input != null) { + REF_RE.replaceAllIn(input, { m => + val name = m.group(1) + require(!usedRefs.contains(name), s"Circular reference in $input: $name") + + val replacement = this.getOption(name) + .map { v => substitute(v, usedRefs + name) } + .getOrElse(m.matched) + Regex.quoteReplacement(replacement) + }) + } else { + input + } + } } private[spark] object SparkConf extends Logging { diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index dd2d325d8703..d7510faae73a 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -34,7 +34,7 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties" private[metrics] val properties = new Properties() - private[metrics] var propertyCategories: mutable.HashMap[String, Properties] = null + private[metrics] var perInstanceSubProperties: mutable.HashMap[String, Properties] = null private def setDefaultProperties(prop: Properties) { prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet") @@ -43,6 +43,10 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json") } + /** + * Load properties from various places, based on precedence + * If the same property is set again latter on in the method, it overwrites the previous value + */ def initialize() { // Add default properties in case there's no properties file setDefaultProperties(properties) @@ -57,16 +61,47 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { case _ => } - propertyCategories = subProperties(properties, INSTANCE_REGEX) - if (propertyCategories.contains(DEFAULT_PREFIX)) { - val defaultProperty = propertyCategories(DEFAULT_PREFIX).asScala - for((inst, prop) <- propertyCategories if (inst != DEFAULT_PREFIX); - (k, v) <- defaultProperty if (prop.get(k) == null)) { + // Now, let's populate a list of sub-properties per instance, instance being the prefix that + // appears before the first dot in the property name. + // Add to the sub-properties per instance, the default properties (those with prefix "*"), if + // they don't have that exact same sub-property already defined. + // + // For example, if properties has ("*.class"->"default_class", "*.path"->"default_path, + // "driver.path"->"driver_path"), for driver specific sub-properties, we'd like the output to be + // ("driver"->Map("path"->"driver_path", "class"->"default_class") + // Note how class got added to based on the default property, but path remained the same + // since "driver.path" already existed and took precedence over "*.path" + // + perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX) + if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) { + val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala + for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX); + (k, v) <- defaultSubProperties if (prop.get(k) == null)) { prop.put(k, v) } } } + /** + * Take a simple set of properties and a regex that the instance names (part before the first dot) + * have to conform to. And, return a map of the first order prefix (before the first dot) to the + * sub-properties under that prefix. + * + * For example, if the properties sent were Properties("*.sink.servlet.class"->"class1", + * "*.sink.servlet.path"->"path1"), the returned map would be + * Map("*" -> Properties("sink.servlet.class" -> "class1", "sink.servlet.path" -> "path1")) + * Note in the subProperties (value of the returned Map), only the suffixes are used as property + * keys. + * If, in the passed properties, there is only one property with a given prefix, it is still + * "unflattened". For example, if the input was Properties("*.sink.servlet.class" -> "class1" + * the returned Map would contain one key-value pair + * Map("*" -> Properties("sink.servlet.class" -> "class1")) + * Any passed in properties, not complying with the regex are ignored. + * + * @param prop the flat list of properties to "unflatten" based on prefixes + * @param regex the regex that the prefix has to comply with + * @return an unflatted map, mapping prefix with sub-properties under that prefix + */ def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = { val subProperties = new mutable.HashMap[String, Properties] prop.asScala.foreach { kv => @@ -79,9 +114,9 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { } def getInstance(inst: String): Properties = { - propertyCategories.get(inst) match { + perInstanceSubProperties.get(inst) match { case Some(s) => s - case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties) + case None => perInstanceSubProperties.getOrElse(DEFAULT_PREFIX, new Properties) } } diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index fdf76d312db3..d64aff857a07 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -20,8 +20,6 @@ package org.apache.spark.metrics import java.util.Properties import java.util.concurrent.TimeUnit -import org.apache.spark.util.Utils - import scala.collection.mutable import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry} @@ -30,26 +28,27 @@ import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.metrics.sink.{MetricsServlet, Sink} import org.apache.spark.metrics.source.Source +import org.apache.spark.util.Utils /** - * Spark Metrics System, created by specific "instance", combined by source, - * sink, periodically poll source metrics data to sink destinations. + * Spark Metrics System, created by a specific "instance", combined by source, + * sink, periodically polls source metrics data to sink destinations. * - * "instance" specify "who" (the role) use metrics system. In spark there are several roles - * like master, worker, executor, client driver, these roles will create metrics system - * for monitoring. So instance represents these roles. Currently in Spark, several instances + * "instance" specifies "who" (the role) uses the metrics system. In Spark, there are several roles + * like master, worker, executor, client driver. These roles will create metrics system + * for monitoring. So, "instance" represents these roles. Currently in Spark, several instances * have already implemented: master, worker, executor, driver, applications. * - * "source" specify "where" (source) to collect metrics data. In metrics system, there exists + * "source" specifies "where" (source) to collect metrics data from. In metrics system, there exists * two kinds of source: * 1. Spark internal source, like MasterSource, WorkerSource, etc, which will collect * Spark component's internal state, these sources are related to instance and will be - * added after specific metrics system is created. + * added after a specific metrics system is created. * 2. Common source, like JvmSource, which will collect low level state, is configured by * configuration and loaded through reflection. * - * "sink" specify "where" (destination) to output metrics data to. Several sinks can be - * coexisted and flush metrics to all these sinks. + * "sink" specifies "where" (destination) to output metrics data to. Several sinks can + * coexist and metrics can be flushed to all these sinks. * * Metrics configuration format is like below: * [instance].[sink|source].[name].[options] = xxxx @@ -62,9 +61,9 @@ import org.apache.spark.metrics.source.Source * [sink|source] means this property belongs to source or sink. This field can only be * source or sink. * - * [name] specify the name of sink or source, it is custom defined. + * [name] specify the name of sink or source, if it is custom defined. * - * [options] is the specific property of this source or sink. + * [options] represent the specific property of this source or sink. */ private[spark] class MetricsSystem private ( val instance: String, @@ -124,19 +123,27 @@ private[spark] class MetricsSystem private ( * application, executor/driver and metric source. */ private[spark] def buildRegistryName(source: Source): String = { - val appId = conf.getOption("spark.app.id") + val metricsNamespace = conf + .getOptionSubstituted(MetricsSystem.METRICS_NAMESPACE_CONFIG_NAME) + .orElse(conf.getOption("spark.app.id")) + val executorId = conf.getOption("spark.executor.id") val defaultName = MetricRegistry.name(source.sourceName) if (instance == "driver" || instance == "executor") { - if (appId.isDefined && executorId.isDefined) { - MetricRegistry.name(appId.get, executorId.get, source.sourceName) + if (metricsNamespace.isDefined && executorId.isDefined) { + MetricRegistry.name(metricsNamespace.get, executorId.get, source.sourceName) } else { // Only Driver and Executor set spark.app.id and spark.executor.id. // Other instance types, e.g. Master and Worker, are not related to a specific application. - val warningMsg = s"Using default name $defaultName for source because %s is not set." - if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) } - if (executorId.isEmpty) { logWarning(warningMsg.format("spark.executor.id")) } + if (metricsNamespace.isEmpty) { + logWarning(s"Using default name $defaultName for source because neither " + + s"${MetricsSystem.METRICS_NAMESPACE_CONFIG_NAME} nor spark.app.id is set.") + } + if (executorId.isEmpty) { + logWarning(s"Using default name $defaultName for source because spark.executor.id is " + + s"not set.") + } defaultName } } else { defaultName } @@ -210,6 +217,8 @@ private[spark] object MetricsSystem { val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r + val METRICS_NAMESPACE_CONFIG_NAME = "spark.metrics.namespace" + private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS private[this] val MINIMAL_POLL_PERIOD = 1 diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala index 41f2ff725a17..d5ea6fabc850 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala @@ -140,7 +140,7 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter { val conf = new MetricsConfig(sparkConf) conf.initialize() - val propCategories = conf.propertyCategories + val propCategories = conf.perInstanceSubProperties assert(propCategories.size === 3) val masterProp = conf.getInstance("master") diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 9c389c76bf3b..b8a995369dd4 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -184,4 +184,108 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM assert(metricName != s"$appId.$executorId.${source.sourceName}") assert(metricName === source.sourceName) } + + test("MetricsSystem with Executor instance, with litteral custom namespace") { + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val appId = "testId" + val metricsNamespace = "metricsNamespace" + val executorId = "1" + conf.set("spark.app.id", appId) + conf.set("spark.executor.id", executorId) + conf.set(MetricsSystem.METRICS_NAMESPACE_CONFIG_NAME, metricsNamespace) + + val instanceName = "executor" + val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) + + val metricName = driverMetricsSystem.buildRegistryName(source) + assert(metricName === s"$metricsNamespace.$executorId.${source.sourceName}") + } + + test("MetricsSystem with Executor instance, with reference to config key as custom namespace ") { + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val appId = "testId" + val appName = "testName" + val executorId = "1" + conf.set("spark.app.id", appId) + conf.set("spark.app.name", appName) + conf.set("spark.executor.id", executorId) + conf.set(MetricsSystem.METRICS_NAMESPACE_CONFIG_NAME, "${spark.app.name}") + + val instanceName = "executor" + val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) + + val metricName = driverMetricsSystem.buildRegistryName(source) + assert(metricName === s"$appName.$executorId.${source.sourceName}") + } + + test("MetricsSystem with Executor instance, custom namespace which is not set") { + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val executorId = "1" + val namespaceToResolve = "${spark.doesnotexist}" + conf.set("spark.executor.id", executorId) + conf.set(MetricsSystem.METRICS_NAMESPACE_CONFIG_NAME, namespaceToResolve) + + val instanceName = "executor" + val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) + + val metricName = driverMetricsSystem.buildRegistryName(source) + // If the user set the spark.metrics.namespace property to an expansion of another property + // (say ${spark.doesnotexist}, the unresolved name (i.e. literally ${spark.doesnotexist}) + // is used as the root logger name. + assert(metricName === s"$namespaceToResolve.$executorId.${source.sourceName}") + } + + test("MetricsSystem with Executor instance, custom namespace, spark.executor.id not set") { + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val appId = "testId" + conf.set("spark.app.name", appId) + conf.set(MetricsSystem.METRICS_NAMESPACE_CONFIG_NAME, "${spark.app.name}") + + val instanceName = "executor" + val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) + + val metricName = driverMetricsSystem.buildRegistryName(source) + assert(metricName === source.sourceName) + } + + test("MetricsSystem with non-driver, non-executor instance with custom namespace") { + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val appId = "testId" + val appName = "testName" + val executorId = "dummyExecutorId" + conf.set("spark.app.id", appId) + conf.set("spark.app.name", appName) + conf.set(MetricsSystem.METRICS_NAMESPACE_CONFIG_NAME, "${spark.app.name}") + conf.set("spark.executor.id", executorId) + + val instanceName = "testInstance" + val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) + + val metricName = driverMetricsSystem.buildRegistryName(source) + + // Even if spark.app.id and spark.executor.id are set, they are not used for the metric name. + assert(metricName != s"$appId.$executorId.${source.sourceName}") + assert(metricName === source.sourceName) + } + } diff --git a/docs/monitoring.md b/docs/monitoring.md index cedceb295802..141810cd40d7 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -264,6 +264,19 @@ This allows users to report Spark metrics to a variety of sinks including HTTP, files. The metrics system is configured via a configuration file that Spark expects to be present at `$SPARK_HOME/conf/metrics.properties`. A custom file location can be specified via the `spark.metrics.conf` [configuration property](configuration.html#spark-properties). +By default, the root namespace used for driver or executor metrics is +the value of `spark.app.id`. However, often times, users want to be able to track the metrics +across apps for driver and executors, which is hard to do with application ID +(i.e. `spark.app.id`) since it changes with every invocation of the app. For such use cases, +a custom namespace can be specified for metrics reporting using `spark.metrics.namespace` +configuration property. +If, say, users wanted to set the metrics namespace to the name of the application, they +can set the `spark.metrics.namespace` property to a value like `${spark.app.name}`. This value is +then expanded the Spark configuration properties only (in contrast with the substitution system in +version 2.x) and is used as the root namespace of the metrics system. +Non driver and executor metrics are never prefixed with `spark.app.id`, nor does the +`spark.metrics.namespace` property have any such affect on such metrics. + Spark's metrics are decoupled into different _instances_ corresponding to Spark components. Within each instance, you can configure a set of sinks to which metrics are reported. The following instances are currently supported: