Closed
Changes from all commits (22 commits)
8267974  [SPARK-11373] First pass at adding metrics to the history server, wit… (steveloughran, Nov 9, 2015)
0825756  [SPARK-11373] tests and review; found and fixed a re-entrancy in Hist… (steveloughran, Nov 9, 2015)
9e63bcf  [SPARK-11373] scala-style javadoc tuning in FsHistoryProvider (steveloughran, Nov 9, 2015)
a658769  SPARK-11373: move to MetricsSystem, though retaining/expanding health… (steveloughran, Nov 26, 2015)
f02323f  scalacheck (steveloughran, Nov 26, 2015)
dbde3ab  SPARK-11373: cut out the notion of binding information; simplify prov… (steveloughran, Nov 26, 2015)
59a4a67  SPARK-11373 cut the health checks out (steveloughran, Dec 9, 2015)
4c30404  [SPARK-11373] tail end of rebase operation (steveloughran, Feb 5, 2016)
ee74f81  [SPARK-11373] scalastyle and import ordering (steveloughran, Feb 8, 2016)
a3a6383  SPARK-11373 finish review of merge with trunk; add new Timestamp gaug… (steveloughran, Feb 12, 2016)
e936d5b  [SPARK-11373] finish rebasing to master, correct tightened style checks (steveloughran, Apr 26, 2016)
cef1577  [SPARK-11373] Address review comments and add a new counter of events… (steveloughran, Jun 10, 2016)
4bf9d13  [SPARK-11373] more metrics, all sources have prefixes, common feature… (steveloughran, Jul 14, 2016)
818e14b  [SPARK-11373] add a check that before there's been any events loaded,… (steveloughran, Jul 14, 2016)
bfea17f  [SPARK-11373] style check in the javadocs (steveloughran, Jul 14, 2016)
b01facd  [SPARK-11373] IDE had mysteriously re-ordered imports in the same lin… (steveloughran, Jul 14, 2016)
a956243  [SPARK-11373] address latest comments (steveloughran, Aug 16, 2016)
50727e7  [SPARK-1137] cull surplus lines (steveloughran, Aug 27, 2016)
012cf92  [SPARK-11373] don't register appui.load.timer as a metric (steveloughran, Aug 30, 2016)
1e59b68  [SPARK-11373] sync with master; tests all happy (steveloughran, Oct 10, 2016)
ec1f2d7  [SPARK-11373] sync this PR up with the master branch (steveloughran, Mar 1, 2017)
8903dcf  HADOOP-11374 fix line length errors (steveloughran, Apr 11, 2017)
@@ -24,12 +24,11 @@ import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import com.codahale.metrics.{Counter, MetricRegistry, Timer}
import com.codahale.metrics.{Counter, Counting, Metric, Timer}
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification}
import org.eclipse.jetty.servlet.FilterHolder

import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.Source
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Clock

@@ -388,7 +387,7 @@ private[history] final case class CacheKey(appId: String, attemptId: Option[Stri
* Metrics of the cache
* @param prefix prefix to register all entries under
*/
private[history] class CacheMetrics(prefix: String) extends Source {
private[history] class CacheMetrics(prefix: String) extends HistoryMetricSource(prefix) {

/* metrics: counters and timers */
val lookupCount = new Counter()
@@ -410,34 +409,25 @@ private[history] class CacheMetrics(prefix: String) extends Source {
("update.triggered.count", updateTriggeredCount))

/** all metrics, including timers */
private val allMetrics = counters ++ Seq(
private val allMetrics: Seq[(String, Metric with Counting)] = counters ++ Seq(
Contributor (review comment):
Is declaring the type useful here in some way?

Contributor Author (steveloughran):
Either I was just being explicit about what came in, or the IDE decided to get involved. Removed.
("load.timer", loadTimer),
("update.probe.timer", updateProbeTimer))

/**
* Name of metric source
*/
override val sourceName = "ApplicationCache"

override val metricRegistry: MetricRegistry = new MetricRegistry
override val sourceName = "application.cache"

/**
* Startup actions.
* This includes registering metrics with [[metricRegistry]]
*/
private def init(): Unit = {
allMetrics.foreach { case (name, metric) =>
metricRegistry.register(MetricRegistry.name(prefix, name), metric)
}
register(allMetrics)
}

override def toString: String = {
val sb = new StringBuilder()
counters.foreach { case (name, counter) =>
sb.append(name).append(" = ").append(counter.getCount).append('\n')
}
sb.toString()
}
init()

}
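The new CacheMetrics delegates registration to a HistoryMetricSource base class whose definition is not shown in this hunk, so the following is only a sketch of one plausible shape for it: a Source that owns a registry and prefixes every metric name it registers, matching what the `register(allMetrics)` call above appears to rely on. The class name and the exact behaviour here are assumptions, not the patch's actual implementation.

```scala
import com.codahale.metrics.{Metric, MetricRegistry}
import org.apache.spark.metrics.source.Source

// Sketch only: the real HistoryMetricSource is introduced elsewhere in this
// patch. This version assumes it owns a registry and prefixes metric names.
abstract class HistoryMetricSourceSketch(prefix: String) extends Source {
  override val metricRegistry = new MetricRegistry()

  /** Register a sequence of (name, metric) pairs under the source's prefix. */
  def register(metrics: Seq[(String, Metric)]): Unit = {
    metrics.foreach { case (name, metric) =>
      metricRegistry.register(MetricRegistry.name(prefix, name), metric)
    }
  }
}
```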

/**
@@ -17,11 +17,14 @@

package org.apache.spark.deploy.history

import java.util.concurrent.atomic.AtomicBoolean
import java.util.zip.ZipOutputStream

import scala.xml.Node

import org.apache.spark.SparkException
import org.apache.spark.{SparkException, SparkFirehoseListener}
import org.apache.spark.metrics.source.Source
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI

private[spark] case class ApplicationAttemptInfo(
@@ -74,6 +77,8 @@ private[history] case class LoadedAppUI(

private[history] abstract class ApplicationHistoryProvider {

private val started = new AtomicBoolean(false)

/**
* Returns the count of application event logs that the provider is currently still processing.
* History Server UI can use this to indicate to a user that the application listing on the UI
@@ -98,6 +103,19 @@ private[history] abstract class ApplicationHistoryProvider {
return 0;
}

/**
* Bind to the History Server: threads should be started here; exceptions may be raised
* Start the provider: threads should be started here; exceptions may be raised
* if the history provider cannot be started.
* The base implementation contains a re-entrancy check and should
Contributor (review comment):
This makes this interface awkward. Why can't the HistoryServer keep track of that?

Contributor Author (steveloughran, Apr 18, 2017):
All the work on the YARN service model, and that of SmartFrog before it, have given me a fear of startup logic. I'll see about doing it there, though. Anyway, cut: if problems arise, people can restate it. This patch does change HistoryServer so that HistoryServerSuite doesn't call initialize() twice, BTW.

One thing to consider here is that the FsHistoryProvider really should be starting its threads in the start() method, so that subclasses, like the test class SafeModeTestProvider, can be sure that they are fully initialized before they start. I left that alone.
* be invoked first.
* @return the metric information for registration
*/
def start(): Option[Source] = {
require(!started.getAndSet(true), "History provider already started")
None
}
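The contract spelled out above (the base start() guards against being called twice; subclasses invoke it first and then spin up their own threads) is small enough to sketch standalone. The class and object names below are illustrative only; this is a miniature of the AtomicBoolean-plus-require pattern in the diff, not the provider class itself.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Standalone miniature of the start() re-entrancy guard.
abstract class GuardedProvider {
  private val started = new AtomicBoolean(false)

  /** Fails fast if called a second time; subclasses invoke this first. */
  def start(): Unit = {
    require(!started.getAndSet(true), "History provider already started")
  }
}

object GuardDemo extends App {
  val provider = new GuardedProvider {
    override def start(): Unit = {
      super.start()           // re-entrancy check happens here
      // ... start background threads here ...
    }
  }
  provider.start()            // first call succeeds
  // provider.start()         // a second call would throw IllegalArgumentException
}
```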

/**
* Returns a list of applications available for the history server to show.
*
@@ -145,3 +163,15 @@
*/
def getEmptyListingHtml(): Seq[Node] = Seq.empty
}

/**
* A simple counter of events.
* There is no concurrency support here: all events must come in sequentially.
*/
private[history] class EventCountListener extends SparkFirehoseListener {
var eventCount = 0L

override def onEvent(event: SparkListenerEvent): Unit = {
eventCount += 1
}
}
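A quick usage sketch for the listener above. The direct onEvent() calls are only there to show the counting behaviour; in the provider the listener would be attached to the bus that replays an application's event log. It assumes the code lives in the org.apache.spark.deploy.history package, since EventCountListener is private[history]; the event type used here is just a convenient concrete SparkListenerEvent.

```scala
package org.apache.spark.deploy.history

import org.apache.spark.scheduler.SparkListenerUnpersistRDD

// Illustration only: feed the listener a couple of events and read the total.
object EventCountSketch extends App {
  val listener = new EventCountListener()
  listener.onEvent(SparkListenerUnpersistRDD(rddId = 1))
  listener.onEvent(SparkListenerUnpersistRDD(rddId = 2))
  println(s"events seen: ${listener.eventCount}")  // prints: events seen: 2
}
```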