- <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
+ { providerConfig.map(e => <li><strong>{e._1}:</strong> {e._2}</li>) }
</ul>
{
- if (parent.appIdToInfo.size > 0) {
+ if (allApps.size > 0) {
<h4>
- Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
- Completed Application{if (parent.getNumApplications > 1) "s" else ""}
+ Showing {actualFirst + 1}-{last + 1} of {allApps.size}
+ <span style="float: right">
+ {if (actualPage > 1) <a href={"/?page=" + (actualPage - 1)}>&lt;</a>}
+ {if (actualPage < pageCount) <a href={"/?page=" + (actualPage + 1)}>&gt;</a>}
+ </span>
</h4> ++
appTable
} else {
@@ -56,26 +72,20 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Completed",
"Duration",
"Spark User",
- "Log Directory",
"Last Updated")
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
- val appName = if (info.started) info.name else info.logDirPath.getName
- val uiAddress = parent.getAddress + info.ui.basePath
- val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started"
- val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed"
- val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
- val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---"
- val sparkUser = if (info.started) info.sparkUser else "Unknown user"
- val logDirectory = info.logDirPath.getName
+ val uiAddress = "/history/" + info.id
+ val startTime = UIUtils.formatDate(info.startTime)
+ val endTime = UIUtils.formatDate(info.endTime)
+ val duration = UIUtils.formatDuration(info.endTime - info.startTime)
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
- <td><a href={uiAddress}>{appName}</a></td>
+ <td><a href={uiAddress}>{info.name}</a></td>
<td>{startTime}</td>
<td>{endTime}</td>
<td>{duration}</td>
- <td>{sparkUser}</td>
- <td>{logDirectory}</td>
+ <td>{info.sparkUser}</td>
<td>{lastUpdated}</td>
</tr>
}
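
The new listing page paginates with the variables shown above (actualFirst, last, actualPage, pageCount). Below is a minimal stand-alone sketch of that arithmetic; pageSize is an assumed constant here, and the clamping details may differ slightly from the patch:

```scala
// Hypothetical helper mirroring the pagination math in HistoryPage.
object PaginationSketch {
  val pageSize = 20 // assumed; the patch defines its own page size

  // Returns (pageCount, actualPage, actualFirst, last) for a requested page.
  def paginate(requestedPage: Int, total: Int): (Int, Int, Int, Int) = {
    val pageCount = (total + pageSize - 1) / pageSize           // ceiling division
    val actualPage = requestedPage.max(1).min(pageCount.max(1)) // clamp to valid range
    val actualFirst = (actualPage - 1) * pageSize               // first index shown
    val last = (actualFirst + pageSize).min(total) - 1          // last index shown
    (pageCount, actualPage, actualFirst, last)
  }
}
```

With 45 applications, paginate(2, 45) yields page 2 of 3 covering indices 20..39, which the page renders as "Showing 21-40 of 45".
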
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index a9c11dca5678e..29a78a56c8ed5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -17,14 +17,15 @@
package org.apache.spark.deploy.history
-import scala.collection.mutable
+import java.util.NoSuchElementException
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import com.google.common.cache._
+import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.scheduler._
-import org.apache.spark.ui.{WebUI, SparkUI}
+import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
@@ -38,56 +39,68 @@ import org.apache.spark.util.Utils
* application's event logs are maintained in the application's own sub-directory. This
* is the same structure as maintained in the event log write code path in
* EventLoggingListener.
- *
- * @param baseLogDir The base directory in which event logs are found
*/
class HistoryServer(
- val baseLogDir: String,
+ conf: SparkConf,
+ provider: ApplicationHistoryProvider,
securityManager: SecurityManager,
- conf: SparkConf)
- extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging {
-
- import HistoryServer._
+ port: Int)
+ extends WebUI(securityManager, port, conf) with Logging {
- private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
- private val localHost = Utils.localHostName()
- private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
+ // How many applications to retain
+ private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
- // A timestamp of when the disk was last accessed to check for log updates
- private var lastLogCheckTime = -1L
+ private val appLoader = new CacheLoader[String, SparkUI] {
+ override def load(key: String): SparkUI = {
+ val ui = provider.getAppUI(key)
+ if (ui == null) {
+ throw new NoSuchElementException()
+ }
+ attachSparkUI(ui)
+ ui
+ }
+ }
- // Number of completed applications found in this directory
- private var numCompletedApplications = 0
+ private val appCache = CacheBuilder.newBuilder()
+ .maximumSize(retainedApplications)
+ .removalListener(new RemovalListener[String, SparkUI] {
+ override def onRemoval(rm: RemovalNotification[String, SparkUI]) = {
+ detachSparkUI(rm.getValue())
+ }
+ })
+ .build(appLoader)
+
+ private val loaderServlet = new HttpServlet {
+ protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
+ val parts = Option(req.getPathInfo()).getOrElse("").split("/")
+ if (parts.length < 2) {
+ res.sendError(HttpServletResponse.SC_BAD_REQUEST,
+ s"Unexpected path info in request (URI = ${req.getRequestURI()}")
+ return
+ }
- @volatile private var stopped = false
+ val appId = parts(1)
- /**
- * A background thread that periodically checks for event log updates on disk.
- *
- * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
- * time at which it performs the next log check to maintain the same period as before.
- *
- * TODO: Add a mechanism to update manually.
- */
- private val logCheckingThread = new Thread {
- override def run(): Unit = Utils.logUncaughtExceptions {
- while (!stopped) {
- val now = System.currentTimeMillis
- if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
- checkForLogs()
- Thread.sleep(UPDATE_INTERVAL_MS)
- } else {
- // If the user has manually checked for logs recently, wait until
- // UPDATE_INTERVAL_MS after the last check time
- Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now)
+ // Note we don't use the UI retrieved from the cache; the cache loader above will register
+ // the app's UI, and all we need to do is redirect the user to the same URI that was
+ // requested, and the proper data should be served at that point.
+ try {
+ appCache.get(appId)
+ res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
+ } catch {
+ case e: Exception => e.getCause() match {
+ case nsee: NoSuchElementException =>
+ val msg = <div class="row-fluid">Application {appId} not found.</div>
+ res.setStatus(HttpServletResponse.SC_NOT_FOUND)
+ UIUtils.basicSparkPage(msg, "Not Found").foreach(
+ n => res.getWriter().write(n.toString))
+
+ case cause: Exception => throw cause
}
}
}
}
- // A mapping of application ID to its history information, which includes the rendered UI
- val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()
-
initialize()
/**
@@ -98,108 +111,23 @@ class HistoryServer(
*/
def initialize() {
attachPage(new HistoryPage(this))
- attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
+ attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
+
+ val contextHandler = new ServletContextHandler
+ contextHandler.setContextPath("/history")
+ contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
+ attachHandler(contextHandler)
}
/** Bind to the HTTP server behind this web interface. */
override def bind() {
super.bind()
- logCheckingThread.start()
- }
-
- /**
- * Check for any updates to event logs in the base directory. This is only effective once
- * the server has been bound.
- *
- * If a new completed application is found, the server renders the associated SparkUI
- * from the application's event logs, attaches this UI to itself, and stores metadata
- * information for this application.
- *
- * If the logs for an existing completed application are no longer found, the server
- * removes all associated information and detaches the SparkUI.
- */
- def checkForLogs() = synchronized {
- if (serverInfo.isDefined) {
- lastLogCheckTime = System.currentTimeMillis
- logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime))
- try {
- val logStatus = fileSystem.listStatus(new Path(baseLogDir))
- val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
- val logInfos = logDirs
- .sortBy { dir => getModificationTime(dir) }
- .map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) }
- .filter { case (dir, info) => info.applicationComplete }
-
- // Logging information for applications that should be retained
- val retainedLogInfos = logInfos.takeRight(RETAINED_APPLICATIONS)
- val retainedAppIds = retainedLogInfos.map { case (dir, _) => dir.getPath.getName }
-
- // Remove any applications that should no longer be retained
- appIdToInfo.foreach { case (appId, info) =>
- if (!retainedAppIds.contains(appId)) {
- detachSparkUI(info.ui)
- appIdToInfo.remove(appId)
- }
- }
-
- // Render the application's UI if it is not already there
- retainedLogInfos.foreach { case (dir, info) =>
- val appId = dir.getPath.getName
- if (!appIdToInfo.contains(appId)) {
- renderSparkUI(dir, info)
- }
- }
-
- // Track the total number of completed applications observed this round
- numCompletedApplications = logInfos.size
-
- } catch {
- case e: Exception => logError("Exception in checking for event log updates", e)
- }
- } else {
- logWarning("Attempted to check for event log updates before binding the server.")
- }
- }
-
- /**
- * Render a new SparkUI from the event logs if the associated application is completed.
- *
- * HistoryServer looks for a special file that indicates application completion in the given
- * directory. If this file exists, the associated application is regarded to be completed, in
- * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing.
- */
- private def renderSparkUI(logDir: FileStatus, elogInfo: EventLoggingInfo) {
- val path = logDir.getPath
- val appId = path.getName
- val replayBus = new ReplayListenerBus(elogInfo.logPaths, fileSystem, elogInfo.compressionCodec)
- val appListener = new ApplicationEventListener
- replayBus.addListener(appListener)
- val appConf = conf.clone()
- val appSecManager = new SecurityManager(appConf)
- val ui = new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
-
- // Do not call ui.bind() to avoid creating a new server for each application
- replayBus.replay()
- if (appListener.applicationStarted) {
- appSecManager.setUIAcls(HISTORY_UI_ACLS_ENABLED)
- appSecManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
- attachSparkUI(ui)
- val appName = appListener.appName
- val sparkUser = appListener.sparkUser
- val startTime = appListener.startTime
- val endTime = appListener.endTime
- val lastUpdated = getModificationTime(logDir)
- ui.setAppName(appName + " (completed)")
- appIdToInfo(appId) = ApplicationHistoryInfo(appId, appName, startTime, endTime,
- lastUpdated, sparkUser, path, ui)
- }
}
/** Stop the server and close the file system. */
override def stop() {
super.stop()
- stopped = true
- fileSystem.close()
+ provider.stop()
}
/** Attach a reconstructed UI to this server. Only valid after bind(). */
@@ -215,27 +143,20 @@ class HistoryServer(
ui.getHandlers.foreach(detachHandler)
}
- /** Return the address of this server. */
- def getAddress: String = "http://" + publicHost + ":" + boundPort
+ /**
+ * Returns a list of available applications, in descending order according to their end time.
+ *
+ * @return List of all known applications.
+ */
+ def getApplicationList() = provider.getListing()
- /** Return the number of completed applications found, whether or not the UI is rendered. */
- def getNumApplications: Int = numCompletedApplications
+ /**
+ * Returns the provider configuration to show in the listing page.
+ *
+ * @return A map with the provider's configuration.
+ */
+ def getProviderConfig() = provider.getConfig()
- /** Return when this directory was last modified. */
- private def getModificationTime(dir: FileStatus): Long = {
- try {
- val logFiles = fileSystem.listStatus(dir.getPath)
- if (logFiles != null && !logFiles.isEmpty) {
- logFiles.map(_.getModificationTime).max
- } else {
- dir.getModificationTime
- }
- } catch {
- case e: Exception =>
- logError("Exception in accessing modification time of %s".format(dir.getPath), e)
- -1L
- }
- }
}
/**
@@ -251,30 +172,31 @@ class HistoryServer(
object HistoryServer {
private val conf = new SparkConf
- // Interval between each check for event log updates
- val UPDATE_INTERVAL_MS = conf.getInt("spark.history.updateInterval", 10) * 1000
-
- // How many applications to retain
- val RETAINED_APPLICATIONS = conf.getInt("spark.history.retainedApplications", 250)
-
- // The port to which the web UI is bound
- val WEB_UI_PORT = conf.getInt("spark.history.ui.port", 18080)
-
- // set whether to enable or disable view acls for all applications
- val HISTORY_UI_ACLS_ENABLED = conf.getBoolean("spark.history.ui.acls.enable", false)
-
- val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
-
def main(argStrings: Array[String]) {
initSecurity()
- val args = new HistoryServerArguments(argStrings)
+ val args = new HistoryServerArguments(conf, argStrings)
val securityManager = new SecurityManager(conf)
- val server = new HistoryServer(args.logDir, securityManager, conf)
+
+ val providerName = conf.getOption("spark.history.provider")
+ .getOrElse(classOf[FsHistoryProvider].getName())
+ val provider = Class.forName(providerName)
+ .getConstructor(classOf[SparkConf])
+ .newInstance(conf)
+ .asInstanceOf[ApplicationHistoryProvider]
+
+ val port = conf.getInt("spark.history.ui.port", 18080)
+
+ val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()
+ Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
+ override def run() = {
+ server.stop()
+ }
+ })
+
// Wait until the end of the world... or until the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
- server.stop()
}
def initSecurity() {
@@ -291,17 +213,3 @@ object HistoryServer {
}
}
-
-
-private[spark] case class ApplicationHistoryInfo(
- id: String,
- name: String,
- startTime: Long,
- endTime: Long,
- lastUpdated: Long,
- sparkUser: String,
- logDirPath: Path,
- ui: SparkUI) {
- def started = startTime != -1
- def completed = endTime != -1
-}
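
The heart of the new HistoryServer is a bounded Guava LoadingCache: the loader attaches an application's SparkUI on a cache miss, and the removal listener detaches it when the entry is evicted. A self-contained sketch of the same pattern, with plain strings standing in for SparkUI instances:

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, RemovalListener, RemovalNotification}

object CacheSketch {
  val cache = CacheBuilder.newBuilder()
    .maximumSize(2) // the patch uses spark.history.retainedApplications (default 50)
    .removalListener(new RemovalListener[String, String] {
      // Called when an entry is evicted; the patch detaches the SparkUI here.
      override def onRemoval(rm: RemovalNotification[String, String]): Unit =
        println(s"evicted ${rm.getKey}")
    })
    .build(new CacheLoader[String, String] {
      // Called on a miss; the patch loads the app's UI from the provider
      // and attaches it to the server here.
      override def load(key: String): String = s"ui-for-$key"
    })

  def main(args: Array[String]): Unit = {
    cache.get("app-1") // miss: load() runs
    cache.get("app-1") // hit: no reload
    cache.get("app-2")
    cache.get("app-3") // near maximumSize, Guava may evict an older entry
  }
}
```

This also explains the redirect in loaderServlet: the servlet never uses the cached value directly. Loading through the cache attaches the UI's handlers to the server, and redirecting to the originally requested URI then lets those handlers serve the page.
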
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
index 943c061743dbd..be9361b754fc3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -17,17 +17,14 @@
package org.apache.spark.deploy.history
-import java.net.URI
-
-import org.apache.hadoop.fs.Path
-
+import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
/**
 * Command-line parser for the history server.
*/
-private[spark] class HistoryServerArguments(args: Array[String]) {
- var logDir = ""
+private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
+ private var logDir: String = null
parse(args.toList)
@@ -45,32 +42,36 @@ private[spark] class HistoryServerArguments(args: Array[String]) {
case _ =>
printUsageAndExit(1)
}
- validateLogDir()
- }
-
- private def validateLogDir() {
- if (logDir == "") {
- System.err.println("Logging directory must be specified.")
- printUsageAndExit(1)
- }
- val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
- val path = new Path(logDir)
- if (!fileSystem.exists(path)) {
- System.err.println("Logging directory specified does not exist: %s".format(logDir))
- printUsageAndExit(1)
- }
- if (!fileSystem.getFileStatus(path).isDir) {
- System.err.println("Logging directory specified is not a directory: %s".format(logDir))
- printUsageAndExit(1)
+ if (logDir != null) {
+ conf.set("spark.history.fs.logDirectory", logDir)
}
}
private def printUsageAndExit(exitCode: Int) {
System.err.println(
- "Usage: HistoryServer [options]\n" +
- "\n" +
- "Options:\n" +
- " -d DIR, --dir DIR Location of event log files")
+ """
+ |Usage: HistoryServer
+ |
+ |Configuration options can be set by setting the corresponding JVM system property.
+ |History Server options are always available; additional options depend on the provider.
+ |
+ |History Server options:
+ |
+ | spark.history.ui.port Port where server will listen for connections
+ | (default 18080)
+ | spark.history.acls.enable Whether to enable view acls for all applications
+ | (default false)
+ | spark.history.provider Name of history provider class (defaults to
+ | file system-based provider)
+ | spark.history.retainedApplications Max number of application UIs to keep loaded in memory
+ | (default 50)
+ |
+ |FsHistoryProvider options:
+ |
+ | spark.history.fs.logDirectory Directory where app logs are stored (required)
+ | spark.history.fs.updateInterval How often to reload log data from storage (in seconds,
+ | default 10)
+ |""".stripMargin)
System.exit(exitCode)
}
+
}
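
The usage text reflects the new configuration-driven design: the provider is instantiated reflectively in HistoryServer.main, so any class exposing a single SparkConf constructor can be plugged in. A condensed sketch of that lookup, assuming it lives in the same org.apache.spark.deploy.history package as the classes it references:

```scala
import org.apache.spark.SparkConf

object ProviderLoaderSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // spark.history.provider if set, else the file-system provider from the patch.
    val providerName = conf.getOption("spark.history.provider")
      .getOrElse(classOf[FsHistoryProvider].getName())
    // The plug-in contract: a single constructor taking a SparkConf.
    val provider = Class.forName(providerName)
      .getConstructor(classOf[SparkConf])
      .newInstance(conf)
      .asInstanceOf[ApplicationHistoryProvider]
    println(s"loaded ${provider.getClass.getName}")
  }
}
```

A custom implementation would then be selected at launch with -Dspark.history.provider=com.example.MyProvider (a hypothetical class name).
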
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 6433aac1c23e0..467317dd9b44c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -77,6 +77,7 @@ private[spark] class ExecutorRunner(
* @param message the exception message which caused the executor's death
*/
private def killProcess(message: Option[String]) {
+ var exitCode: Option[Int] = None
if (process != null) {
logInfo("Killing process!")
process.destroy()
@@ -87,9 +88,9 @@ private[spark] class ExecutorRunner(
if (stderrAppender != null) {
stderrAppender.stop()
}
- val exitCode = process.waitFor()
- worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
+ exitCode = Some(process.waitFor())
}
+ worker ! ExecutorStateChanged(appId, execId, state, message, exitCode)
}
/** Stop this executor runner, including killing the process it launched */
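
The ExecutorRunner change is a small correctness fix: previously, killProcess only sent ExecutorStateChanged when a process existed, and called waitFor() unconditionally inside that branch. With the exit code carried as an Option, the worker is always notified. The shape of the fix, as an illustrative helper (not part of the patch):

```scala
// Illustrative only: returns Some(exitCode) if there was a process to kill,
// None if killProcess ran before the process was launched.
def collectExitCode(process: java.lang.Process): Option[Int] = {
  var exitCode: Option[Int] = None
  if (process != null) {
    process.destroy()
    exitCode = Some(process.waitFor()) // blocks until the process exits
  }
  exitCode // the caller reports this to the worker either way
}
```
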
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index 6a5ffb1b71bfb..b389cb546de6c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -120,7 +120,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") w