diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d943087ab6b80..d33cafb019dae 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -53,7 +53,7 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
-import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.{AppStatusSource, AppStatusStore}
 import org.apache.spark.status.api.v1.ThreadStackTrace
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
@@ -418,7 +418,8 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Initialize the app status store and listener before SparkEnv is created so that it gets
     // all events.
-    _statusStore = AppStatusStore.createLiveStore(conf)
+    val appStatusSource = AppStatusSource.createSource(conf)
+    _statusStore = AppStatusStore.createLiveStore(conf, appStatusSource)
     listenerBus.addToStatusQueue(_statusStore.listener.get)
 
     // Create the Spark execution environment (cache, map output tracker, etc)
@@ -569,7 +570,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _executorAllocationManager.foreach { e =>
       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
     }
-
+    appStatusSource.foreach(_env.metricsSystem.registerSource(_))
     // Make sure the context is stopped if the user forgets about it. This avoids leaving
     // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
     // is killed, though.
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 304d0922a37d2..81d12afaaf285 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.status
 
+import java.time.Duration
 import java.util.Date
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Function
@@ -44,6 +45,7 @@ private[spark] class AppStatusListener(
     kvstore: ElementTrackingStore,
     conf: SparkConf,
     live: Boolean,
+    appStatusSource: Option[AppStatusSource] = None,
     lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
 
   import config._
@@ -280,6 +282,11 @@ private[spark] class AppStatusListener(
   private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
     liveExecutors.get(execId).foreach { exec =>
       exec.isBlacklisted = blacklisted
+      if (blacklisted) {
+        appStatusSource.foreach(_.BLACKLISTED_EXECUTORS.inc())
+      } else {
+        appStatusSource.foreach(_.UNBLACKLISTED_EXECUTORS.inc())
+      }
       liveUpdate(exec, System.nanoTime())
     }
   }
@@ -382,11 +389,34 @@ private[spark] class AppStatusListener(
     }
 
     job.status = event.jobResult match {
-      case JobSucceeded => JobExecutionStatus.SUCCEEDED
-      case JobFailed(_) => JobExecutionStatus.FAILED
+      case JobSucceeded =>
+        appStatusSource.foreach { _.SUCCEEDED_JOBS.inc() }
+        JobExecutionStatus.SUCCEEDED
+      case JobFailed(_) =>
+        appStatusSource.foreach { _.FAILED_JOBS.inc() }
+        JobExecutionStatus.FAILED
     }
 
     job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+    for {
+      source <- appStatusSource
+      submissionTime <- job.submissionTime
+      completionTime <- job.completionTime
+    } {
+      source.JOB_DURATION.value.set(completionTime.getTime() - submissionTime.getTime())
+    }
+
+    // update global app status counters
+    appStatusSource.foreach { source =>
+      source.COMPLETED_STAGES.inc(job.completedStages.size)
+      source.FAILED_STAGES.inc(job.failedStages)
+      source.COMPLETED_TASKS.inc(job.completedTasks)
+      source.FAILED_TASKS.inc(job.failedTasks)
+      source.KILLED_TASKS.inc(job.killedTasks)
+      source.SKIPPED_TASKS.inc(job.skippedTasks)
+      source.SKIPPED_STAGES.inc(job.skippedStages.size)
+    }
     update(job, now, last = true)
   }
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala
new file mode 100644
index 0000000000000..7a231edaa4bed
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import java.util.concurrent.atomic.AtomicLong
+
+import AppStatusSource.getCounter
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.metrics.source.Source
+
+private[spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {
+  override def getValue: Long = value.get()
+}
+
+private[spark] class AppStatusSource extends Source {
+
+  override implicit val metricRegistry = new MetricRegistry()
+
+  override val sourceName = "appStatus"
+
+  val jobDuration = new JobDuration(new AtomicLong(0L))
+
+  // Duration of each job in milliseconds
+  val JOB_DURATION = metricRegistry
+    .register(MetricRegistry.name("jobDuration"), jobDuration)
+
+  val FAILED_STAGES = getCounter("stages", "failedStages")
+
+  val SKIPPED_STAGES = getCounter("stages", "skippedStages")
+
+  val COMPLETED_STAGES = getCounter("stages", "completedStages")
+
+  val SUCCEEDED_JOBS = getCounter("jobs", "succeededJobs")
+
+  val FAILED_JOBS = getCounter("jobs", "failedJobs")
+
+  val COMPLETED_TASKS = getCounter("tasks", "completedTasks")
+
+  val FAILED_TASKS = getCounter("tasks", "failedTasks")
+
+  val KILLED_TASKS = getCounter("tasks", "killedTasks")
+
+  val SKIPPED_TASKS = getCounter("tasks", "skippedTasks")
+
+  val BLACKLISTED_EXECUTORS = getCounter("tasks", "blackListedExecutors")
+
+  val UNBLACKLISTED_EXECUTORS = getCounter("tasks", "unblackListedExecutors")
+}
+
+private[spark] object AppStatusSource {
+
+  def getCounter(prefix: String, name: String)(implicit metricRegistry: MetricRegistry): Counter = {
+    metricRegistry.counter(MetricRegistry.name(prefix, name))
+  }
+
+  def createSource(conf: SparkConf): Option[AppStatusSource] = {
+    Option(conf.get(AppStatusSource.APP_STATUS_METRICS_ENABLED))
+      .filter(identity)
+      .map { _ => new AppStatusSource() }
+  }
+
+  val APP_STATUS_METRICS_ENABLED =
+    ConfigBuilder("spark.app.status.metrics.enabled")
+      .doc("Whether Dropwizard/Codahale metrics " +
+        "will be reported for the status of the running spark app.")
+      .booleanConf
+      .createWithDefault(false)
+}
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index e237281c552b1..289d4ab323cd1 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -503,10 +503,11 @@ private[spark] object AppStatusStore {
   /**
    * Create an in-memory store for a live application.
    */
-  def createLiveStore(conf: SparkConf): AppStatusStore = {
+  def createLiveStore(
+      conf: SparkConf,
+      appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
     val store = new ElementTrackingStore(new InMemoryStore(), conf)
-    val listener = new AppStatusListener(store, conf, true)
+    val listener = new AppStatusListener(store, conf, true, appStatusSource)
     new AppStatusStore(store, listener = Some(listener))
   }
-
 }
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index a0b2458549fbb..57fc939b0e82e 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -62,7 +62,7 @@ private[spark] abstract class LiveEntity {
 private class LiveJob(
     val jobId: Int,
     name: String,
-    submissionTime: Option[Date],
+    val submissionTime: Option[Date],
     val stageIds: Seq[Int],
     jobGroup: Option[String],
     numTasks: Int) extends LiveEntity {
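
For reference, below is a minimal sketch of how the new source could be exercised end to end. The demo object, the local-mode job, and the console-sink wiring are illustrative assumptions and not part of this change; any driver sink configured through the regular metrics system should surface the appStatus counters (for example jobs.succeededJobs, stages.completedStages, and the jobDuration gauge) once spark.app.status.metrics.enabled is set to true.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo app, not part of the patch.
object AppStatusMetricsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("app-status-metrics-demo")
      .setMaster("local[2]")
      // Opt in to the new source; it is disabled by default.
      .set("spark.app.status.metrics.enabled", "true")
      // Assumption for the demo: report driver metrics to stdout through the
      // existing metrics system (same keys as metrics.properties entries).
      .set("spark.metrics.conf.driver.sink.console.class",
        "org.apache.spark.metrics.sink.ConsoleSink")
      .set("spark.metrics.conf.driver.sink.console.period", "5")

    val sc = new SparkContext(conf)
    // Run a small job so jobs.succeededJobs, stages.completedStages and
    // tasks.completedTasks move away from zero.
    sc.parallelize(1 to 1000, 4).map(_ * 2).count()
    Thread.sleep(10000) // let the sink report at least once
    sc.stop()
  }
}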