From b28447862b515a45a6b7798b256014df23b55799 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Wed, 7 May 2014 16:30:02 -0700
Subject: [PATCH 01/19] Separate history server from history backend.

This change does two things, mainly:

- Separate the logic of serving application history from fetching
  application history from the underlying storage. Not only does this
  clean up the code a little bit, but it also serves as initial work for
  SPARK-1537, where we may want to fetch application data from Yarn
  instead of HDFS.

  I've kept the current command line options working, but I changed the
  way configuration is set to be mostly based on SparkConf, so that it's
  easy to support new providers later.

- Make it so the UI for each application is loaded lazily. The UIs are
  cached in memory (cache size configurable) for faster subsequent
  access. This means that we don't need a limit for the number of
  applications listed; the list should fit comfortably in memory (since
  it holds much less data).

  Because of this I lowered the number of applications kept in memory to
  50 (since that setting doesn't influence the number of apps listed
  anymore). Later, we may want to provide paging in the listing UI, and
  also spill the listing to disk, loading it on demand, to avoid large
  memory usage / slow startup.
---
 .../deploy/history/FsHistoryProvider.scala    | 205 +++++++++++
 .../spark/deploy/history/HistoryPage.scala    |  20 +-
 .../spark/deploy/history/HistoryServer.scala  | 333 +++++++++---------
 .../history/HistoryServerArguments.scala      |  76 ----
 .../scala/org/apache/spark/ui/WebUI.scala     |   1 +
 5 files changed, 379 insertions(+), 256 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 delete mode 100644 core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
new file mode 100644
index 0000000000000..884eca34168ca
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.deploy.history + +import java.io.FileNotFoundException +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable + +import org.apache.hadoop.fs.{FileStatus, Path} + +import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.scheduler._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils + +class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider + with Logging { + + // Interval between each check for event log updates + private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval", 10) * 1000 + + private val logDir = conf.get("spark.history.fs.logDirectory") + private val fs = Utils.getHadoopFileSystem(logDir) + + // A timestamp of when the disk was last accessed to check for log updates + private var lastLogCheckTime = -1L + + // List of applications, in order from newest to oldest. + private val appList = new AtomicReference[Seq[ApplicationHistoryInfo]](Nil) + + /** + * A background thread that periodically checks for event log updates on disk. + * + * If a log check is invoked manually in the middle of a period, this thread re-adjusts the + * time at which it performs the next log check to maintain the same period as before. + * + * TODO: Add a mechanism to update manually. + */ + private val logCheckingThread = new Thread("LogCheckingThread") { + override def run() = Utils.logUncaughtExceptions { + while (!stopped) { + val now = System.currentTimeMillis + if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) { + Thread.sleep(UPDATE_INTERVAL_MS) + } else { + // If the user has manually checked for logs recently, wait until + // UPDATE_INTERVAL_MS after the last check time + Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now) + } + checkForLogs() + } + } + } + + @volatile private var stopped = false + + initialize() + + private def initialize() { + // Validate the log directory. + val path = new Path(logDir) + if (!fs.exists(path)) { + throw new IllegalArgumentException("Logging directory specified does not exist: %s".format(logDir)) + } + if (!fs.getFileStatus(path).isDir) { + throw new IllegalArgumentException("Logging directory specified is not a directory: %s".format(logDir)) + } + + checkForLogs() + logCheckingThread.start() + } + + override def stop() = { + stopped = true + logCheckingThread.interrupt() + logCheckingThread.join() + } + + override def getListing(offset: Int, limit: Int): Seq[ApplicationHistoryInfo] = { + appList.get() + } + + override def getAppInfo(appId: String): ApplicationHistoryInfo = { + try { + val appLogDir = fs.getFileStatus(new Path(logDir, appId)) + loadAppInfo(appLogDir, true) + } catch { + case e: FileNotFoundException => null + } + } + + /** + * Check for any updates to event logs in the base directory. This is only effective once + * the server has been bound. + * + * If a new completed application is found, the server renders the associated SparkUI + * from the application's event logs, attaches this UI to itself, and stores metadata + * information for this application. + * + * If the logs for an existing completed application are no longer found, the server + * removes all associated information and detaches the SparkUI. + */ + def checkForLogs() = synchronized { + lastLogCheckTime = System.currentTimeMillis + logDebug("Checking for logs. 
Time is now %d.".format(lastLogCheckTime)) + try { + val logStatus = fs.listStatus(new Path(logDir)) + val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() + val logInfos = logDirs + .sortBy { dir => getModificationTime(dir) } + .filter { + dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE)) + } + + var currentApps = Map[String, ApplicationHistoryInfo]( + appList.get().map(app => (app.id -> app)):_*) + + // For any application that either (i) is not listed or (ii) has changed since the last time + // the listing was created (defined by the log dir's modification time), load the app's info. + // Otherwise just reuse what's already in memory. + appList.set(logInfos + .map { dir => + val curr = currentApps.getOrElse(dir.getPath().getName(), null) + if (curr == null || curr.lastUpdated < getModificationTime(dir)) { + loadAppInfo(dir, false) + } else { + curr + } + } + .sortBy { info => -info.lastUpdated }) + } catch { + case t: Throwable => logError("Exception in checking for event log updates", t) + } + } + + /** + * Parse the application's logs to find out the information we need to build the + * listing page. + */ + private def loadAppInfo(logDir: FileStatus, renderUI: Boolean): ApplicationHistoryInfo = { + val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs) + val path = logDir.getPath + val appId = path.getName + val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec) + val appListener = new ApplicationEventListener + replayBus.addListener(appListener) + + val ui: SparkUI = if (renderUI) { + val conf = this.conf.clone() + val appSecManager = new SecurityManager(conf) + new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId) + // Do not call ui.bind() to avoid creating a new server for each application + } else { + null + } + + replayBus.replay() + val appName = appListener.appName + val sparkUser = appListener.sparkUser + val startTime = appListener.startTime + val endTime = appListener.endTime + val lastUpdated = getModificationTime(logDir) + ApplicationHistoryInfo(appId, + appListener.appName, + appListener.startTime, + appListener.endTime, + getModificationTime(logDir), + appListener.sparkUser, + if (renderUI) appListener.viewAcls else null, + ui) + } + + /** Return when this directory was last modified. 
*/ + private def getModificationTime(dir: FileStatus): Long = { + try { + val logFiles = fs.listStatus(dir.getPath) + if (logFiles != null && !logFiles.isEmpty) { + logFiles.map(_.getModificationTime).max + } else { + dir.getModificationTime + } + } catch { + case t: Throwable => + logError("Exception in accessing modification time of %s".format(dir.getPath), t) + -1L + } + } + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 180c853ce3096..b933d58a5d8dd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -26,19 +26,16 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { def render(request: HttpServletRequest): Seq[Node] = { - val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated } - val appTable = UIUtils.listingTable(appHeader, appRow, appRows) + val apps = parent.getApplicationList(0, -1) + val appTable = UIUtils.listingTable(appHeader, appRow, apps) val content =
       <div class="row-fluid">
         <div class="span12">
-          <ul class="unstyled">
-            <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
-          </ul>
           {
-            if (parent.appIdToInfo.size > 0) {
+            if (apps.size > 0) {
               <h4>
-                Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
-                Completed Application{if (parent.getNumApplications > 1) "s" else ""}
+                Showing {apps.size}/{apps.size}
+                Completed Application{if (apps.size > 1) "s" else ""}
               </h4>
++ appTable } else { @@ -56,18 +53,16 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { "Completed", "Duration", "Spark User", - "Log Directory", "Last Updated") private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { - val appName = if (info.started) info.name else info.logDirPath.getName - val uiAddress = parent.getAddress + info.ui.basePath + val appName = if (info.started) info.name else info.id + val uiAddress = "/history/" + info.id val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started" val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed" val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---" val sparkUser = if (info.started) info.sparkUser else "Unknown user" - val logDirectory = info.logDirPath.getName val lastUpdated = UIUtils.formatDate(info.lastUpdated) {appName} @@ -75,7 +70,6 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { {endTime} {duration} {sparkUser} - {logDirectory} {lastUpdated} } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index a9c11dca5678e..c8e1dfa6fa220 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -17,14 +17,15 @@ package org.apache.spark.deploy.history -import scala.collection.mutable +import java.util.NoSuchElementException +import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} -import org.apache.hadoop.fs.{FileStatus, Path} +import com.google.common.cache._ +import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.scheduler._ -import org.apache.spark.ui.{WebUI, SparkUI} +import org.apache.spark.ui.{WebUI, SparkUI, UIUtils} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.Utils @@ -38,56 +39,76 @@ import org.apache.spark.util.Utils * application's event logs are maintained in the application's own sub-directory. This * is the same structure as maintained in the event log write code path in * EventLoggingListener. 
- * - * @param baseLogDir The base directory in which event logs are found */ class HistoryServer( - val baseLogDir: String, + conf: SparkConf, + provider: ApplicationHistoryProvider, securityManager: SecurityManager, - conf: SparkConf) - extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging { + port: Int) + extends WebUI(securityManager, port, conf) with Logging { + + // How many applications to retain + private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50) - import HistoryServer._ + // set whether to enable or disable view acls for all applications + private val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) - private val fileSystem = Utils.getHadoopFileSystem(baseLogDir) private val localHost = Utils.localHostName() private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost) - // A timestamp of when the disk was last accessed to check for log updates - private var lastLogCheckTime = -1L + private val appLoader = new CacheLoader[String, SparkUI] { + override def load(key: String): SparkUI = { + val info = provider.getAppInfo(key) + if (info != null) { + info.ui.getSecurityManager.setUIAcls(uiAclsEnabled) + info.ui.getSecurityManager.setViewAcls(info.sparkUser, info.viewAcls) + attachSparkUI(info.ui) + info.ui + } else { + throw new NoSuchElementException() + } + } + } - // Number of completed applications found in this directory - private var numCompletedApplications = 0 + private val appCache = CacheBuilder.newBuilder() + .maximumSize(retainedApplications) + .removalListener(new RemovalListener[String, SparkUI] { + override def onRemoval(rm: RemovalNotification[String, SparkUI]) = { + detachSparkUI(rm.getValue()) + } + }) + .build(appLoader) + + private val loaderServlet = new HttpServlet { + protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = { + val parts = req.getPathInfo().split("/") + if (parts.length < 2) { + res.setStatus(HttpServletResponse.SC_BAD_REQUEST) + return + } - @volatile private var stopped = false + var appId = parts(1) - /** - * A background thread that periodically checks for event log updates on disk. - * - * If a log check is invoked manually in the middle of a period, this thread re-adjusts the - * time at which it performs the next log check to maintain the same period as before. - * - * TODO: Add a mechanism to update manually. - */ - private val logCheckingThread = new Thread { - override def run(): Unit = Utils.logUncaughtExceptions { - while (!stopped) { - val now = System.currentTimeMillis - if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) { - checkForLogs() - Thread.sleep(UPDATE_INTERVAL_MS) - } else { - // If the user has manually checked for logs recently, wait until - // UPDATE_INTERVAL_MS after the last check time - Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now) + // Note we don't use the UI retrieved from the cache; the cache loader above will register + // the app's UI, and all we need to do is redirect the user to the same URI that was + // requested, and the proper data should be served at that point. + try { + appCache.get(appId) + res.sendRedirect(res.encodeRedirectURL(req.getRequestURI())) + } catch { + case e: Exception => e.getCause() match { + case nsee: NoSuchElementException => + val msg =
  <div class="row-fluid">Application {appId} not found.</div>
+ res.setStatus(HttpServletResponse.SC_NOT_FOUND) + UIUtils.basicSparkPage(msg, "Not Found").foreach( + n => res.getWriter().write(n.toString)) + + case cause: Exception => throw cause } } } } - // A mapping of application ID to its history information, which includes the rendered UI - val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]() - initialize() /** @@ -98,108 +119,23 @@ class HistoryServer( */ def initialize() { attachPage(new HistoryPage(this)) - attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static")) + attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) + + val contextHandler = new ServletContextHandler + contextHandler.setContextPath("/history") + contextHandler.addServlet(new ServletHolder(loaderServlet), "/*") + attachHandler(contextHandler) } /** Bind to the HTTP server behind this web interface. */ override def bind() { super.bind() - logCheckingThread.start() - } - - /** - * Check for any updates to event logs in the base directory. This is only effective once - * the server has been bound. - * - * If a new completed application is found, the server renders the associated SparkUI - * from the application's event logs, attaches this UI to itself, and stores metadata - * information for this application. - * - * If the logs for an existing completed application are no longer found, the server - * removes all associated information and detaches the SparkUI. - */ - def checkForLogs() = synchronized { - if (serverInfo.isDefined) { - lastLogCheckTime = System.currentTimeMillis - logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime)) - try { - val logStatus = fileSystem.listStatus(new Path(baseLogDir)) - val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() - val logInfos = logDirs - .sortBy { dir => getModificationTime(dir) } - .map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) } - .filter { case (dir, info) => info.applicationComplete } - - // Logging information for applications that should be retained - val retainedLogInfos = logInfos.takeRight(RETAINED_APPLICATIONS) - val retainedAppIds = retainedLogInfos.map { case (dir, _) => dir.getPath.getName } - - // Remove any applications that should no longer be retained - appIdToInfo.foreach { case (appId, info) => - if (!retainedAppIds.contains(appId)) { - detachSparkUI(info.ui) - appIdToInfo.remove(appId) - } - } - - // Render the application's UI if it is not already there - retainedLogInfos.foreach { case (dir, info) => - val appId = dir.getPath.getName - if (!appIdToInfo.contains(appId)) { - renderSparkUI(dir, info) - } - } - - // Track the total number of completed applications observed this round - numCompletedApplications = logInfos.size - - } catch { - case e: Exception => logError("Exception in checking for event log updates", e) - } - } else { - logWarning("Attempted to check for event log updates before binding the server.") - } - } - - /** - * Render a new SparkUI from the event logs if the associated application is completed. - * - * HistoryServer looks for a special file that indicates application completion in the given - * directory. If this file exists, the associated application is regarded to be completed, in - * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing. 
- */ - private def renderSparkUI(logDir: FileStatus, elogInfo: EventLoggingInfo) { - val path = logDir.getPath - val appId = path.getName - val replayBus = new ReplayListenerBus(elogInfo.logPaths, fileSystem, elogInfo.compressionCodec) - val appListener = new ApplicationEventListener - replayBus.addListener(appListener) - val appConf = conf.clone() - val appSecManager = new SecurityManager(appConf) - val ui = new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId) - - // Do not call ui.bind() to avoid creating a new server for each application - replayBus.replay() - if (appListener.applicationStarted) { - appSecManager.setUIAcls(HISTORY_UI_ACLS_ENABLED) - appSecManager.setViewAcls(appListener.sparkUser, appListener.viewAcls) - attachSparkUI(ui) - val appName = appListener.appName - val sparkUser = appListener.sparkUser - val startTime = appListener.startTime - val endTime = appListener.endTime - val lastUpdated = getModificationTime(logDir) - ui.setAppName(appName + " (completed)") - appIdToInfo(appId) = ApplicationHistoryInfo(appId, appName, startTime, endTime, - lastUpdated, sparkUser, path, ui) - } } /** Stop the server and close the file system. */ override def stop() { super.stop() - stopped = true - fileSystem.close() + provider.stop() } /** Attach a reconstructed UI to this server. Only valid after bind(). */ @@ -215,27 +151,17 @@ class HistoryServer( ui.getHandlers.foreach(detachHandler) } - /** Return the address of this server. */ - def getAddress: String = "http://" + publicHost + ":" + boundPort - - /** Return the number of completed applications found, whether or not the UI is rendered. */ - def getNumApplications: Int = numCompletedApplications - - /** Return when this directory was last modified. */ - private def getModificationTime(dir: FileStatus): Long = { - try { - val logFiles = fileSystem.listStatus(dir.getPath) - if (logFiles != null && !logFiles.isEmpty) { - logFiles.map(_.getModificationTime).max - } else { - dir.getModificationTime - } - } catch { - case e: Exception => - logError("Exception in accessing modification time of %s".format(dir.getPath), e) - -1L - } + /** + * Returns a list of available applications, in descending order according to their last + * updated time. + * + * @param offset Offset of the first entry to return. + * @param limit Maximum number of entries to return (-1 = no limit). 
+ */ + def getApplicationList(offset: Int, limit: Int) = { + provider.getListing(offset, limit) } + } /** @@ -251,25 +177,21 @@ class HistoryServer( object HistoryServer { private val conf = new SparkConf - // Interval between each check for event log updates - val UPDATE_INTERVAL_MS = conf.getInt("spark.history.updateInterval", 10) * 1000 - - // How many applications to retain - val RETAINED_APPLICATIONS = conf.getInt("spark.history.retainedApplications", 250) - - // The port to which the web UI is bound - val WEB_UI_PORT = conf.getInt("spark.history.ui.port", 18080) - - // set whether to enable or disable view acls for all applications - val HISTORY_UI_ACLS_ENABLED = conf.getBoolean("spark.history.ui.acls.enable", false) - - val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR - def main(argStrings: Array[String]) { initSecurity() - val args = new HistoryServerArguments(argStrings) + parse(argStrings.toList) val securityManager = new SecurityManager(conf) - val server = new HistoryServer(args.logDir, securityManager, conf) + + val providerName = conf.getOption("spark.history.provider") + .getOrElse(classOf[FsHistoryProvider].getName()) + val provider = Class.forName(providerName) + .getConstructor(classOf[SparkConf]) + .newInstance(conf) + .asInstanceOf[ApplicationHistoryProvider] + + val port = conf.getInt("spark.history.ui.port", 18080) + + val server = new HistoryServer(conf, provider, securityManager, port) server.bind() // Wait until the end of the world... or if the HistoryServer process is manually stopped @@ -290,8 +212,85 @@ object HistoryServer { } } + private def parse(args: List[String]): Unit = { + args match { + case ("--dir" | "-d") :: value :: tail => + set("fs.logDirectory", value) + parse(tail) + + case ("--port" | "-p") :: value :: tail => + set("ui.port", value) + parse(tail) + + case ("-D") :: opt :: value :: tail => + set(opt, value) + parse(tail) + + case ("--help" | "-h") :: tail => + printUsageAndExit(0) + + case Nil => + + case _ => + printUsageAndExit(1) + } + } + + private def set(name: String, value: String) = { + conf.set("spark.history." + name, value) + } + + private def printUsageAndExit(exitCode: Int) { + System.err.println( + """ + |Usage: HistoryServer [options] + | + |Options are set by passing "-D option value" command line arguments to the class. + |Command line options will override the Spark configuration file and system properties. + |History Server options are always available; additional options depend on the provider. + | + |History Server options: + | + | ui.port Port where server will listen for connections (default 18080) + | ui.acls.enable Whether to enable view acls for all applications (default false) + | provider Name of history provider class (defaults to file system-based provider) + | + |FsHistoryProvider options: + | + | fs.logDirectory Directory where app logs are stored (required) + | fs.updateInterval How often to reload log data from storage (seconds, default 10) + |""".stripMargin) + System.exit(exitCode) + } + } +private[spark] abstract class ApplicationHistoryProvider { + + /** + * This method should return a list of applications available for the history server to + * show. The listing is assumed to be in descending time order (so that the parameters + * make sense). + * + * @param offset Offset of the first entry to return. + * @param limit Maximum number of entries to return (-1 = no limit). 
+ */ + def getListing(offset: Int, limit: Int): Seq[ApplicationHistoryInfo] + + /** + * This method should return the application information, including a rendered SparkUI. + * + * @param appId The application ID. + * @return The app info, or null if not found. + */ + def getAppInfo(appId: String): ApplicationHistoryInfo + + /** + * Called when the server is shutting down. + */ + def stop(): Unit = { } + +} private[spark] case class ApplicationHistoryInfo( id: String, @@ -300,7 +299,7 @@ private[spark] case class ApplicationHistoryInfo( endTime: Long, lastUpdated: Long, sparkUser: String, - logDirPath: Path, + viewAcls: String, ui: SparkUI) { def started = startTime != -1 def completed = endTime != -1 diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala deleted file mode 100644 index 943c061743dbd..0000000000000 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.history - -import java.net.URI - -import org.apache.hadoop.fs.Path - -import org.apache.spark.util.Utils - -/** - * Command-line parser for the master. 
- */ -private[spark] class HistoryServerArguments(args: Array[String]) { - var logDir = "" - - parse(args.toList) - - private def parse(args: List[String]): Unit = { - args match { - case ("--dir" | "-d") :: value :: tail => - logDir = value - parse(tail) - - case ("--help" | "-h") :: tail => - printUsageAndExit(0) - - case Nil => - - case _ => - printUsageAndExit(1) - } - validateLogDir() - } - - private def validateLogDir() { - if (logDir == "") { - System.err.println("Logging directory must be specified.") - printUsageAndExit(1) - } - val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) - val path = new Path(logDir) - if (!fileSystem.exists(path)) { - System.err.println("Logging directory specified does not exist: %s".format(logDir)) - printUsageAndExit(1) - } - if (!fileSystem.getFileStatus(path).isDir) { - System.err.println("Logging directory specified is not a directory: %s".format(logDir)) - printUsageAndExit(1) - } - } - - private def printUsageAndExit(exitCode: Int) { - System.err.println( - "Usage: HistoryServer [options]\n" + - "\n" + - "Options:\n" + - " -d DIR, --dir DIR Location of event log files") - System.exit(exitCode) - } -} diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index b08f308fda1dd..856273e1d4e21 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -51,6 +51,7 @@ private[spark] abstract class WebUI( def getTabs: Seq[WebUITab] = tabs.toSeq def getHandlers: Seq[ServletContextHandler] = handlers.toSeq + def getSecurityManager: SecurityManager = securityManager /** Attach a tab to this UI, along with all of its attached pages. */ def attachTab(tab: WebUITab) { From bda2fa14142cc9dee5b309092c649eac152931b1 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 8 May 2014 14:48:04 -0700 Subject: [PATCH 02/19] Rudimentary paging support for the history UI. The provider's list api was tweaked a little bit so that the caller can get an atomic view of the data currently held in the provider. 
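As an illustration of the paging arithmetic this patch introduces, here is a
self-contained Scala sketch of how a caller might map a requested page onto
the atomic (list, adjusted offset, total count) view the provider now returns.
It is not part of the patch; the object name, page size, and sample values are
invented for the example.

    object PagingMath {
      /** Returns (firstIndex, lastIndex, pageCount) for the requested page. */
      def pageWindow(requestedPage: Int, pageSize: Int, total: Int): (Int, Int, Int) = {
        val requestedFirst = (requestedPage - 1) * pageSize
        // If the listing shrank since the page link was rendered, fall back to
        // the first page instead of returning an empty window.
        val first = if (requestedFirst < total) requestedFirst else 0
        val last = math.min(first + pageSize, total) - 1
        val pageCount = total / pageSize + (if (total % pageSize > 0) 1 else 0)
        (first, last, pageCount)
      }

      def main(args: Array[String]): Unit = {
        // 45 apps at 20 per page: page 3 shows entries 40..44, out of 3 pages.
        println(pageWindow(3, 20, 45))
      }
    }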
--- .../deploy/history/FsHistoryProvider.scala | 8 +++++-- .../spark/deploy/history/HistoryPage.scala | 20 +++++++++++++---- .../spark/deploy/history/HistoryServer.scala | 22 +++++++++++-------- 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 884eca34168ca..c022676c64880 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -83,6 +83,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider } checkForLogs() + logCheckingThread.setDaemon(true) logCheckingThread.start() } @@ -90,10 +91,13 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider stopped = true logCheckingThread.interrupt() logCheckingThread.join() + fs.close() } - override def getListing(offset: Int, limit: Int): Seq[ApplicationHistoryInfo] = { - appList.get() + override def getListing(offset: Int, count: Int) = { + val list = appList.get() + val theOffset = if (offset < list.size) offset else 0 + (list.slice(theOffset, Math.min(theOffset + count, list.size)), theOffset, list.size) } override def getAppInfo(appId: String): ApplicationHistoryInfo = { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index b933d58a5d8dd..4a89af6519200 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -25,17 +25,29 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { + val pageSize = 20 + def render(request: HttpServletRequest): Seq[Node] = { - val apps = parent.getApplicationList(0, -1) + val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt + val requestedFirst = (requestedPage - 1) * pageSize + val (apps, actualFirst, count) = parent.getApplicationList(requestedFirst, pageSize) + val actualPage = (actualFirst / pageSize) + 1 + val last = Math.min(actualFirst + pageSize, count) - 1 + val pageCount = count / pageSize + (if (count % pageSize > 0) 1 else 0) + val appTable = UIUtils.listingTable(appHeader, appRow, apps) val content =
       <div class="row-fluid">
         <div class="span12">
           {
-            if (apps.size > 0) {
+            if (count > 0) {
               <h4>
-                Showing {apps.size}/{apps.size}
-                Completed Application{if (apps.size > 1) "s" else ""}
+                Showing {actualFirst + 1}-{last + 1} of {count}
+                Application{if (last - actualFirst > 1) "s" else ""}
+
+                {if (actualPage > 1) <a href={"?page=" + (actualPage - 1)}>&lt;</a>}
+                {if (actualPage < pageCount) <a href={"?page=" + (actualPage + 1)}>&gt;</a>}
+
               </h4>
++ appTable } else { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index c8e1dfa6fa220..fbdc0970fd334 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -155,11 +155,12 @@ class HistoryServer( * Returns a list of available applications, in descending order according to their last * updated time. * - * @param offset Offset of the first entry to return. - * @param limit Maximum number of entries to return (-1 = no limit). + * @param offset Starting offset for returned objects. + * @param count Max number of objects to return. + * @return 3-tuple (requested app list, adjusted offset, count of all available apps) */ - def getApplicationList(offset: Int, limit: Int) = { - provider.getListing(offset, limit) + def getApplicationList(offset: Int, count: Int) = { + provider.getListing(offset, count) } } @@ -269,13 +270,16 @@ private[spark] abstract class ApplicationHistoryProvider { /** * This method should return a list of applications available for the history server to - * show. The listing is assumed to be in descending time order (so that the parameters - * make sense). + * show. The listing is assumed to be in descending time order. * - * @param offset Offset of the first entry to return. - * @param limit Maximum number of entries to return (-1 = no limit). + * An adjusted offset should be returned if the app list has changed and the request + * references an invalid start offset. Otherwise, the provided offset should be returned. + * + * @param offset Starting offset for returned objects. + * @param count Max number of objects to return. + * @return 3-tuple (requested app list, adjusted offset, count of all available apps) */ - def getListing(offset: Int, limit: Int): Seq[ApplicationHistoryInfo] + def getListing(offset: Int, count: Int): (Seq[ApplicationHistoryInfo], Int, Int) /** * This method should return the application information, including a rendered SparkUI. From eee2f5a5c500a74dd0c9fe454b3d91635b61fc25 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 9 May 2014 13:39:26 -0700 Subject: [PATCH 03/19] Ensure server.stop() is called when shutting down. Also remove the cleanup code from the fs provider. It would be better to clean up, but there's a race between that code's cleanup and Hadoop's shutdown hook, which closes all file systems kept in the cache. So if you try to clean up the fs provider in a shut down hook, you may end up with ugly exceptions in the output. But leave the stop() functionality around in case it's useful for future provider implementations. 
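For reference, a minimal Scala sketch of the shutdown-hook pattern this patch
adopts. The Stoppable trait is a stand-in for the real server type; only the
thread name is taken from the patch.

    object ShutdownHookExample {
      trait Stoppable { def stop(): Unit }

      def installStopper(server: Stoppable): Unit = {
        // The hook runs on JVM exit. Per the race described above, it should
        // not assume Hadoop FileSystems are still open, since Hadoop's own
        // shutdown hook may close them first.
        Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
          override def run(): Unit = server.stop()
        })
      }

      def main(args: Array[String]): Unit = {
        installStopper(new Stoppable {
          override def stop(): Unit = println("server stopped")
        })
      }
    }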
--- .../spark/deploy/history/FsHistoryProvider.scala | 11 +---------- .../apache/spark/deploy/history/HistoryServer.scala | 7 ++++++- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index c022676c64880..d8ee557caba86 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -54,7 +54,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider */ private val logCheckingThread = new Thread("LogCheckingThread") { override def run() = Utils.logUncaughtExceptions { - while (!stopped) { + while (true) { val now = System.currentTimeMillis if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) { Thread.sleep(UPDATE_INTERVAL_MS) @@ -68,8 +68,6 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider } } - @volatile private var stopped = false - initialize() private def initialize() { @@ -87,13 +85,6 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider logCheckingThread.start() } - override def stop() = { - stopped = true - logCheckingThread.interrupt() - logCheckingThread.join() - fs.close() - } - override def getListing(offset: Int, count: Int) = { val list = appList.get() val theOffset = if (offset < list.size) offset else 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index fbdc0970fd334..2aa469f563559 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -195,9 +195,14 @@ object HistoryServer { val server = new HistoryServer(conf, provider, securityManager, port) server.bind() + Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") { + override def run() = { + server.stop() + } + }) + // Wait until the end of the world... or if the HistoryServer process is manually stopped while(true) { Thread.sleep(Int.MaxValue) } - server.stop() } def initSecurity() { From 6fbe0d8d41c6a95a3e1a8db4445359afa134ccd8 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 9 May 2014 15:00:45 -0700 Subject: [PATCH 04/19] Better handle failures when loading app info. Instead of failing to load all the applications, just ignore the one that failed. --- .../deploy/history/FsHistoryProvider.scala | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index d8ee557caba86..bd4d1250c051b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -129,16 +129,21 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider // For any application that either (i) is not listed or (ii) has changed since the last time // the listing was created (defined by the log dir's modification time), load the app's info. // Otherwise just reuse what's already in memory. 
- appList.set(logInfos - .map { dir => - val curr = currentApps.getOrElse(dir.getPath().getName(), null) - if (curr == null || curr.lastUpdated < getModificationTime(dir)) { - loadAppInfo(dir, false) - } else { - curr + val newApps = new mutable.ListBuffer[ApplicationHistoryInfo] + for (dir <- logInfos) { + val curr = currentApps.getOrElse(dir.getPath().getName(), null) + if (curr == null || curr.lastUpdated < getModificationTime(dir)) { + try { + newApps += loadAppInfo(dir, false) + } catch { + case e: Exception => logError(s"Failed to load app info from directory $dir.") } + } else { + newApps += curr } - .sortBy { info => -info.lastUpdated }) + } + + appList.set(newApps.sortBy { info => -info.lastUpdated }) } catch { case t: Throwable => logError("Exception in checking for event log updates", t) } From 91e96ca81ccfe8b57ed4b89bdb2db97ec31f80bb Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 9 May 2014 15:27:04 -0700 Subject: [PATCH 05/19] Fix scalastyle issues. --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index bd4d1250c051b..f8bcbc291534a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -74,10 +74,12 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider // Validate the log directory. val path = new Path(logDir) if (!fs.exists(path)) { - throw new IllegalArgumentException("Logging directory specified does not exist: %s".format(logDir)) + throw new IllegalArgumentException( + "Logging directory specified does not exist: %s".format(logDir)) } if (!fs.getFileStatus(path).isDir) { - throw new IllegalArgumentException("Logging directory specified is not a directory: %s".format(logDir)) + throw new IllegalArgumentException( + "Logging directory specified is not a directory: %s".format(logDir)) } checkForLogs() From 49d2fd3227d63dfe800981f1a9d4adbec8a05c2e Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 9 May 2014 16:41:15 -0700 Subject: [PATCH 06/19] Fix a comment. --- .../spark/deploy/history/FsHistoryProvider.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index f8bcbc291534a..49bacd8974009 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -103,15 +103,9 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider } /** - * Check for any updates to event logs in the base directory. This is only effective once - * the server has been bound. - * - * If a new completed application is found, the server renders the associated SparkUI - * from the application's event logs, attaches this UI to itself, and stores metadata - * information for this application. - * - * If the logs for an existing completed application are no longer found, the server - * removes all associated information and detaches the SparkUI. + * Builds the application list based on the current contents of the log directory. 
+ * Tries to reuse as much of the data already in memory as possible, but not reading + * applications that hasn't been updated since last time the logs were checked. */ def checkForLogs() = synchronized { lastLogCheckTime = System.currentTimeMillis From e8026f4eeb83152c88ceb87793625bc22d67f362 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 2 Jun 2014 16:36:42 -0700 Subject: [PATCH 07/19] Review feedback. Use monotonic time, plus other stylistic things. --- .../deploy/history/FsHistoryProvider.scala | 18 ++++++++++-------- .../spark/deploy/history/HistoryServer.scala | 11 +++++------ 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 49bacd8974009..b9a6026afb090 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -39,7 +39,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider private val fs = Utils.getHadoopFileSystem(logDir) // A timestamp of when the disk was last accessed to check for log updates - private var lastLogCheckTime = -1L + private var lastLogCheckTimeMs = -1L // List of applications, in order from newest to oldest. private val appList = new AtomicReference[Seq[ApplicationHistoryInfo]](Nil) @@ -55,13 +55,13 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider private val logCheckingThread = new Thread("LogCheckingThread") { override def run() = Utils.logUncaughtExceptions { while (true) { - val now = System.currentTimeMillis - if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) { + val now = getMonotonicTime() + if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) { Thread.sleep(UPDATE_INTERVAL_MS) } else { // If the user has manually checked for logs recently, wait until // UPDATE_INTERVAL_MS after the last check time - Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now) + Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now) } checkForLogs() } @@ -108,13 +108,12 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider * applications that hasn't been updated since last time the logs were checked. */ def checkForLogs() = synchronized { - lastLogCheckTime = System.currentTimeMillis - logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime)) + lastLogCheckTimeMs = getMonotonicTime() + logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs)) try { val logStatus = fs.listStatus(new Path(logDir)) val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() val logInfos = logDirs - .sortBy { dir => getModificationTime(dir) } .filter { dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE)) } @@ -125,7 +124,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider // For any application that either (i) is not listed or (ii) has changed since the last time // the listing was created (defined by the log dir's modification time), load the app's info. // Otherwise just reuse what's already in memory. 
- val newApps = new mutable.ListBuffer[ApplicationHistoryInfo] + val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo] for (dir <- logInfos) { val curr = currentApps.getOrElse(dir.getPath().getName(), null) if (curr == null || curr.lastUpdated < getModificationTime(dir)) { @@ -198,4 +197,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider } } + /** Returns the system's mononotically increasing time. */ + private def getMonotonicTime() = System.nanoTime() / (1000 * 1000) + } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 2aa469f563559..73b827d8b6055 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -59,14 +59,13 @@ class HistoryServer( private val appLoader = new CacheLoader[String, SparkUI] { override def load(key: String): SparkUI = { val info = provider.getAppInfo(key) - if (info != null) { - info.ui.getSecurityManager.setUIAcls(uiAclsEnabled) - info.ui.getSecurityManager.setViewAcls(info.sparkUser, info.viewAcls) - attachSparkUI(info.ui) - info.ui - } else { + if (info == null) { throw new NoSuchElementException() } + info.ui.getSecurityManager.setUIAcls(uiAclsEnabled) + info.ui.getSecurityManager.setViewAcls(info.sparkUser, info.viewAcls) + attachSparkUI(info.ui) + info.ui } } From e8521499051fdef2c091d9f302277b55e3c1f953 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 2 Jun 2014 16:56:23 -0700 Subject: [PATCH 08/19] Initialize new app array to expected size. To avoid reallocations. --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index b9a6026afb090..e4851479a9f3a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -124,7 +124,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider // For any application that either (i) is not listed or (ii) has changed since the last time // the listing was created (defined by the log dir's modification time), load the app's info. // Otherwise just reuse what's already in memory. - val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo] + val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size) for (dir <- logInfos) { val curr = currentApps.getOrElse(dir.getPath().getName(), null) if (curr == null || curr.lastUpdated < getModificationTime(dir)) { From 4406f6159f7dcc620c23031c5f562c8380db1851 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 3 Jun 2014 10:34:47 -0700 Subject: [PATCH 09/19] Cosmetic change to listing header. 
--- .../org/apache/spark/deploy/history/HistoryPage.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 4a89af6519200..7f1a9fe6f7d53 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -30,20 +30,19 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { def render(request: HttpServletRequest): Seq[Node] = { val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt val requestedFirst = (requestedPage - 1) * pageSize - val (apps, actualFirst, count) = parent.getApplicationList(requestedFirst, pageSize) + val (apps, actualFirst, totalCount) = parent.getApplicationList(requestedFirst, pageSize) val actualPage = (actualFirst / pageSize) + 1 - val last = Math.min(actualFirst + pageSize, count) - 1 - val pageCount = count / pageSize + (if (count % pageSize > 0) 1 else 0) + val last = Math.min(actualFirst + pageSize, totalCount) - 1 + val pageCount = totalCount / pageSize + (if (totalCount % pageSize > 0) 1 else 0) val appTable = UIUtils.listingTable(appHeader, appRow, apps) val content =
       <div class="row-fluid">
         <div class="span12">
           {
-            if (count > 0) {
+            if (totalCount > 0) {
               <h4>
- Showing {actualFirst + 1}-{last + 1} of {count} - Application{if (last - actualFirst > 1) "s" else ""} + Showing {actualFirst + 1}-{last + 1} of {totalCount} {if (actualPage > 1) <} {if (actualPage < pageCount) >} From b2c570ad0c16bb1296f01e996d04671a17cce421 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 3 Jun 2014 10:58:31 -0700 Subject: [PATCH 10/19] Make class package-private. --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index e4851479a9f3a..85da47bb55212 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -29,7 +29,7 @@ import org.apache.spark.scheduler._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.Utils -class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider +private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider with Logging { // Interval between each check for event log updates From 6e2432fc5ad29e05b5d446fa6e940256587c64ee Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 4 Jun 2014 10:42:07 -0700 Subject: [PATCH 11/19] Second round of feedback. - Simplify some mt code. - Remove new argument that wasn't in 1.0.0, reword some comments. --- .../deploy/history/FsHistoryProvider.scala | 22 +++++++------------ .../spark/deploy/history/HistoryServer.scala | 8 +++---- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 85da47bb55212..6c5cd21809d6d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -18,7 +18,6 @@ package org.apache.spark.deploy.history import java.io.FileNotFoundException -import java.util.concurrent.atomic.AtomicReference import scala.collection.mutable @@ -42,7 +41,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private var lastLogCheckTimeMs = -1L // List of applications, in order from newest to oldest. - private val appList = new AtomicReference[Seq[ApplicationHistoryInfo]](Nil) + @volatile private var appList: Seq[ApplicationHistoryInfo] = Nil /** * A background thread that periodically checks for event log updates on disk. @@ -88,7 +87,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } override def getListing(offset: Int, count: Int) = { - val list = appList.get() + val list = appList val theOffset = if (offset < list.size) offset else 0 (list.slice(theOffset, Math.min(theOffset + count, list.size)), theOffset, list.size) } @@ -104,10 +103,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis /** * Builds the application list based on the current contents of the log directory. - * Tries to reuse as much of the data already in memory as possible, but not reading - * applications that hasn't been updated since last time the logs were checked. + * Tries to reuse as much of the data already in memory as possible, by not reading + * applications that haven't been updated since last time the logs were checked. 
*/ - def checkForLogs() = synchronized { + def checkForLogs() = { lastLogCheckTimeMs = getMonotonicTime() logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs)) try { @@ -118,8 +117,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE)) } - var currentApps = Map[String, ApplicationHistoryInfo]( - appList.get().map(app => (app.id -> app)):_*) + val currentApps = Map[String, ApplicationHistoryInfo]( + appList.map(app => (app.id -> app)):_*) // For any application that either (i) is not listed or (ii) has changed since the last time // the listing was created (defined by the log dir's modification time), load the app's info. @@ -138,7 +137,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } } - appList.set(newApps.sortBy { info => -info.lastUpdated }) + appList = newApps.sortBy { info => -info.lastUpdated } } catch { case t: Throwable => logError("Exception in checking for event log updates", t) } @@ -166,11 +165,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } replayBus.replay() - val appName = appListener.appName - val sparkUser = appListener.sparkUser - val startTime = appListener.startTime - val endTime = appListener.endTime - val lastUpdated = getModificationTime(logDir) ApplicationHistoryInfo(appId, appListener.appName, appListener.startTime, diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 73b827d8b6055..2e6243afada01 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -223,10 +223,6 @@ object HistoryServer { set("fs.logDirectory", value) parse(tail) - case ("--port" | "-p") :: value :: tail => - set("ui.port", value) - parse(tail) - case ("-D") :: opt :: value :: tail => set(opt, value) parse(tail) @@ -251,7 +247,9 @@ object HistoryServer { |Usage: HistoryServer [options] | |Options are set by passing "-D option value" command line arguments to the class. - |Command line options will override the Spark configuration file and system properties. + |Command line options will override JVM system properties (which should be prepended + |with "spark.history."). + | |History Server options are always available; additional options depend on the provider. | |History Server options: From ca5d3200d24ae4cc35f1e6b4e60593ec59144bad Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 4 Jun 2014 11:02:20 -0700 Subject: [PATCH 12/19] Remove code that deals with unfinished apps. The HS only reads logs from finished apps, so remove the code that checked whether the app was actually finished. 
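To show what the simplification buys, here is a small self-contained Scala
sketch of a row formatter that, like the new appRow, can assume every listed
application has valid start and end times. The formatting helpers are
simplified stand-ins for UIUtils, and the sample data is invented.

    import java.text.SimpleDateFormat
    import java.util.Date

    object FinishedAppRow {
      case class AppInfo(name: String, startTime: Long, endTime: Long)

      private val fmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
      private def formatDate(ts: Long): String = fmt.format(new Date(ts))
      private def formatDuration(ms: Long): String = (ms / 1000) + " s"

      // With only completed apps listed, no "Not started" / "Not completed"
      // fallbacks are needed.
      def row(info: AppInfo): String =
        Seq(info.name, formatDate(info.startTime), formatDate(info.endTime),
          formatDuration(info.endTime - info.startTime)).mkString(" | ")

      def main(args: Array[String]): Unit = {
        println(row(AppInfo("spark-pi", 1401000000000L, 1401000060000L)))
      }
    }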
--- .../apache/spark/deploy/history/HistoryPage.scala | 13 +++++-------- .../apache/spark/deploy/history/HistoryServer.scala | 2 -- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 7f1a9fe6f7d53..05bc2cccd0444 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -67,20 +67,17 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { "Last Updated") private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { - val appName = if (info.started) info.name else info.id val uiAddress = "/history/" + info.id - val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started" - val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed" - val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L - val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---" - val sparkUser = if (info.started) info.sparkUser else "Unknown user" + val startTime = UIUtils.formatDate(info.startTime) + val endTime = UIUtils.formatDate(info.endTime) + val duration = UIUtils.formatDuration(info.endTime - info.startTime) val lastUpdated = UIUtils.formatDate(info.lastUpdated) - {appName} + {info.name} {startTime} {endTime} {duration} - {sparkUser} + {info.sparkUser} {lastUpdated} } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 2e6243afada01..a82fe8e6ea732 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -307,6 +307,4 @@ private[spark] case class ApplicationHistoryInfo( sparkUser: String, viewAcls: String, ui: SparkUI) { - def started = startTime != -1 - def completed = endTime != -1 } From 249bcea9f32fc19cb40501a73b30a15adaf4858e Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 4 Jun 2014 13:31:27 -0700 Subject: [PATCH 13/19] Remove offset / count from provider interface. 
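A short Scala sketch of the caller-side slicing that replaces the provider's
offset/count parameters: the provider hands back the full list, and the page
is cut out by the UI. The AppInfo case class and sample data are invented for
the example.

    object ClientSidePaging {
      case class AppInfo(id: String)

      def page(allApps: Seq[AppInfo], requestedPage: Int, pageSize: Int): Seq[AppInfo] = {
        val requestedFirst = (requestedPage - 1) * pageSize
        // Clamp to the first page if the listing shrank between requests.
        val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
        allApps.slice(actualFirst, math.min(actualFirst + pageSize, allApps.size))
      }

      def main(args: Array[String]): Unit = {
        val apps = (1 to 45).map(i => AppInfo("app-" + i))
        println(page(apps, 3, 20).map(_.id))  // Vector(app-41, ..., app-45)
      }
    }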
--- .../deploy/history/FsHistoryProvider.scala | 8 ++----- .../spark/deploy/history/HistoryPage.scala | 14 +++++++----- .../spark/deploy/history/HistoryServer.scala | 22 ++++++------------- 3 files changed, 18 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 6c5cd21809d6d..23d246492aa48 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -86,11 +86,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis logCheckingThread.start() } - override def getListing(offset: Int, count: Int) = { - val list = appList - val theOffset = if (offset < list.size) offset else 0 - (list.slice(theOffset, Math.min(theOffset + count, list.size)), theOffset, list.size) - } + override def getListing() = appList override def getAppInfo(appId: String): ApplicationHistoryInfo = { try { @@ -137,7 +133,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } } - appList = newApps.sortBy { info => -info.lastUpdated } + appList = newApps.sortBy { info => -info.endTime } } catch { case t: Throwable => logError("Exception in checking for event log updates", t) } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 05bc2cccd0444..0115a51280b7e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -30,19 +30,23 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { def render(request: HttpServletRequest): Seq[Node] = { val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt val requestedFirst = (requestedPage - 1) * pageSize - val (apps, actualFirst, totalCount) = parent.getApplicationList(requestedFirst, pageSize) + + val allApps = parent.getApplicationList() + val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0 + val apps = allApps.slice(actualFirst, Math.min(pageSize, allApps.size)) + val actualPage = (actualFirst / pageSize) + 1 - val last = Math.min(actualFirst + pageSize, totalCount) - 1 - val pageCount = totalCount / pageSize + (if (totalCount % pageSize > 0) 1 else 0) + val last = Math.min(actualFirst + pageSize, allApps.size) - 1 + val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0) val appTable = UIUtils.listingTable(appHeader, appRow, apps) val content =
           {
-            if (totalCount > 0) {
+            if (allApps.size > 0) {

- Showing {actualFirst + 1}-{last + 1} of {totalCount} + Showing {actualFirst + 1}-{last + 1} of {allApps.size} {if (actualPage > 1) <} {if (actualPage < pageCount) >} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index a82fe8e6ea732..3ae2d90166efe 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -151,16 +151,11 @@ class HistoryServer( } /** - * Returns a list of available applications, in descending order according to their last - * updated time. + * Returns a list of available applications, in descending order according to their end time. * - * @param offset Starting offset for returned objects. - * @param count Max number of objects to return. - * @return 3-tuple (requested app list, adjusted offset, count of all available apps) + * @return List of all known applications. */ - def getApplicationList(offset: Int, count: Int) = { - provider.getListing(offset, count) - } + def getApplicationList() = provider.getListing() } @@ -272,16 +267,13 @@ private[spark] abstract class ApplicationHistoryProvider { /** * This method should return a list of applications available for the history server to - * show. The listing is assumed to be in descending time order. + * show. * - * An adjusted offset should be returned if the app list has changed and the request - * references an invalid start offset. Otherwise, the provided offset should be returned. + * The listing is assumed to be in descending end time order. * - * @param offset Starting offset for returned objects. - * @param count Max number of objects to return. - * @return 3-tuple (requested app list, adjusted offset, count of all available apps) + * @return List of all know applications. */ - def getListing(offset: Int, count: Int): (Seq[ApplicationHistoryInfo], Int, Int) + def getListing(): Seq[ApplicationHistoryInfo] /** * This method should return the application information, including a rendered SparkUI. From 4e72c771da6759db0d3b7a9b27170a5f8bf40dc5 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 4 Jun 2014 13:34:28 -0700 Subject: [PATCH 14/19] Remove comment about ordering. While ordering is nice to have, it's hard to guarantee that, even with ordering, the listing won't change between two client requests (and thus end up with different info when the UI applies the paging parameters). So don't make it a requirement (even if an informal one). --- .../scala/org/apache/spark/deploy/history/HistoryServer.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 3ae2d90166efe..5d172591b5e6c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -269,8 +269,6 @@ private[spark] abstract class ApplicationHistoryProvider { * This method should return a list of applications available for the history server to * show. * - * The listing is assumed to be in descending end time order. - * * @return List of all know applications. */ def getListing(): Seq[ApplicationHistoryInfo] From 2a7f68d6d2fa7a9b259f00130e1bf1adce91fb29 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 9 Jun 2014 17:25:02 -0700 Subject: [PATCH 15/19] Address review feedback. 
Main changes: - Restore old command line handling. - Fix pagination. - Restore showing the log directory in the listing page. --- .../history/ApplicationHistoryProvider.scala | 63 +++++++++++ .../deploy/history/FsHistoryProvider.scala | 22 +++- .../spark/deploy/history/HistoryPage.scala | 8 +- .../spark/deploy/history/HistoryServer.scala | 102 +++--------------- .../history/HistoryServerArguments.scala | 97 +++++++++++++++++ docs/monitoring.md | 2 +- 6 files changed, 196 insertions(+), 98 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala new file mode 100644 index 0000000000000..c1f4c34e7a5c4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import org.apache.spark.ui.SparkUI + +private[spark] case class ApplicationHistoryInfo( + id: String, + name: String, + startTime: Long, + endTime: Long, + lastUpdated: Long, + sparkUser: String, + viewAcls: String, + ui: SparkUI) { +} + +private[spark] abstract class ApplicationHistoryProvider { + + /** + * This method should return a list of applications available for the history server to + * show. + * + * @return List of all know applications. + */ + def getListing(): Seq[ApplicationHistoryInfo] + + /** + * This method should return the application information, including a rendered SparkUI. + * + * @param appId The application ID. + * @return The app info, or null if not found. + */ + def getAppInfo(appId: String): ApplicationHistoryInfo + + /** + * Called when the server is shutting down. + */ + def stop(): Unit = { } + + /** + * Returns configuration data to be shown in the HS home page. + * + * @return A map with the configuration data. Data is show in the order returned by the map. 
+ */ + def getConfig(): Map[String, String] = Map() + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 23d246492aa48..aea11f525d24c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -32,7 +32,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis with Logging { // Interval between each check for event log updates - private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval", 10) * 1000 + private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval", + conf.getInt("spark.history.updateInterval", 10)) * 1000 private val logDir = conf.get("spark.history.fs.logDirectory") private val fs = Utils.getHadoopFileSystem(logDir) @@ -54,7 +55,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val logCheckingThread = new Thread("LogCheckingThread") { override def run() = Utils.logUncaughtExceptions { while (true) { - val now = getMonotonicTime() + val now = getMonotonicTimeMs() if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) { Thread.sleep(UPDATE_INTERVAL_MS) } else { @@ -97,13 +98,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } } + override def getConfig(): Map[String, String] = + Map(("Event Log Location" -> logDir)) + /** * Builds the application list based on the current contents of the log directory. * Tries to reuse as much of the data already in memory as possible, by not reading * applications that haven't been updated since last time the logs were checked. */ - def checkForLogs() = { - lastLogCheckTimeMs = getMonotonicTime() + private def checkForLogs() = { + lastLogCheckTimeMs = getMonotonicTimeMs() logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs)) try { val logStatus = fs.listStatus(new Path(logDir)) @@ -142,6 +146,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis /** * Parse the application's logs to find out the information we need to build the * listing page. + * + * When creating the listing of available apps, there is no need to load the whole UI for the + * application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user + * clicks on a specific application. + * + * @param logDir Directory with application's log files. + * @param renderUI Whether to create the SparkUI for the application. If false, the "ui" + * attribute of the returned object will be null. */ private def loadAppInfo(logDir: FileStatus, renderUI: Boolean): ApplicationHistoryInfo = { val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs) @@ -188,6 +200,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } /** Returns the system's mononotically increasing time. 
*/ - private def getMonotonicTime() = System.nanoTime() / (1000 * 1000) + private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000) } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 0115a51280b7e..a958c837c2ff6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -25,7 +25,7 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { - val pageSize = 20 + private val pageSize = 20 def render(request: HttpServletRequest): Seq[Node] = { val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt @@ -33,16 +33,20 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { val allApps = parent.getApplicationList() val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0 - val apps = allApps.slice(actualFirst, Math.min(pageSize, allApps.size)) + val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size)) val actualPage = (actualFirst / pageSize) + 1 val last = Math.min(actualFirst + pageSize, allApps.size) - 1 val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0) val appTable = UIUtils.listingTable(appHeader, appRow, apps) + val providerConfig = parent.getProviderConfig() val content =
+          <ul class="unstyled">
+            { providerConfig.map(e =>
+              <li>{e._1}: {e._2}</li>
+            ) }
+          </ul>
           {
             if (allApps.size > 0) {

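
The configuration entries rendered in that list come straight from the provider; both
ends of the plumbing are one-liners copied from elsewhere in this series:

    // HistoryServer exposes the provider's configuration map to the page...
    def getProviderConfig() = provider.getConfig()

    // ...and the filesystem provider reports where it reads event logs from.
    override def getConfig(): Map[String, String] =
      Map(("Event Log Location" -> logDir))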
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 5d172591b5e6c..6819bda8561e8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -54,7 +54,6 @@ class HistoryServer( private val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) private val localHost = Utils.localHostName() - private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost) private val appLoader = new CacheLoader[String, SparkUI] { override def load(key: String): SparkUI = { @@ -80,13 +79,14 @@ class HistoryServer( private val loaderServlet = new HttpServlet { protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = { - val parts = req.getPathInfo().split("/") + val parts = Option(req.getPathInfo()).getOrElse("").split("/") if (parts.length < 2) { - res.setStatus(HttpServletResponse.SC_BAD_REQUEST) + res.sendError(HttpServletResponse.SC_BAD_REQUEST, + s"Unexpected path info in request (URI = ${req.getRequestURI()}") return } - var appId = parts(1) + val appId = parts(1) // Note we don't use the UI retrieved from the cache; the cache loader above will register // the app's UI, and all we need to do is redirect the user to the same URI that was @@ -157,6 +157,13 @@ class HistoryServer( */ def getApplicationList() = provider.getListing() + /** + * Returns the provider configuration to show in the listing page. + * + * @return A map with the provider's configuration. + */ + def getProviderConfig() = provider.getConfig() + } /** @@ -174,7 +181,7 @@ object HistoryServer { def main(argStrings: Array[String]) { initSecurity() - parse(argStrings.toList) + val args = new HistoryServerArguments(conf, argStrings) val securityManager = new SecurityManager(conf) val providerName = conf.getOption("spark.history.provider") @@ -212,89 +219,4 @@ object HistoryServer { } } - private def parse(args: List[String]): Unit = { - args match { - case ("--dir" | "-d") :: value :: tail => - set("fs.logDirectory", value) - parse(tail) - - case ("-D") :: opt :: value :: tail => - set(opt, value) - parse(tail) - - case ("--help" | "-h") :: tail => - printUsageAndExit(0) - - case Nil => - - case _ => - printUsageAndExit(1) - } - } - - private def set(name: String, value: String) = { - conf.set("spark.history." + name, value) - } - - private def printUsageAndExit(exitCode: Int) { - System.err.println( - """ - |Usage: HistoryServer [options] - | - |Options are set by passing "-D option value" command line arguments to the class. - |Command line options will override JVM system properties (which should be prepended - |with "spark.history."). - | - |History Server options are always available; additional options depend on the provider. 
- | - |History Server options: - | - | ui.port Port where server will listen for connections (default 18080) - | ui.acls.enable Whether to enable view acls for all applications (default false) - | provider Name of history provider class (defaults to file system-based provider) - | - |FsHistoryProvider options: - | - | fs.logDirectory Directory where app logs are stored (required) - | fs.updateInterval How often to reload log data from storage (seconds, default 10) - |""".stripMargin) - System.exit(exitCode) - } - -} - -private[spark] abstract class ApplicationHistoryProvider { - - /** - * This method should return a list of applications available for the history server to - * show. - * - * @return List of all know applications. - */ - def getListing(): Seq[ApplicationHistoryInfo] - - /** - * This method should return the application information, including a rendered SparkUI. - * - * @param appId The application ID. - * @return The app info, or null if not found. - */ - def getAppInfo(appId: String): ApplicationHistoryInfo - - /** - * Called when the server is shutting down. - */ - def stop(): Unit = { } - -} - -private[spark] case class ApplicationHistoryInfo( - id: String, - name: String, - startTime: Long, - endTime: Long, - lastUpdated: Long, - sparkUser: String, - viewAcls: String, - ui: SparkUI) { } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala new file mode 100644 index 0000000000000..30896d140d507 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.net.URI + +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkConf +import org.apache.spark.util.Utils + +/** + * Command-line parser for the master. 
+ */ +private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) { + var logDir = conf.get("spark.history.fs.logDirectory", null) + + parse(args.toList) + + private def parse(args: List[String]): Unit = { + args match { + case ("--dir" | "-d") :: value :: tail => + logDir = value + parse(tail) + + case ("--help" | "-h") :: tail => + printUsageAndExit(0) + + case Nil => + + case _ => + printUsageAndExit(1) + } + validateLogDir() + conf.set("spark.history.fs.logDirectory", logDir) + } + + private def validateLogDir() { + if (logDir == null) { + System.err.println("Logging directory must be specified.") + printUsageAndExit(1) + } + val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) + val path = new Path(logDir) + if (!fileSystem.exists(path)) { + System.err.println("Logging directory specified does not exist: %s".format(logDir)) + printUsageAndExit(1) + } + if (!fileSystem.getFileStatus(path).isDir) { + System.err.println("Logging directory specified is not a directory: %s".format(logDir)) + printUsageAndExit(1) + } + } + + private def printUsageAndExit(exitCode: Int) { + System.err.println( + """ + |Usage: HistoryServer [-d logDir] + | + |The preferred way to pass options is to set the configuration below using + |SPARK_HISTORY_OPTS. The "-d" command line argument is avalable for backwards + |compatibility, and overrides "spark.history.fs.logDirectory". + | + |History Server options are always available; additional options depend on the provider. + | + |History Server options: + | + | spark.history.ui.port Port where server will listen for connections (default 18080) + | spark.history.acls.enable Whether to enable view acls for all applications (default false) + | spark.history.provider Name of history provider class (defaults to file system-based + | provider) + | + |FsHistoryProvider options: + | + | spark.history.fs.logDirectory Directory where app logs are stored (required) + | spark.history.fs.updateInterval How often to reload log data from storage (seconds, + | default 10) + |""".stripMargin) + System.exit(exitCode) + } + +} diff --git a/docs/monitoring.md b/docs/monitoring.md index 2b9e9e5bd7ea0..83753511862a2 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -69,7 +69,7 @@ represents an application's event logs. This creates a web interface at - +
 <table class="table">
   <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
   <tr>
-    <td>spark.history.updateInterval</td>
+    <td>spark.history.fs.updateInterval</td>
     <td>10</td>
     <td>
       The period, in seconds, at which information displayed by this history server is updated.

From 4da3a525060e537311439886d2a0bc2b71d2439c Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Tue, 10 Jun 2014 11:52:57 -0700
Subject: [PATCH 16/19] Remove UI from ApplicationHistoryInfo.

This reduces the needed memory when lots of applications are listed,
since there were 2 pointers wasted per entry to hold UI-specific
information.
---
 .../history/ApplicationHistoryProvider.scala | 10 +++-----
 .../deploy/history/FsHistoryProvider.scala   | 25 +++++++++++--------
 .../spark/deploy/history/HistoryServer.scala | 13 +++-------
 3 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index c1f4c34e7a5c4..40d5527ef69ed 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -25,9 +25,7 @@ private[spark] case class ApplicationHistoryInfo(
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
-    sparkUser: String,
-    viewAcls: String,
-    ui: SparkUI) {
+    sparkUser: String) {
 }

 private[spark] abstract class ApplicationHistoryProvider {
@@ -41,12 +39,12 @@ private[spark] abstract class ApplicationHistoryProvider {
   def getListing(): Seq[ApplicationHistoryInfo]

   /**
-   * This method should return the application information, including a rendered SparkUI.
+   * This method should return the application UI.
    *
    * @param appId The application ID.
-   * @return The app info, or null if not found.
+   * @return The application's UI, or null if application is not found.
    */
-  def getAppInfo(appId: String): ApplicationHistoryInfo
+  def getAppUI(appId: String): SparkUI

   /**
    * Called when the server is shutting down.

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index aea11f525d24c..a690469104422 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -89,10 +89,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

   override def getListing() = appList

-  override def getAppInfo(appId: String): ApplicationHistoryInfo = {
+  override def getAppUI(appId: String): SparkUI = {
     try {
       val appLogDir = fs.getFileStatus(new Path(logDir, appId))
-      loadAppInfo(appLogDir, true)
+      loadAppInfo(appLogDir, true)._2
     } catch {
       case e: FileNotFoundException => null
     }
@@ -128,7 +128,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       val curr = currentApps.getOrElse(dir.getPath().getName(), null)
       if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
         try {
-          newApps += loadAppInfo(dir, false)
+          newApps += loadAppInfo(dir, false)._1
         } catch {
           case e: Exception => logError(s"Failed to load app info from directory $dir.")
         }
@@ -152,10 +152,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    * clicks on a specific application.
    *
    * @param logDir Directory with application's log files.
-   * @param renderUI Whether to create the SparkUI for the application. If false, the "ui"
-   *                 attribute of the returned object will be null.
+ * @param renderUI Whether to create the SparkUI for the application. + * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false. */ - private def loadAppInfo(logDir: FileStatus, renderUI: Boolean): ApplicationHistoryInfo = { + private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = { val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs) val path = logDir.getPath val appId = path.getName @@ -173,14 +173,19 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } replayBus.replay() - ApplicationHistoryInfo(appId, + val appInfo = ApplicationHistoryInfo(appId, appListener.appName, appListener.startTime, appListener.endTime, getModificationTime(logDir), - appListener.sparkUser, - if (renderUI) appListener.viewAcls else null, - ui) + appListener.sparkUser) + + if (ui != null) { + val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) + ui.getSecurityManager.setUIAcls(uiAclsEnabled) + ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls) + } + (appInfo, ui) } /** Return when this directory was last modified. */ diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 6819bda8561e8..eb7d9a28a16df 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -50,21 +50,16 @@ class HistoryServer( // How many applications to retain private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50) - // set whether to enable or disable view acls for all applications - private val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) - private val localHost = Utils.localHostName() private val appLoader = new CacheLoader[String, SparkUI] { override def load(key: String): SparkUI = { - val info = provider.getAppInfo(key) - if (info == null) { + val ui = provider.getAppUI(key) + if (ui == null) { throw new NoSuchElementException() } - info.ui.getSecurityManager.setUIAcls(uiAclsEnabled) - info.ui.getSecurityManager.setViewAcls(info.sparkUser, info.viewAcls) - attachSparkUI(info.ui) - info.ui + attachSparkUI(ui) + ui } } From dd8cc4b6af0d739fdefbb59857855453bb4177c3 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 10 Jun 2014 15:13:42 -0700 Subject: [PATCH 17/19] Standardize on using spark.history.* configuration. Update documentation to mention the config options instead of the old command line argument, and update the startup script. 
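
A note on how these options reach the server: a plain SparkConf loads every
"spark."-prefixed JVM system property by default, which is why setting
SPARK_HISTORY_OPTS works at all, and renamed keys stay compatible through nested
getInt fallbacks. The sketch below mirrors the updateInterval handling introduced
earlier in this series (the SparkConf behavior is standard Spark, not something
added by this patch):

    // e.g. SPARK_HISTORY_OPTS="-Dspark.history.fs.updateInterval=30"
    val conf = new SparkConf()  // picks up "spark."-prefixed system properties
    // Prefer the new fs-scoped key, fall back to the deprecated generic key,
    // then to the 10-second default; the value is kept in milliseconds.
    val updateIntervalMs = conf.getInt("spark.history.fs.updateInterval",
      conf.getInt("spark.history.updateInterval", 10)) * 1000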
--- .../deploy/history/FsHistoryProvider.scala | 6 +++- .../history/HistoryServerArguments.scala | 36 ++++--------------- docs/monitoring.md | 18 +++++++--- sbin/start-history-server.sh | 17 +++++---- 4 files changed, 33 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a690469104422..15050fb3bc642 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -35,7 +35,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval", conf.getInt("spark.history.updateInterval", 10)) * 1000 - private val logDir = conf.get("spark.history.fs.logDirectory") + private val logDir = conf.get("spark.history.fs.logDirectory", null) + if (logDir == null) { + throw new IllegalArgumentException("Logging directory must be specified.") + } + private val fs = Utils.getHadoopFileSystem(logDir) // A timestamp of when the disk was last accessed to check for log updates diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala index 30896d140d507..b522da078cb02 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala @@ -17,10 +17,6 @@ package org.apache.spark.deploy.history -import java.net.URI - -import org.apache.hadoop.fs.Path - import org.apache.spark.SparkConf import org.apache.spark.util.Utils @@ -28,7 +24,7 @@ import org.apache.spark.util.Utils * Command-line parser for the master. */ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) { - var logDir = conf.get("spark.history.fs.logDirectory", null) + private var logDir: String = null parse(args.toList) @@ -46,36 +42,17 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String] case _ => printUsageAndExit(1) } - validateLogDir() - conf.set("spark.history.fs.logDirectory", logDir) - } - - private def validateLogDir() { - if (logDir == null) { - System.err.println("Logging directory must be specified.") - printUsageAndExit(1) - } - val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) - val path = new Path(logDir) - if (!fileSystem.exists(path)) { - System.err.println("Logging directory specified does not exist: %s".format(logDir)) - printUsageAndExit(1) - } - if (!fileSystem.getFileStatus(path).isDir) { - System.err.println("Logging directory specified is not a directory: %s".format(logDir)) - printUsageAndExit(1) + if (logDir != null) { + conf.set("spark.history.fs.logDirectory", logDir) } } private def printUsageAndExit(exitCode: Int) { System.err.println( """ - |Usage: HistoryServer [-d logDir] - | - |The preferred way to pass options is to set the configuration below using - |SPARK_HISTORY_OPTS. The "-d" command line argument is avalable for backwards - |compatibility, and overrides "spark.history.fs.logDirectory". + |Usage: HistoryServer | + |Configuration options can be set by setting the corresponding JVM system property. |History Server options are always available; additional options depend on the provider. 
      |
      |History Server options:
@@ -84,7 +61,8 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
      | spark.history.ui.port              Port where server will listen for connections (default 18080)
      | spark.history.acls.enable          Whether to enable view acls for all applications (default false)
      | spark.history.provider             Name of history provider class (defaults to file system-based
      |                                    provider)
-     |
+     | spark.history.retainedApplications Max number of application UIs to keep loaded in memory
+     |                                    (default 50)
      |FsHistoryProvider options:
      |
      | spark.history.fs.logDirectory      Directory where app logs are stored (required)
      | spark.history.fs.updateInterval    How often to reload log data from storage (seconds,
      |                                    default 10)

diff --git a/docs/monitoring.md b/docs/monitoring.md
index 83753511862a2..71a003a2e264a 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -35,11 +35,13 @@ If Spark is run on Mesos or YARN, it is still possible to reconstruct the UI of
 application through Spark's history server, provided that the application's event logs exist.
 You can start a the history server by executing:

-    ./sbin/start-history-server.sh <base-logging-directory>
+    ./sbin/start-history-server.sh

-The base logging directory must be supplied, and should contain sub-directories that each
-represents an application's event logs. This creates a web interface at
-`http://<server-url>:18080` by default. The history server can be configured as follows:
+When using the file-system provider class (see spark.history.provider below), the base logging
+directory must be supplied in the "spark.history.fs.logDirectory" configuration option, and should
+contain sub-directories that each represents an application's event logs. This creates a web
+interface at `http://<server-url>:18080` by default. The history server can be configured as
+follows:

@@ -68,6 +70,12 @@ represents an application's event logs. This creates a web interface at
   <tr><th>Environment Variable</th><th>Meaning</th></tr>
 </table>

 <table class="table">
   <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+  <tr>
+    <td>spark.history.provider</td>
+    <td>org.apache.spark.deploy.history.FsHistoryProvider</td>
+    <td>Name of the class implementing the application history backend. Currently there is only
+    one implementation provided by Spark, which matches the default value.</td>
+  </tr>
   <tr>
     <td>spark.history.fs.updateInterval</td>
     <td>10</td>
   <tr>
     <td>spark.history.retainedApplications</td>
-    <td>250</td>
+    <td>50</td>
     <td>
       The number of application UIs to retain. If this cap is exceeded, then the oldest
       applications will be removed.

diff --git a/sbin/start-history-server.sh b/sbin/start-history-server.sh
index 4a90c68763b68..e30493da32a7a 100755
--- a/sbin/start-history-server.sh
+++ b/sbin/start-history-server.sh
@@ -19,19 +19,18 @@
 # Starts the history server on the machine this script is executed on.
 #
-# Usage: start-history-server.sh []
-# Example: ./start-history-server.sh --dir /tmp/spark-events --port 18080
+# Usage: start-history-server.sh
+#
+# Use the SPARK_HISTORY_OPTS environment variable to set history server configuration.
 #

 sbin=`dirname "$0"`
 sbin=`cd "$sbin"; pwd`

-if [ $# -lt 1 ]; then
-  echo "Usage: ./start-history-server.sh "
-  echo "Example: ./start-history-server.sh /tmp/spark-events"
-  exit
+if [ $# != 0 ]; then
+  echo "Using command line arguments for setting the log directory is deprecated. Please "
+  echo "set the spark.history.fs.logDirectory configuration option instead."
+  export SPARK_HISTORY_OPTS="$SPARK_HISTORY_OPTS -Dspark.history.fs.logDirectory=$1"
 fi

-LOG_DIR=$1
-
-"$sbin"/spark-daemon.sh start org.apache.spark.deploy.history.HistoryServer 1 --dir "$LOG_DIR"
+exec "$sbin"/spark-daemon.sh start org.apache.spark.deploy.history.HistoryServer 1

From c21f8d84bb621dc788df5bad70012f71180a9873 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Wed, 11 Jun 2014 10:29:35 -0700
Subject: [PATCH 18/19] Feedback: formatting, docs.

---
 .../history/ApplicationHistoryProvider.scala |  3 +--
 .../deploy/history/FsHistoryProvider.scala   | 10 +++++-----
 .../spark/deploy/history/HistoryServer.scala | 10 ++++------
 .../history/HistoryServerArguments.scala     | 18 ++++++++++--------
 docs/monitoring.md                           |  9 +++++----
 5 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 40d5527ef69ed..d0aedf9276e70 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -25,8 +25,7 @@ private[spark] case class ApplicationHistoryInfo(
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
-    sparkUser: String) {
-}
+    sparkUser: String)

 private[spark] abstract class ApplicationHistoryProvider {

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 15050fb3bc642..a8c9ac072449f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -116,10 +116,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     try {
       val logStatus = fs.listStatus(new Path(logDir))
       val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
-      val logInfos = logDirs
-        .filter {
-          dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))
-        }
+      val logInfos = logDirs.filter {
+        dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))
+      }

       val currentApps = Map[String, ApplicationHistoryInfo](
         appList.map(app => (app.id -> app)):_*)
@@ -177,7 +176,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     }
     replayBus.replay()

-
val appInfo = ApplicationHistoryInfo(appId, + val appInfo = ApplicationHistoryInfo( + appId, appListener.appName, appListener.startTime, appListener.endTime, diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index eb7d9a28a16df..29a78a56c8ed5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -50,8 +50,6 @@ class HistoryServer( // How many applications to retain private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50) - private val localHost = Utils.localHostName() - private val appLoader = new CacheLoader[String, SparkUI] { override def load(key: String): SparkUI = { val ui = provider.getAppUI(key) @@ -192,10 +190,10 @@ object HistoryServer { server.bind() Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") { - override def run() = { - server.stop() - } - }) + override def run() = { + server.stop() + } + }) // Wait until the end of the world... or if the HistoryServer process is manually stopped while(true) { Thread.sleep(Int.MaxValue) } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala index b522da078cb02..be9361b754fc3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala @@ -57,17 +57,19 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String] | |History Server options: | - | spark.history.ui.port Port where server will listen for connections (default 18080) - | spark.history.acls.enable Whether to enable view acls for all applications (default false) - | spark.history.provider Name of history provider class (defaults to file system-based - | provider) + | spark.history.ui.port Port where server will listen for connections + | (default 18080) + | spark.history.acls.enable Whether to enable view acls for all applications + | (default false) + | spark.history.provider Name of history provider class (defaults to + | file system-based provider) | spark.history.retainedApplications Max number of application UIs to keep loaded in memory - | (default 50) + | (default 50) |FsHistoryProvider options: | - | spark.history.fs.logDirectory Directory where app logs are stored (required) - | spark.history.fs.updateInterval How often to reload log data from storage (seconds, - | default 10) + | spark.history.fs.logDirectory Directory where app logs are stored (required) + | spark.history.fs.updateInterval How often to reload log data from storage (in seconds, + | default 10) |""".stripMargin) System.exit(exitCode) } diff --git a/docs/monitoring.md b/docs/monitoring.md index 71a003a2e264a..84073fe4d949a 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -38,9 +38,9 @@ You can start a the history server by executing: ./sbin/start-history-server.sh When using the file-system provider class (see spark.history.provider below), the base logging -directory must be supplied in the "spark.history.fs.logDirectory" configuration option, and should -contain sub-directories that each represents an application's event logs. This creates a web -interface at `http://:18080` by default. 
The history server can be configured as
+directory must be supplied in the <code>spark.history.fs.logDirectory</code> configuration option,
+and should contain sub-directories that each represents an application's event logs. This creates a
+web interface at `http://<server-url>:18080` by default. The history server can be configured as
+follows:

@@ -74,7 +74,8 @@ follows:
   <tr>
     <td>spark.history.provider</td>
     <td>org.apache.spark.deploy.history.FsHistoryProvider</td>
     <td>Name of the class implementing the application history backend. Currently there is only
-    one implementation provided by Spark, which matches the default value.</td>
+    one implementation, provided by Spark, which looks for application logs stored in the
+    file system.</td>

From 53620c9ace474ebf5cf733dc9c87ce24c3538edc Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Tue, 17 Jun 2014 10:55:51 -0700
Subject: [PATCH 19/19] Add mima exclude, fix scaladoc wording.

---
 .../spark/deploy/history/ApplicationHistoryProvider.scala | 7 +++----
 project/MimaExcludes.scala                                | 5 ++++-
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index d0aedf9276e70..a0e8bd403a41d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -30,15 +30,14 @@ private[spark] case class ApplicationHistoryInfo(
 private[spark] abstract class ApplicationHistoryProvider {

   /**
-   * This method should return a list of applications available for the history server to
-   * show.
+   * Returns a list of applications available for the history server to show.
    *
    * @return List of all know applications.
    */
   def getListing(): Seq[ApplicationHistoryInfo]

   /**
-   * This method should return the application UI.
+   * Returns the Spark UI for a specific application.
    *
    * @param appId The application ID.
    * @return The application's UI, or null if application is not found.
@@ -51,7 +50,7 @@ private[spark] abstract class ApplicationHistoryProvider {
   def stop(): Unit = { }

   /**
-   * Returns configuration data to be shown in the HS home page.
+   * Returns configuration data to be shown in the History Server home page.
    *
    * @return A map with the configuration data. Data is show in the order returned by the map.
    */

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 042fdfcc47261..933faab5c9431 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -34,7 +34,10 @@ object MimaExcludes {
     val excludes =
       SparkBuild.SPARK_VERSION match {
         case v if v.startsWith("1.1") =>
-          Seq(MimaBuild.excludeSparkPackage("graphx")) ++
+          Seq(
+            MimaBuild.excludeSparkPackage("deploy"),
+            MimaBuild.excludeSparkPackage("graphx")
+          ) ++
           Seq(
             // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
             // for countApproxDistinct* functions, which does not work in Java. We later removed
spark.history.fs.updateInterval
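
Taken together, the series leaves ApplicationHistoryProvider as a compact SPI:
getListing(), getAppUI(), getConfig(), and stop(). A hypothetical alternate backend
would be wired in through spark.history.provider; everything about ExampleProvider
below is invented for illustration, and only the overridden signatures come from the
patches above:

    package org.apache.spark.deploy.history  // the SPI is private[spark] at this point

    import org.apache.spark.SparkConf
    import org.apache.spark.ui.SparkUI

    // Selected with -Dspark.history.provider=org.apache.spark.deploy.history.ExampleProvider;
    // like FsHistoryProvider, it takes the SparkConf as its only constructor argument.
    private[spark] class ExampleProvider(conf: SparkConf) extends ApplicationHistoryProvider {
      // Return whatever listing the backend knows about (empty in this stub).
      override def getListing(): Seq[ApplicationHistoryInfo] = Seq.empty
      // Returning null signals "application not found" to the HistoryServer cache loader.
      override def getAppUI(appId: String): SparkUI = null
      // Rendered as the bullet list on the history server's home page.
      override def getConfig(): Map[String, String] = Map("Backend" -> "example (stub)")
    }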