-            <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
+            { providerConfig.map(e => <li><strong>{e._1}:</strong> {e._2}</li>) }
{
-            if (parent.appIdToInfo.size > 0) {
+            if (allApps.size > 0) {
               <h4>
-                Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
-                Completed Application{if (parent.getNumApplications > 1) "s" else ""}
+                Showing {actualFirst + 1}-{last + 1} of {allApps.size}
+                <span style="float: right">
+                  {if (actualPage > 1) <a href={"/?page=" + (actualPage - 1)}>&lt;</a>}
+                  {if (actualPage < pageCount) <a href={"/?page=" + (actualPage + 1)}>&gt;</a>}
+                </span>
               </h4> ++
appTable
} else {
@@ -56,26 +72,20 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Completed",
"Duration",
"Spark User",
- "Log Directory",
"Last Updated")
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
- val appName = if (info.started) info.name else info.logDirPath.getName
- val uiAddress = parent.getAddress + info.ui.basePath
- val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started"
- val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed"
- val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
- val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---"
- val sparkUser = if (info.started) info.sparkUser else "Unknown user"
- val logDirectory = info.logDirPath.getName
+ val uiAddress = "/history/" + info.id
+ val startTime = UIUtils.formatDate(info.startTime)
+ val endTime = UIUtils.formatDate(info.endTime)
+ val duration = UIUtils.formatDuration(info.endTime - info.startTime)
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
     <tr>
-      <td><a href={uiAddress}>{appName}</a></td>
+      <td><a href={uiAddress}>{info.name}</a></td>
       <td>{startTime}</td>
       <td>{endTime}</td>
       <td>{duration}</td>
-      <td>{sparkUser}</td>
-      <td>{logDirectory}</td>
+      <td>{info.sparkUser}</td>
       <td>{lastUpdated}</td>
     </tr>
}
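The `Showing {actualFirst + 1}-{last + 1} of {allApps.size}` header above uses pagination indices computed earlier in render(), outside this excerpt. A minimal sketch of how those values are presumably derived; pageSize, the helper name, and the defaults are assumptions, not code from this patch:

    object PaginationSketch {
      // Hypothetical helper: the excerpt only shows these values being used.
      def indices(requestedPage: Int, total: Int, pageSize: Int = 20): (Int, Int, Int, Int) = {
        val requestedFirst = (requestedPage - 1) * pageSize
        // Clamp out-of-range page requests back to the first page.
        val actualFirst = if (requestedFirst < total) requestedFirst else 0
        val actualPage = (actualFirst / pageSize) + 1
        val last = math.min(actualFirst + pageSize, total) - 1 // inclusive index
        val pageCount = total / pageSize + (if (total % pageSize > 0) 1 else 0)
        (actualFirst, last, actualPage, pageCount)
      }

      def main(args: Array[String]): Unit = {
        // 45 apps at 20 per page, page 3 requested: header reads "Showing 41-45 of 45".
        println(indices(requestedPage = 3, total = 45)) // prints (40,44,3,3)
      }
    }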
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index a9c11dca5678e..29a78a56c8ed5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -17,14 +17,15 @@
package org.apache.spark.deploy.history
-import scala.collection.mutable
+import java.util.NoSuchElementException
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import com.google.common.cache._
+import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.scheduler._
-import org.apache.spark.ui.{WebUI, SparkUI}
+import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
@@ -38,56 +39,68 @@ import org.apache.spark.util.Utils
* application's event logs are maintained in the application's own sub-directory. This
* is the same structure as maintained in the event log write code path in
* EventLoggingListener.
- *
- * @param baseLogDir The base directory in which event logs are found
*/
class HistoryServer(
- val baseLogDir: String,
+ conf: SparkConf,
+ provider: ApplicationHistoryProvider,
securityManager: SecurityManager,
- conf: SparkConf)
- extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging {
-
- import HistoryServer._
+ port: Int)
+ extends WebUI(securityManager, port, conf) with Logging {
- private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
- private val localHost = Utils.localHostName()
- private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
+ // How many applications to retain
+ private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
- // A timestamp of when the disk was last accessed to check for log updates
- private var lastLogCheckTime = -1L
+ private val appLoader = new CacheLoader[String, SparkUI] {
+ override def load(key: String): SparkUI = {
+ val ui = provider.getAppUI(key)
+ if (ui == null) {
+ throw new NoSuchElementException()
+ }
+ attachSparkUI(ui)
+ ui
+ }
+ }
- // Number of completed applications found in this directory
- private var numCompletedApplications = 0
+ private val appCache = CacheBuilder.newBuilder()
+ .maximumSize(retainedApplications)
+ .removalListener(new RemovalListener[String, SparkUI] {
+ override def onRemoval(rm: RemovalNotification[String, SparkUI]) = {
+ detachSparkUI(rm.getValue())
+ }
+ })
+ .build(appLoader)
+
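As a standalone illustration of the pattern above (not HistoryServer code): a Guava LoadingCache builds entries on demand and notifies a removal listener on eviction, which is how evicted SparkUIs get detached from the server. The String values and the size here are placeholders:

    import com.google.common.cache.{CacheBuilder, CacheLoader, RemovalListener, RemovalNotification}

    object CacheSketch {
      def main(args: Array[String]): Unit = {
        val loader = new CacheLoader[String, String] {
          // Runs only on a cache miss, like provider.getAppUI above.
          override def load(key: String): String = s"ui-for-$key"
        }
        val cache = CacheBuilder.newBuilder()
          .maximumSize(2) // stands in for spark.history.retainedApplications
          .removalListener(new RemovalListener[String, String] {
            // HistoryServer calls detachSparkUI at this point.
            override def onRemoval(rm: RemovalNotification[String, String]): Unit =
              println(s"evicted ${rm.getKey}")
          })
          .build(loader)

        Seq("app-1", "app-2", "app-3").foreach(id => cache.get(id)) // third get evicts the oldest
      }
    }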
+ private val loaderServlet = new HttpServlet {
+ protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
+ val parts = Option(req.getPathInfo()).getOrElse("").split("/")
+ if (parts.length < 2) {
+ res.sendError(HttpServletResponse.SC_BAD_REQUEST,
+ s"Unexpected path info in request (URI = ${req.getRequestURI()}")
+ return
+ }
- @volatile private var stopped = false
+ val appId = parts(1)
- /**
- * A background thread that periodically checks for event log updates on disk.
- *
- * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
- * time at which it performs the next log check to maintain the same period as before.
- *
- * TODO: Add a mechanism to update manually.
- */
- private val logCheckingThread = new Thread {
- override def run(): Unit = Utils.logUncaughtExceptions {
- while (!stopped) {
- val now = System.currentTimeMillis
- if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
- checkForLogs()
- Thread.sleep(UPDATE_INTERVAL_MS)
- } else {
- // If the user has manually checked for logs recently, wait until
- // UPDATE_INTERVAL_MS after the last check time
- Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now)
+ // Note we don't use the UI retrieved from the cache; the cache loader above will register
+ // the app's UI, and all we need to do is redirect the user to the same URI that was
+ // requested, and the proper data should be served at that point.
+ try {
+ appCache.get(appId)
+ res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
+ } catch {
+ case e: Exception => e.getCause() match {
+ case nsee: NoSuchElementException =>
+            val msg = <div class="row-fluid">Application {appId} not found.</div>
+ res.setStatus(HttpServletResponse.SC_NOT_FOUND)
+ UIUtils.basicSparkPage(msg, "Not Found").foreach(
+ n => res.getWriter().write(n.toString))
+
+ case cause: Exception => throw cause
}
}
}
}
- // A mapping of application ID to its history information, which includes the rendered UI
- val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()
-
initialize()
/**
@@ -98,108 +111,23 @@ class HistoryServer(
*/
def initialize() {
attachPage(new HistoryPage(this))
- attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
+ attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
+
+ val contextHandler = new ServletContextHandler
+ contextHandler.setContextPath("/history")
+ contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
+ attachHandler(contextHandler)
}
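The /history context registered in initialize() is the standard embedded-Jetty servlet mount. A self-contained sketch of that pattern; the port and response body are illustrative, not HistoryServer's:

    import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
    import org.eclipse.jetty.server.Server
    import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

    object ServletMountSketch {
      def main(args: Array[String]): Unit = {
        val servlet = new HttpServlet {
          override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
            // For GET /history/app-1/stages, getPathInfo() returns "/app-1/stages",
            // which loaderServlet splits on "/" to recover the application ID.
            res.getWriter.write(s"pathInfo = ${req.getPathInfo}")
          }
        }
        val handler = new ServletContextHandler
        handler.setContextPath("/history") // everything under /history/* lands here
        handler.addServlet(new ServletHolder(servlet), "/*")

        val server = new Server(8080) // illustrative port
        server.setHandler(handler)
        server.start()
        server.join()
      }
    }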
/** Bind to the HTTP server behind this web interface. */
override def bind() {
super.bind()
- logCheckingThread.start()
- }
-
- /**
- * Check for any updates to event logs in the base directory. This is only effective once
- * the server has been bound.
- *
- * If a new completed application is found, the server renders the associated SparkUI
- * from the application's event logs, attaches this UI to itself, and stores metadata
- * information for this application.
- *
- * If the logs for an existing completed application are no longer found, the server
- * removes all associated information and detaches the SparkUI.
- */
- def checkForLogs() = synchronized {
- if (serverInfo.isDefined) {
- lastLogCheckTime = System.currentTimeMillis
- logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime))
- try {
- val logStatus = fileSystem.listStatus(new Path(baseLogDir))
- val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
- val logInfos = logDirs
- .sortBy { dir => getModificationTime(dir) }
- .map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) }
- .filter { case (dir, info) => info.applicationComplete }
-
- // Logging information for applications that should be retained
- val retainedLogInfos = logInfos.takeRight(RETAINED_APPLICATIONS)
- val retainedAppIds = retainedLogInfos.map { case (dir, _) => dir.getPath.getName }
-
- // Remove any applications that should no longer be retained
- appIdToInfo.foreach { case (appId, info) =>
- if (!retainedAppIds.contains(appId)) {
- detachSparkUI(info.ui)
- appIdToInfo.remove(appId)
- }
- }
-
- // Render the application's UI if it is not already there
- retainedLogInfos.foreach { case (dir, info) =>
- val appId = dir.getPath.getName
- if (!appIdToInfo.contains(appId)) {
- renderSparkUI(dir, info)
- }
- }
-
- // Track the total number of completed applications observed this round
- numCompletedApplications = logInfos.size
-
- } catch {
- case e: Exception => logError("Exception in checking for event log updates", e)
- }
- } else {
- logWarning("Attempted to check for event log updates before binding the server.")
- }
- }
-
- /**
- * Render a new SparkUI from the event logs if the associated application is completed.
- *
- * HistoryServer looks for a special file that indicates application completion in the given
- * directory. If this file exists, the associated application is regarded to be completed, in
- * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing.
- */
- private def renderSparkUI(logDir: FileStatus, elogInfo: EventLoggingInfo) {
- val path = logDir.getPath
- val appId = path.getName
- val replayBus = new ReplayListenerBus(elogInfo.logPaths, fileSystem, elogInfo.compressionCodec)
- val appListener = new ApplicationEventListener
- replayBus.addListener(appListener)
- val appConf = conf.clone()
- val appSecManager = new SecurityManager(appConf)
- val ui = new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
-
- // Do not call ui.bind() to avoid creating a new server for each application
- replayBus.replay()
- if (appListener.applicationStarted) {
- appSecManager.setUIAcls(HISTORY_UI_ACLS_ENABLED)
- appSecManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
- attachSparkUI(ui)
- val appName = appListener.appName
- val sparkUser = appListener.sparkUser
- val startTime = appListener.startTime
- val endTime = appListener.endTime
- val lastUpdated = getModificationTime(logDir)
- ui.setAppName(appName + " (completed)")
- appIdToInfo(appId) = ApplicationHistoryInfo(appId, appName, startTime, endTime,
- lastUpdated, sparkUser, path, ui)
- }
}
/** Stop the server and close the file system. */
override def stop() {
super.stop()
- stopped = true
- fileSystem.close()
+ provider.stop()
}
/** Attach a reconstructed UI to this server. Only valid after bind(). */
@@ -215,27 +143,20 @@ class HistoryServer(
ui.getHandlers.foreach(detachHandler)
}
- /** Return the address of this server. */
- def getAddress: String = "http://" + publicHost + ":" + boundPort
+ /**
+ * Returns a list of available applications, in descending order according to their end time.
+ *
+ * @return List of all known applications.
+ */
+ def getApplicationList() = provider.getListing()
- /** Return the number of completed applications found, whether or not the UI is rendered. */
- def getNumApplications: Int = numCompletedApplications
+ /**
+ * Returns the provider configuration to show in the listing page.
+ *
+ * @return A map with the provider's configuration.
+ */
+ def getProviderConfig() = provider.getConfig()
- /** Return when this directory was last modified. */
- private def getModificationTime(dir: FileStatus): Long = {
- try {
- val logFiles = fileSystem.listStatus(dir.getPath)
- if (logFiles != null && !logFiles.isEmpty) {
- logFiles.map(_.getModificationTime).max
- } else {
- dir.getModificationTime
- }
- } catch {
- case e: Exception =>
- logError("Exception in accessing modification time of %s".format(dir.getPath), e)
- -1L
- }
- }
}
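The calls made on `provider` above (getListing, getAppUI, getConfig, stop) imply an interface roughly like the following. The real trait lives in ApplicationHistoryProvider.scala, which is not part of this excerpt, so treat the signatures and defaults as inferred:

    // Inferred sketch; method names come from the call sites above.
    private[history] abstract class ApplicationHistoryProvider {
      /** All known applications, rendered by HistoryPage as the listing. */
      def getListing(): Seq[ApplicationHistoryInfo]

      /** Rebuild the SparkUI for one application, or null if it is unknown. */
      def getAppUI(appId: String): SparkUI

      /** Provider-specific key/value pairs shown at the top of the listing page. */
      def getConfig(): Map[String, String] = Map()

      /** Release resources (threads, file system handles) held by the provider. */
      def stop(): Unit = { }
    }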
/**
@@ -251,30 +172,31 @@ class HistoryServer(
object HistoryServer {
private val conf = new SparkConf
- // Interval between each check for event log updates
- val UPDATE_INTERVAL_MS = conf.getInt("spark.history.updateInterval", 10) * 1000
-
- // How many applications to retain
- val RETAINED_APPLICATIONS = conf.getInt("spark.history.retainedApplications", 250)
-
- // The port to which the web UI is bound
- val WEB_UI_PORT = conf.getInt("spark.history.ui.port", 18080)
-
- // set whether to enable or disable view acls for all applications
- val HISTORY_UI_ACLS_ENABLED = conf.getBoolean("spark.history.ui.acls.enable", false)
-
- val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
-
def main(argStrings: Array[String]) {
initSecurity()
- val args = new HistoryServerArguments(argStrings)
+ val args = new HistoryServerArguments(conf, argStrings)
val securityManager = new SecurityManager(conf)
- val server = new HistoryServer(args.logDir, securityManager, conf)
+
+ val providerName = conf.getOption("spark.history.provider")
+ .getOrElse(classOf[FsHistoryProvider].getName())
+ val provider = Class.forName(providerName)
+ .getConstructor(classOf[SparkConf])
+ .newInstance(conf)
+ .asInstanceOf[ApplicationHistoryProvider]
+
+ val port = conf.getInt("spark.history.ui.port", 18080)
+
+ val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()
+ Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
+ override def run() = {
+ server.stop()
+ }
+ })
+
// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
- server.stop()
}
def initSecurity() {
@@ -291,17 +213,3 @@ object HistoryServer {
}
}
-
-
-private[spark] case class ApplicationHistoryInfo(
- id: String,
- name: String,
- startTime: Long,
- endTime: Long,
- lastUpdated: Long,
- sparkUser: String,
- logDirPath: Path,
- ui: SparkUI) {
- def started = startTime != -1
- def completed = endTime != -1
-}
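Because main() instantiates the provider reflectively through a one-argument SparkConf constructor, any class that satisfies that contract can be plugged in via spark.history.provider. A hedged example, ignoring package-visibility details; the class and package names are made up:

    package com.example

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.history.{ApplicationHistoryInfo, ApplicationHistoryProvider}
    import org.apache.spark.ui.SparkUI

    // Illustrative no-op provider; the only requirements visible in main() are
    // the (SparkConf) constructor and the ApplicationHistoryProvider type.
    class EmptyHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider {
      override def getListing(): Seq[ApplicationHistoryInfo] = Seq.empty
      override def getAppUI(appId: String): SparkUI = null // nothing to serve
    }

It would then be selected by adding -Dspark.history.provider=com.example.EmptyHistoryProvider to SPARK_HISTORY_OPTS.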
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index 943c061743dbd..be9361b754fc3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -17,17 +17,14 @@
package org.apache.spark.deploy.history
-import java.net.URI
-
-import org.apache.hadoop.fs.Path
-
+import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
/**
 * Command-line parser for the history server.
*/
-private[spark] class HistoryServerArguments(args: Array[String]) {
- var logDir = ""
+private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
+ private var logDir: String = null
parse(args.toList)
@@ -45,32 +42,36 @@ private[spark] class HistoryServerArguments(args: Array[String]) {
case _ =>
printUsageAndExit(1)
}
- validateLogDir()
- }
-
- private def validateLogDir() {
- if (logDir == "") {
- System.err.println("Logging directory must be specified.")
- printUsageAndExit(1)
- }
- val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
- val path = new Path(logDir)
- if (!fileSystem.exists(path)) {
- System.err.println("Logging directory specified does not exist: %s".format(logDir))
- printUsageAndExit(1)
- }
- if (!fileSystem.getFileStatus(path).isDir) {
- System.err.println("Logging directory specified is not a directory: %s".format(logDir))
- printUsageAndExit(1)
+ if (logDir != null) {
+ conf.set("spark.history.fs.logDirectory", logDir)
}
}
private def printUsageAndExit(exitCode: Int) {
System.err.println(
- "Usage: HistoryServer [options]\n" +
- "\n" +
- "Options:\n" +
- " -d DIR, --dir DIR Location of event log files")
+ """
+ |Usage: HistoryServer
+ |
+ |Configuration options can be set by setting the corresponding JVM system property.
+ |History Server options are always available; additional options depend on the provider.
+ |
+ |History Server options:
+ |
+ | spark.history.ui.port Port where server will listen for connections
+ | (default 18080)
+ | spark.history.acls.enable Whether to enable view acls for all applications
+ | (default false)
+ | spark.history.provider Name of history provider class (defaults to
+ | file system-based provider)
+ | spark.history.retainedApplications Max number of application UIs to keep loaded in memory
+ | (default 50)
+ |
+ |FsHistoryProvider options:
+ |
+ | spark.history.fs.logDirectory Directory where app logs are stored (required)
+ | spark.history.fs.updateInterval How often to reload log data from storage (in seconds,
+ | default 10)
+ |""".stripMargin)
System.exit(exitCode)
}
+
}
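The hunk above elides the argument cases that precede `case _ =>`. Assuming the pre-existing -d/--dir flags were kept to feed the deprecation path, the full parse method presumably looks roughly like this:

    // Sketch under stated assumptions; only the trailing "case _" branch and
    // the conf.set call are visible in the diff above.
    private def parse(args: List[String]): Unit = {
      args match {
        case ("--dir" | "-d") :: value :: tail =>
          logDir = value
          parse(tail)
        case ("--help" | "-h") :: tail =>
          printUsageAndExit(0)
        case Nil => // done
        case _ =>
          printUsageAndExit(1)
      }
      if (logDir != null) {
        conf.set("spark.history.fs.logDirectory", logDir)
      }
    }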
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index b08f308fda1dd..856273e1d4e21 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -51,6 +51,7 @@ private[spark] abstract class WebUI(
def getTabs: Seq[WebUITab] = tabs.toSeq
def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
+ def getSecurityManager: SecurityManager = securityManager
/** Attach a tab to this UI, along with all of its attached pages. */
def attachTab(tab: WebUITab) {
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 2b9e9e5bd7ea0..84073fe4d949a 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -35,11 +35,13 @@ If Spark is run on Mesos or YARN, it is still possible to reconstruct the UI of
application through Spark's history server, provided that the application's event logs exist.
You can start the history server by executing:
-    ./sbin/start-history-server.sh <base-log-dir>
+    ./sbin/start-history-server.sh
-The base logging directory must be supplied, and should contain sub-directories that each
-represents an application's event logs. This creates a web interface at
-`http://<server-url>:18080` by default. The history server can be configured as follows:
+When using the file-system provider class (see spark.history.provider below), the base logging
+directory must be supplied in the spark.history.fs.logDirectory configuration option,
+and should contain sub-directories that each represents an application's event logs. This creates a
+web interface at `http://<server-url>:18080` by default. The history server can be configured as
+follows:
  <tr><th style="width:21%">Environment Variable</th><th>Meaning</th></tr>
@@ -69,7 +71,14 @@ represents an application's event logs. This creates a web interface at
  <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
   <tr>
-    <td>spark.history.updateInterval</td>
+    <td>spark.history.provider</td>
+    <td>org.apache.spark.deploy.history.FsHistoryProvider</td>
+    <td>Name of the class implementing the application history backend. Currently there is only
+    one implementation, provided by Spark, which looks for application logs stored in the
+    file system.</td>
+  </tr>
+  <tr>
+    <td>spark.history.fs.updateInterval</td>
     <td>10</td>
     <td>The period, in seconds, at which information displayed by this history server is updated.
@@ -78,7 +87,7 @@ represents an application's event logs. This creates a web interface at
   </tr>
   <tr>
     <td>spark.history.retainedApplications</td>
-    <td>250</td>
+    <td>50</td>
     <td>The number of application UIs to retain. If this cap is exceeded, then the oldest
     applications will be removed.
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 042fdfcc47261..933faab5c9431 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -34,7 +34,10 @@ object MimaExcludes {
val excludes =
SparkBuild.SPARK_VERSION match {
case v if v.startsWith("1.1") =>
- Seq(MimaBuild.excludeSparkPackage("graphx")) ++
+ Seq(
+ MimaBuild.excludeSparkPackage("deploy"),
+ MimaBuild.excludeSparkPackage("graphx")
+ ) ++
Seq(
// We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
// for countApproxDistinct* functions, which does not work in Java. We later removed
diff --git a/sbin/start-history-server.sh b/sbin/start-history-server.sh
index 4a90c68763b68..e30493da32a7a 100755
--- a/sbin/start-history-server.sh
+++ b/sbin/start-history-server.sh
@@ -19,19 +19,18 @@
# Starts the history server on the machine this script is executed on.
#
-# Usage: start-history-server.sh [<base-log-dir>]
-# Example: ./start-history-server.sh --dir /tmp/spark-events --port 18080
+# Usage: start-history-server.sh
+#
+# Use the SPARK_HISTORY_OPTS environment variable to set history server configuration.
#
sbin=`dirname "$0"`
sbin=`cd "$sbin"; pwd`
-if [ $# -lt 1 ]; then
- echo "Usage: ./start-history-server.sh "
- echo "Example: ./start-history-server.sh /tmp/spark-events"
- exit
+if [ $# != 0 ]; then
+ echo "Using command line arguments for setting the log directory is deprecated. Please "
+ echo "set the spark.history.fs.logDirectory configuration option instead."
+ export SPARK_HISTORY_OPTS="$SPARK_HISTORY_OPTS -Dspark.history.fs.logDirectory=$1"
fi
-LOG_DIR=$1
-
-"$sbin"/spark-daemon.sh start org.apache.spark.deploy.history.HistoryServer 1 --dir "$LOG_DIR"
+exec "$sbin"/spark-daemon.sh start org.apache.spark.deploy.history.HistoryServer 1
|