From f96aa78bb65513fc71948bc8496b119fa0625886 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 7 May 2014 16:30:02 -0700 Subject: [PATCH 01/12] Separate history server from history backend. This change does two things, mainly: - Separate the logic of serving application history from fetching application history from the underlying storage. Not only this cleans up the code a little bit, but it also serves as initial work for SPARK-1537, where we may want to fetch application data from Yarn instead of HDFS. I've kept the current command line options working, but I changed the way configuration is set to be mostly based on SparkConf, so that it's easy to support new providers later. - Make it so the UI for each application is loaded lazily. The UIs are cached in memory (cache size configurable) for faster subsequent access. This means that we don't need a limit for the number of applications listed; the list should fit comfortably in memory (since it holds much less data). Because of this I lowered the number of applications kept in memory to 50 (since that setting doesn't influence the number of apps listed anymore). Later, we may want to provide paging in the listing UI, and also spilling the listing to disk and loading it on demand to avoid large memory usage / slow startup. --- .../deploy/history/FsHistoryProvider.scala | 205 +++++++++++ .../spark/deploy/history/HistoryPage.scala | 20 +- .../spark/deploy/history/HistoryServer.scala | 333 +++++++++--------- .../history/HistoryServerArguments.scala | 76 ---- .../scala/org/apache/spark/ui/WebUI.scala | 1 + 5 files changed, 379 insertions(+), 256 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala delete mode 100644 core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala new file mode 100644 index 0000000000000..884eca34168ca --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.FileNotFoundException +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable + +import org.apache.hadoop.fs.{FileStatus, Path} + +import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.scheduler._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils + +class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider + with Logging { + + // Interval between each check for event log updates + private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval", 10) * 1000 + + private val logDir = conf.get("spark.history.fs.logDirectory") + private val fs = Utils.getHadoopFileSystem(logDir) + + // A timestamp of when the disk was last accessed to check for log updates + private var lastLogCheckTime = -1L + + // List of applications, in order from newest to oldest. + private val appList = new AtomicReference[Seq[ApplicationHistoryInfo]](Nil) + + /** + * A background thread that periodically checks for event log updates on disk. + * + * If a log check is invoked manually in the middle of a period, this thread re-adjusts the + * time at which it performs the next log check to maintain the same period as before. + * + * TODO: Add a mechanism to update manually. + */ + private val logCheckingThread = new Thread("LogCheckingThread") { + override def run() = Utils.logUncaughtExceptions { + while (!stopped) { + val now = System.currentTimeMillis + if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) { + Thread.sleep(UPDATE_INTERVAL_MS) + } else { + // If the user has manually checked for logs recently, wait until + // UPDATE_INTERVAL_MS after the last check time + Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now) + } + checkForLogs() + } + } + } + + @volatile private var stopped = false + + initialize() + + private def initialize() { + // Validate the log directory. + val path = new Path(logDir) + if (!fs.exists(path)) { + throw new IllegalArgumentException("Logging directory specified does not exist: %s".format(logDir)) + } + if (!fs.getFileStatus(path).isDir) { + throw new IllegalArgumentException("Logging directory specified is not a directory: %s".format(logDir)) + } + + checkForLogs() + logCheckingThread.start() + } + + override def stop() = { + stopped = true + logCheckingThread.interrupt() + logCheckingThread.join() + } + + override def getListing(offset: Int, limit: Int): Seq[ApplicationHistoryInfo] = { + appList.get() + } + + override def getAppInfo(appId: String): ApplicationHistoryInfo = { + try { + val appLogDir = fs.getFileStatus(new Path(logDir, appId)) + loadAppInfo(appLogDir, true) + } catch { + case e: FileNotFoundException => null + } + } + + /** + * Check for any updates to event logs in the base directory. This is only effective once + * the server has been bound. + * + * If a new completed application is found, the server renders the associated SparkUI + * from the application's event logs, attaches this UI to itself, and stores metadata + * information for this application. + * + * If the logs for an existing completed application are no longer found, the server + * removes all associated information and detaches the SparkUI. + */ + def checkForLogs() = synchronized { + lastLogCheckTime = System.currentTimeMillis + logDebug("Checking for logs. 
Time is now %d.".format(lastLogCheckTime)) + try { + val logStatus = fs.listStatus(new Path(logDir)) + val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() + val logInfos = logDirs + .sortBy { dir => getModificationTime(dir) } + .filter { + dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE)) + } + + var currentApps = Map[String, ApplicationHistoryInfo]( + appList.get().map(app => (app.id -> app)):_*) + + // For any application that either (i) is not listed or (ii) has changed since the last time + // the listing was created (defined by the log dir's modification time), load the app's info. + // Otherwise just reuse what's already in memory. + appList.set(logInfos + .map { dir => + val curr = currentApps.getOrElse(dir.getPath().getName(), null) + if (curr == null || curr.lastUpdated < getModificationTime(dir)) { + loadAppInfo(dir, false) + } else { + curr + } + } + .sortBy { info => -info.lastUpdated }) + } catch { + case t: Throwable => logError("Exception in checking for event log updates", t) + } + } + + /** + * Parse the application's logs to find out the information we need to build the + * listing page. + */ + private def loadAppInfo(logDir: FileStatus, renderUI: Boolean): ApplicationHistoryInfo = { + val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs) + val path = logDir.getPath + val appId = path.getName + val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec) + val appListener = new ApplicationEventListener + replayBus.addListener(appListener) + + val ui: SparkUI = if (renderUI) { + val conf = this.conf.clone() + val appSecManager = new SecurityManager(conf) + new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId) + // Do not call ui.bind() to avoid creating a new server for each application + } else { + null + } + + replayBus.replay() + val appName = appListener.appName + val sparkUser = appListener.sparkUser + val startTime = appListener.startTime + val endTime = appListener.endTime + val lastUpdated = getModificationTime(logDir) + ApplicationHistoryInfo(appId, + appListener.appName, + appListener.startTime, + appListener.endTime, + getModificationTime(logDir), + appListener.sparkUser, + if (renderUI) appListener.viewAcls else null, + ui) + } + + /** Return when this directory was last modified. 
*/ + private def getModificationTime(dir: FileStatus): Long = { + try { + val logFiles = fs.listStatus(dir.getPath) + if (logFiles != null && !logFiles.isEmpty) { + logFiles.map(_.getModificationTime).max + } else { + dir.getModificationTime + } + } catch { + case t: Throwable => + logError("Exception in accessing modification time of %s".format(dir.getPath), t) + -1L + } + } + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 180c853ce3096..b933d58a5d8dd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -26,19 +26,16 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { def render(request: HttpServletRequest): Seq[Node] = { - val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated } - val appTable = UIUtils.listingTable(appHeader, appRow, appRows) + val apps = parent.getApplicationList(0, -1) + val appTable = UIUtils.listingTable(appHeader, appRow, apps) val content =
      <div class="row-fluid">
        <div class="span12">
-          <ul class="unstyled">
-            <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
-          </ul>
          {
-            if (parent.appIdToInfo.size > 0) {
+            if (apps.size > 0) {
              <h4>
-                Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
-                Completed Application{if (parent.getNumApplications > 1) "s" else ""}
+                Showing {apps.size}/{apps.size}
+                Completed Application{if (apps.size > 1) "s" else ""}
              </h4>
++ appTable } else { @@ -56,18 +53,16 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { "Completed", "Duration", "Spark User", - "Log Directory", "Last Updated") private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { - val appName = if (info.started) info.name else info.logDirPath.getName - val uiAddress = parent.getAddress + info.ui.basePath + val appName = if (info.started) info.name else info.id + val uiAddress = "/history/" + info.id val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started" val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed" val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---" val sparkUser = if (info.started) info.sparkUser else "Unknown user" - val logDirectory = info.logDirPath.getName val lastUpdated = UIUtils.formatDate(info.lastUpdated) {appName} @@ -75,7 +70,6 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { {endTime} {duration} {sparkUser} - {logDirectory} {lastUpdated} } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index a9c11dca5678e..c8e1dfa6fa220 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -17,14 +17,15 @@ package org.apache.spark.deploy.history -import scala.collection.mutable +import java.util.NoSuchElementException +import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} -import org.apache.hadoop.fs.{FileStatus, Path} +import com.google.common.cache._ +import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.scheduler._ -import org.apache.spark.ui.{WebUI, SparkUI} +import org.apache.spark.ui.{WebUI, SparkUI, UIUtils} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.Utils @@ -38,56 +39,76 @@ import org.apache.spark.util.Utils * application's event logs are maintained in the application's own sub-directory. This * is the same structure as maintained in the event log write code path in * EventLoggingListener. 
- * - * @param baseLogDir The base directory in which event logs are found */ class HistoryServer( - val baseLogDir: String, + conf: SparkConf, + provider: ApplicationHistoryProvider, securityManager: SecurityManager, - conf: SparkConf) - extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging { + port: Int) + extends WebUI(securityManager, port, conf) with Logging { + + // How many applications to retain + private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50) - import HistoryServer._ + // set whether to enable or disable view acls for all applications + private val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) - private val fileSystem = Utils.getHadoopFileSystem(baseLogDir) private val localHost = Utils.localHostName() private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost) - // A timestamp of when the disk was last accessed to check for log updates - private var lastLogCheckTime = -1L + private val appLoader = new CacheLoader[String, SparkUI] { + override def load(key: String): SparkUI = { + val info = provider.getAppInfo(key) + if (info != null) { + info.ui.getSecurityManager.setUIAcls(uiAclsEnabled) + info.ui.getSecurityManager.setViewAcls(info.sparkUser, info.viewAcls) + attachSparkUI(info.ui) + info.ui + } else { + throw new NoSuchElementException() + } + } + } - // Number of completed applications found in this directory - private var numCompletedApplications = 0 + private val appCache = CacheBuilder.newBuilder() + .maximumSize(retainedApplications) + .removalListener(new RemovalListener[String, SparkUI] { + override def onRemoval(rm: RemovalNotification[String, SparkUI]) = { + detachSparkUI(rm.getValue()) + } + }) + .build(appLoader) + + private val loaderServlet = new HttpServlet { + protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = { + val parts = req.getPathInfo().split("/") + if (parts.length < 2) { + res.setStatus(HttpServletResponse.SC_BAD_REQUEST) + return + } - @volatile private var stopped = false + var appId = parts(1) - /** - * A background thread that periodically checks for event log updates on disk. - * - * If a log check is invoked manually in the middle of a period, this thread re-adjusts the - * time at which it performs the next log check to maintain the same period as before. - * - * TODO: Add a mechanism to update manually. - */ - private val logCheckingThread = new Thread { - override def run(): Unit = Utils.logUncaughtExceptions { - while (!stopped) { - val now = System.currentTimeMillis - if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) { - checkForLogs() - Thread.sleep(UPDATE_INTERVAL_MS) - } else { - // If the user has manually checked for logs recently, wait until - // UPDATE_INTERVAL_MS after the last check time - Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now) + // Note we don't use the UI retrieved from the cache; the cache loader above will register + // the app's UI, and all we need to do is redirect the user to the same URI that was + // requested, and the proper data should be served at that point. + try { + appCache.get(appId) + res.sendRedirect(res.encodeRedirectURL(req.getRequestURI())) + } catch { + case e: Exception => e.getCause() match { + case nsee: NoSuchElementException => + val msg =
+              <div class="row-fluid">Application {appId} not found.</div>
+ res.setStatus(HttpServletResponse.SC_NOT_FOUND) + UIUtils.basicSparkPage(msg, "Not Found").foreach( + n => res.getWriter().write(n.toString)) + + case cause: Exception => throw cause } } } } - // A mapping of application ID to its history information, which includes the rendered UI - val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]() - initialize() /** @@ -98,108 +119,23 @@ class HistoryServer( */ def initialize() { attachPage(new HistoryPage(this)) - attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static")) + attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) + + val contextHandler = new ServletContextHandler + contextHandler.setContextPath("/history") + contextHandler.addServlet(new ServletHolder(loaderServlet), "/*") + attachHandler(contextHandler) } /** Bind to the HTTP server behind this web interface. */ override def bind() { super.bind() - logCheckingThread.start() - } - - /** - * Check for any updates to event logs in the base directory. This is only effective once - * the server has been bound. - * - * If a new completed application is found, the server renders the associated SparkUI - * from the application's event logs, attaches this UI to itself, and stores metadata - * information for this application. - * - * If the logs for an existing completed application are no longer found, the server - * removes all associated information and detaches the SparkUI. - */ - def checkForLogs() = synchronized { - if (serverInfo.isDefined) { - lastLogCheckTime = System.currentTimeMillis - logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime)) - try { - val logStatus = fileSystem.listStatus(new Path(baseLogDir)) - val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() - val logInfos = logDirs - .sortBy { dir => getModificationTime(dir) } - .map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) } - .filter { case (dir, info) => info.applicationComplete } - - // Logging information for applications that should be retained - val retainedLogInfos = logInfos.takeRight(RETAINED_APPLICATIONS) - val retainedAppIds = retainedLogInfos.map { case (dir, _) => dir.getPath.getName } - - // Remove any applications that should no longer be retained - appIdToInfo.foreach { case (appId, info) => - if (!retainedAppIds.contains(appId)) { - detachSparkUI(info.ui) - appIdToInfo.remove(appId) - } - } - - // Render the application's UI if it is not already there - retainedLogInfos.foreach { case (dir, info) => - val appId = dir.getPath.getName - if (!appIdToInfo.contains(appId)) { - renderSparkUI(dir, info) - } - } - - // Track the total number of completed applications observed this round - numCompletedApplications = logInfos.size - - } catch { - case e: Exception => logError("Exception in checking for event log updates", e) - } - } else { - logWarning("Attempted to check for event log updates before binding the server.") - } - } - - /** - * Render a new SparkUI from the event logs if the associated application is completed. - * - * HistoryServer looks for a special file that indicates application completion in the given - * directory. If this file exists, the associated application is regarded to be completed, in - * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing. 
- */ - private def renderSparkUI(logDir: FileStatus, elogInfo: EventLoggingInfo) { - val path = logDir.getPath - val appId = path.getName - val replayBus = new ReplayListenerBus(elogInfo.logPaths, fileSystem, elogInfo.compressionCodec) - val appListener = new ApplicationEventListener - replayBus.addListener(appListener) - val appConf = conf.clone() - val appSecManager = new SecurityManager(appConf) - val ui = new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId) - - // Do not call ui.bind() to avoid creating a new server for each application - replayBus.replay() - if (appListener.applicationStarted) { - appSecManager.setUIAcls(HISTORY_UI_ACLS_ENABLED) - appSecManager.setViewAcls(appListener.sparkUser, appListener.viewAcls) - attachSparkUI(ui) - val appName = appListener.appName - val sparkUser = appListener.sparkUser - val startTime = appListener.startTime - val endTime = appListener.endTime - val lastUpdated = getModificationTime(logDir) - ui.setAppName(appName + " (completed)") - appIdToInfo(appId) = ApplicationHistoryInfo(appId, appName, startTime, endTime, - lastUpdated, sparkUser, path, ui) - } } /** Stop the server and close the file system. */ override def stop() { super.stop() - stopped = true - fileSystem.close() + provider.stop() } /** Attach a reconstructed UI to this server. Only valid after bind(). */ @@ -215,27 +151,17 @@ class HistoryServer( ui.getHandlers.foreach(detachHandler) } - /** Return the address of this server. */ - def getAddress: String = "http://" + publicHost + ":" + boundPort - - /** Return the number of completed applications found, whether or not the UI is rendered. */ - def getNumApplications: Int = numCompletedApplications - - /** Return when this directory was last modified. */ - private def getModificationTime(dir: FileStatus): Long = { - try { - val logFiles = fileSystem.listStatus(dir.getPath) - if (logFiles != null && !logFiles.isEmpty) { - logFiles.map(_.getModificationTime).max - } else { - dir.getModificationTime - } - } catch { - case e: Exception => - logError("Exception in accessing modification time of %s".format(dir.getPath), e) - -1L - } + /** + * Returns a list of available applications, in descending order according to their last + * updated time. + * + * @param offset Offset of the first entry to return. + * @param limit Maximum number of entries to return (-1 = no limit). 
+ */ + def getApplicationList(offset: Int, limit: Int) = { + provider.getListing(offset, limit) } + } /** @@ -251,25 +177,21 @@ class HistoryServer( object HistoryServer { private val conf = new SparkConf - // Interval between each check for event log updates - val UPDATE_INTERVAL_MS = conf.getInt("spark.history.updateInterval", 10) * 1000 - - // How many applications to retain - val RETAINED_APPLICATIONS = conf.getInt("spark.history.retainedApplications", 250) - - // The port to which the web UI is bound - val WEB_UI_PORT = conf.getInt("spark.history.ui.port", 18080) - - // set whether to enable or disable view acls for all applications - val HISTORY_UI_ACLS_ENABLED = conf.getBoolean("spark.history.ui.acls.enable", false) - - val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR - def main(argStrings: Array[String]) { initSecurity() - val args = new HistoryServerArguments(argStrings) + parse(argStrings.toList) val securityManager = new SecurityManager(conf) - val server = new HistoryServer(args.logDir, securityManager, conf) + + val providerName = conf.getOption("spark.history.provider") + .getOrElse(classOf[FsHistoryProvider].getName()) + val provider = Class.forName(providerName) + .getConstructor(classOf[SparkConf]) + .newInstance(conf) + .asInstanceOf[ApplicationHistoryProvider] + + val port = conf.getInt("spark.history.ui.port", 18080) + + val server = new HistoryServer(conf, provider, securityManager, port) server.bind() // Wait until the end of the world... or if the HistoryServer process is manually stopped @@ -290,8 +212,85 @@ object HistoryServer { } } + private def parse(args: List[String]): Unit = { + args match { + case ("--dir" | "-d") :: value :: tail => + set("fs.logDirectory", value) + parse(tail) + + case ("--port" | "-p") :: value :: tail => + set("ui.port", value) + parse(tail) + + case ("-D") :: opt :: value :: tail => + set(opt, value) + parse(tail) + + case ("--help" | "-h") :: tail => + printUsageAndExit(0) + + case Nil => + + case _ => + printUsageAndExit(1) + } + } + + private def set(name: String, value: String) = { + conf.set("spark.history." + name, value) + } + + private def printUsageAndExit(exitCode: Int) { + System.err.println( + """ + |Usage: HistoryServer [options] + | + |Options are set by passing "-D option value" command line arguments to the class. + |Command line options will override the Spark configuration file and system properties. + |History Server options are always available; additional options depend on the provider. + | + |History Server options: + | + | ui.port Port where server will listen for connections (default 18080) + | ui.acls.enable Whether to enable view acls for all applications (default false) + | provider Name of history provider class (defaults to file system-based provider) + | + |FsHistoryProvider options: + | + | fs.logDirectory Directory where app logs are stored (required) + | fs.updateInterval How often to reload log data from storage (seconds, default 10) + |""".stripMargin) + System.exit(exitCode) + } + } +private[spark] abstract class ApplicationHistoryProvider { + + /** + * This method should return a list of applications available for the history server to + * show. The listing is assumed to be in descending time order (so that the parameters + * make sense). + * + * @param offset Offset of the first entry to return. + * @param limit Maximum number of entries to return (-1 = no limit). 
+ */ + def getListing(offset: Int, limit: Int): Seq[ApplicationHistoryInfo] + + /** + * This method should return the application information, including a rendered SparkUI. + * + * @param appId The application ID. + * @return The app info, or null if not found. + */ + def getAppInfo(appId: String): ApplicationHistoryInfo + + /** + * Called when the server is shutting down. + */ + def stop(): Unit = { } + +} private[spark] case class ApplicationHistoryInfo( id: String, @@ -300,7 +299,7 @@ private[spark] case class ApplicationHistoryInfo( endTime: Long, lastUpdated: Long, sparkUser: String, - logDirPath: Path, + viewAcls: String, ui: SparkUI) { def started = startTime != -1 def completed = endTime != -1 diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala deleted file mode 100644 index 943c061743dbd..0000000000000 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.history - -import java.net.URI - -import org.apache.hadoop.fs.Path - -import org.apache.spark.util.Utils - -/** - * Command-line parser for the master. 
- */ -private[spark] class HistoryServerArguments(args: Array[String]) { - var logDir = "" - - parse(args.toList) - - private def parse(args: List[String]): Unit = { - args match { - case ("--dir" | "-d") :: value :: tail => - logDir = value - parse(tail) - - case ("--help" | "-h") :: tail => - printUsageAndExit(0) - - case Nil => - - case _ => - printUsageAndExit(1) - } - validateLogDir() - } - - private def validateLogDir() { - if (logDir == "") { - System.err.println("Logging directory must be specified.") - printUsageAndExit(1) - } - val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) - val path = new Path(logDir) - if (!fileSystem.exists(path)) { - System.err.println("Logging directory specified does not exist: %s".format(logDir)) - printUsageAndExit(1) - } - if (!fileSystem.getFileStatus(path).isDir) { - System.err.println("Logging directory specified is not a directory: %s".format(logDir)) - printUsageAndExit(1) - } - } - - private def printUsageAndExit(exitCode: Int) { - System.err.println( - "Usage: HistoryServer [options]\n" + - "\n" + - "Options:\n" + - " -d DIR, --dir DIR Location of event log files") - System.exit(exitCode) - } -} diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index b08f308fda1dd..856273e1d4e21 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -51,6 +51,7 @@ private[spark] abstract class WebUI( def getTabs: Seq[WebUITab] = tabs.toSeq def getHandlers: Seq[ServletContextHandler] = handlers.toSeq + def getSecurityManager: SecurityManager = securityManager /** Attach a tab to this UI, along with all of its attached pages. */ def attachTab(tab: WebUITab) { From a1d4f1eac53cc695dfa53fd31c9cf74890eea164 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 8 May 2014 14:48:04 -0700 Subject: [PATCH 02/12] Rudimentary paging support for the history UI. The provider's list api was tweaked a little bit so that the caller can get an atomic view of the data currently held in the provider. 
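To make the new listing contract concrete, here is a minimal, illustrative Scala sketch of the paged, atomic-snapshot listing described above. The names (AppInfo, getListing) and the fall-back-to-zero offset handling are assumptions for illustration, not the patch's exact code:

    object PagingSketch {
      case class AppInfo(id: String, lastUpdated: Long)

      // Take one immutable snapshot and derive the page, the adjusted offset and the
      // total count from it, so the three values always describe the same view.
      def getListing(snapshot: Seq[AppInfo], offset: Int, count: Int): (Seq[AppInfo], Int, Int) = {
        val adjusted = if (offset < snapshot.size) offset else 0  // list may have shrunk
        val page = snapshot.slice(adjusted, math.min(adjusted + count, snapshot.size))
        (page, adjusted, snapshot.size)
      }

      def main(args: Array[String]): Unit = {
        val apps = (1 to 45).map(i => AppInfo(s"app-$i", i.toLong)).reverse
        val (page, first, total) = getListing(apps, offset = 40, count = 20)
        println(s"Showing ${first + 1}-${first + page.size} of $total")
      }
    }

Returning the snapshot size alongside the slice is what lets the page render a stable "Showing X-Y of Z" header even while a background thread swaps in a new list.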
--- .../deploy/history/FsHistoryProvider.scala | 8 +++++-- .../spark/deploy/history/HistoryPage.scala | 20 +++++++++++++---- .../spark/deploy/history/HistoryServer.scala | 22 +++++++++++-------- 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 884eca34168ca..c022676c64880 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -83,6 +83,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider } checkForLogs() + logCheckingThread.setDaemon(true) logCheckingThread.start() } @@ -90,10 +91,13 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider stopped = true logCheckingThread.interrupt() logCheckingThread.join() + fs.close() } - override def getListing(offset: Int, limit: Int): Seq[ApplicationHistoryInfo] = { - appList.get() + override def getListing(offset: Int, count: Int) = { + val list = appList.get() + val theOffset = if (offset < list.size) offset else 0 + (list.slice(theOffset, Math.min(theOffset + count, list.size)), theOffset, list.size) } override def getAppInfo(appId: String): ApplicationHistoryInfo = { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index b933d58a5d8dd..4a89af6519200 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -25,17 +25,29 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { + val pageSize = 20 + def render(request: HttpServletRequest): Seq[Node] = { - val apps = parent.getApplicationList(0, -1) + val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt + val requestedFirst = (requestedPage - 1) * pageSize + val (apps, actualFirst, count) = parent.getApplicationList(requestedFirst, pageSize) + val actualPage = (actualFirst / pageSize) + 1 + val last = Math.min(actualFirst + pageSize, count) - 1 + val pageCount = count / pageSize + (if (count % pageSize > 0) 1 else 0) + val appTable = UIUtils.listingTable(appHeader, appRow, apps) val content =
      <div class="row-fluid">
        <div class="span12">
          {
-            if (apps.size > 0) {
+            if (count > 0) {
              <h4>
-                Showing {apps.size}/{apps.size}
-                Completed Application{if (apps.size > 1) "s" else ""}
+                Showing {actualFirst + 1}-{last + 1} of {count}
+                Application{if (last - actualFirst > 1) "s" else ""}
+
+                {if (actualPage > 1) <a href={"?page=" + (actualPage - 1)}>&lt;</a>}
+                {if (actualPage < pageCount) <a href={"?page=" + (actualPage + 1)}>&gt;</a>}
+
              </h4>
++ appTable } else { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index c8e1dfa6fa220..fbdc0970fd334 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -155,11 +155,12 @@ class HistoryServer( * Returns a list of available applications, in descending order according to their last * updated time. * - * @param offset Offset of the first entry to return. - * @param limit Maximum number of entries to return (-1 = no limit). + * @param offset Starting offset for returned objects. + * @param count Max number of objects to return. + * @return 3-tuple (requested app list, adjusted offset, count of all available apps) */ - def getApplicationList(offset: Int, limit: Int) = { - provider.getListing(offset, limit) + def getApplicationList(offset: Int, count: Int) = { + provider.getListing(offset, count) } } @@ -269,13 +270,16 @@ private[spark] abstract class ApplicationHistoryProvider { /** * This method should return a list of applications available for the history server to - * show. The listing is assumed to be in descending time order (so that the parameters - * make sense). + * show. The listing is assumed to be in descending time order. * - * @param offset Offset of the first entry to return. - * @param limit Maximum number of entries to return (-1 = no limit). + * An adjusted offset should be returned if the app list has changed and the request + * references an invalid start offset. Otherwise, the provided offset should be returned. + * + * @param offset Starting offset for returned objects. + * @param count Max number of objects to return. + * @return 3-tuple (requested app list, adjusted offset, count of all available apps) */ - def getListing(offset: Int, limit: Int): Seq[ApplicationHistoryInfo] + def getListing(offset: Int, count: Int): (Seq[ApplicationHistoryInfo], Int, Int) /** * This method should return the application information, including a rendered SparkUI. From 461fac8896f4e9ca31f55824c6ae00df27a3cf1d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 9 May 2014 13:39:26 -0700 Subject: [PATCH 03/12] Ensure server.stop() is called when shutting down. Also remove the cleanup code from the fs provider. It would be better to clean up, but there's a race between that code's cleanup and Hadoop's shutdown hook, which closes all file systems kept in the cache. So if you try to clean up the fs provider in a shut down hook, you may end up with ugly exceptions in the output. But leave the stop() functionality around in case it's useful for future provider implementations. 
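For reference, a stripped-down sketch of the shutdown-hook wiring this commit adds (the Stoppable trait and thread name here are illustrative stand-ins, not the actual classes):

    object ShutdownHookSketch {
      trait Stoppable { def stop(): Unit }

      def installStopHook(server: Stoppable): Unit = {
        // Run stop() when the JVM exits (SIGTERM / Ctrl-C), since the main thread
        // just sleeps forever. Keep the hook minimal: other hooks, such as Hadoop's
        // FileSystem cache closer, run concurrently and may already have closed
        // shared resources by the time this one executes.
        Runtime.getRuntime.addShutdownHook(new Thread("HistoryServerStopper") {
          override def run(): Unit = server.stop()
        })
      }

      def main(args: Array[String]): Unit = {
        installStopHook(new Stoppable { override def stop(): Unit = println("stopped") })
        Thread.sleep(100)  // stand-in for the server's wait-forever loop
      }
    }

That ordering hazard between shutdown hooks is exactly why the fs provider no longer closes its FileSystem in stop().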
--- .../spark/deploy/history/FsHistoryProvider.scala | 11 +---------- .../apache/spark/deploy/history/HistoryServer.scala | 7 ++++++- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index c022676c64880..d8ee557caba86 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -54,7 +54,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider */ private val logCheckingThread = new Thread("LogCheckingThread") { override def run() = Utils.logUncaughtExceptions { - while (!stopped) { + while (true) { val now = System.currentTimeMillis if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) { Thread.sleep(UPDATE_INTERVAL_MS) @@ -68,8 +68,6 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider } } - @volatile private var stopped = false - initialize() private def initialize() { @@ -87,13 +85,6 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider logCheckingThread.start() } - override def stop() = { - stopped = true - logCheckingThread.interrupt() - logCheckingThread.join() - fs.close() - } - override def getListing(offset: Int, count: Int) = { val list = appList.get() val theOffset = if (offset < list.size) offset else 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index fbdc0970fd334..2aa469f563559 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -195,9 +195,14 @@ object HistoryServer { val server = new HistoryServer(conf, provider, securityManager, port) server.bind() + Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") { + override def run() = { + server.stop() + } + }) + // Wait until the end of the world... or if the HistoryServer process is manually stopped while(true) { Thread.sleep(Int.MaxValue) } - server.stop() } def initSecurity() { From d62cd43233b6d4a2198c12263df2d1234925dbd9 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 9 May 2014 15:00:45 -0700 Subject: [PATCH 04/12] Better handle failures when loading app info. Instead of failing to load all the applications, just ignore the one that failed. --- .../deploy/history/FsHistoryProvider.scala | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index d8ee557caba86..bd4d1250c051b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -129,16 +129,21 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider // For any application that either (i) is not listed or (ii) has changed since the last time // the listing was created (defined by the log dir's modification time), load the app's info. // Otherwise just reuse what's already in memory. 
- appList.set(logInfos - .map { dir => - val curr = currentApps.getOrElse(dir.getPath().getName(), null) - if (curr == null || curr.lastUpdated < getModificationTime(dir)) { - loadAppInfo(dir, false) - } else { - curr + val newApps = new mutable.ListBuffer[ApplicationHistoryInfo] + for (dir <- logInfos) { + val curr = currentApps.getOrElse(dir.getPath().getName(), null) + if (curr == null || curr.lastUpdated < getModificationTime(dir)) { + try { + newApps += loadAppInfo(dir, false) + } catch { + case e: Exception => logError(s"Failed to load app info from directory $dir.") } + } else { + newApps += curr } - .sortBy { info => -info.lastUpdated }) + } + + appList.set(newApps.sortBy { info => -info.lastUpdated }) } catch { case t: Throwable => logError("Exception in checking for event log updates", t) } From 8e2aad0a094b3718e8174033f6a106b60a29ef95 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 9 May 2014 15:27:04 -0700 Subject: [PATCH 05/12] Fix scalastyle issues. --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index bd4d1250c051b..f8bcbc291534a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -74,10 +74,12 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider // Validate the log directory. val path = new Path(logDir) if (!fs.exists(path)) { - throw new IllegalArgumentException("Logging directory specified does not exist: %s".format(logDir)) + throw new IllegalArgumentException( + "Logging directory specified does not exist: %s".format(logDir)) } if (!fs.getFileStatus(path).isDir) { - throw new IllegalArgumentException("Logging directory specified is not a directory: %s".format(logDir)) + throw new IllegalArgumentException( + "Logging directory specified is not a directory: %s".format(logDir)) } checkForLogs() From 81b69fbe575c19129fe85b243ba5e58f28ca636c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 9 May 2014 16:41:15 -0700 Subject: [PATCH 06/12] Fix a comment. --- .../spark/deploy/history/FsHistoryProvider.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index f8bcbc291534a..49bacd8974009 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -103,15 +103,9 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider } /** - * Check for any updates to event logs in the base directory. This is only effective once - * the server has been bound. - * - * If a new completed application is found, the server renders the associated SparkUI - * from the application's event logs, attaches this UI to itself, and stores metadata - * information for this application. - * - * If the logs for an existing completed application are no longer found, the server - * removes all associated information and detaches the SparkUI. + * Builds the application list based on the current contents of the log directory. 
+ * Tries to reuse as much of the data already in memory as possible, but not reading + * applications that hasn't been updated since last time the logs were checked. */ def checkForLogs() = synchronized { lastLogCheckTime = System.currentTimeMillis From 0bb7b57ead82d109b74cbb40bed698f0b1507014 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 2 Jun 2014 16:36:42 -0700 Subject: [PATCH 07/12] Review feedback. Use monotonic time, plus other stylistic things. --- .../deploy/history/FsHistoryProvider.scala | 18 ++++++++++-------- .../spark/deploy/history/HistoryServer.scala | 11 +++++------ 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 49bacd8974009..b9a6026afb090 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -39,7 +39,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider private val fs = Utils.getHadoopFileSystem(logDir) // A timestamp of when the disk was last accessed to check for log updates - private var lastLogCheckTime = -1L + private var lastLogCheckTimeMs = -1L // List of applications, in order from newest to oldest. private val appList = new AtomicReference[Seq[ApplicationHistoryInfo]](Nil) @@ -55,13 +55,13 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider private val logCheckingThread = new Thread("LogCheckingThread") { override def run() = Utils.logUncaughtExceptions { while (true) { - val now = System.currentTimeMillis - if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) { + val now = getMonotonicTime() + if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) { Thread.sleep(UPDATE_INTERVAL_MS) } else { // If the user has manually checked for logs recently, wait until // UPDATE_INTERVAL_MS after the last check time - Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now) + Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now) } checkForLogs() } @@ -108,13 +108,12 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider * applications that hasn't been updated since last time the logs were checked. */ def checkForLogs() = synchronized { - lastLogCheckTime = System.currentTimeMillis - logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime)) + lastLogCheckTimeMs = getMonotonicTime() + logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs)) try { val logStatus = fs.listStatus(new Path(logDir)) val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() val logInfos = logDirs - .sortBy { dir => getModificationTime(dir) } .filter { dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE)) } @@ -125,7 +124,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider // For any application that either (i) is not listed or (ii) has changed since the last time // the listing was created (defined by the log dir's modification time), load the app's info. // Otherwise just reuse what's already in memory. 
- val newApps = new mutable.ListBuffer[ApplicationHistoryInfo] + val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo] for (dir <- logInfos) { val curr = currentApps.getOrElse(dir.getPath().getName(), null) if (curr == null || curr.lastUpdated < getModificationTime(dir)) { @@ -198,4 +197,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider } } + /** Returns the system's mononotically increasing time. */ + private def getMonotonicTime() = System.nanoTime() / (1000 * 1000) + } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 2aa469f563559..73b827d8b6055 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -59,14 +59,13 @@ class HistoryServer( private val appLoader = new CacheLoader[String, SparkUI] { override def load(key: String): SparkUI = { val info = provider.getAppInfo(key) - if (info != null) { - info.ui.getSecurityManager.setUIAcls(uiAclsEnabled) - info.ui.getSecurityManager.setViewAcls(info.sparkUser, info.viewAcls) - attachSparkUI(info.ui) - info.ui - } else { + if (info == null) { throw new NoSuchElementException() } + info.ui.getSecurityManager.setUIAcls(uiAclsEnabled) + info.ui.getSecurityManager.setViewAcls(info.sparkUser, info.viewAcls) + attachSparkUI(info.ui) + info.ui } } From bd790a03ff93daf3e99b80da4e015c116ab719bd Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 2 Jun 2014 16:56:23 -0700 Subject: [PATCH 08/12] Initialize new app array to expected size. To avoid reallocations. --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index b9a6026afb090..e4851479a9f3a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -124,7 +124,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider // For any application that either (i) is not listed or (ii) has changed since the last time // the listing was created (defined by the log dir's modification time), load the app's info. // Otherwise just reuse what's already in memory. - val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo] + val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size) for (dir <- logInfos) { val curr = currentApps.getOrElse(dir.getPath().getName(), null) if (curr == null || curr.lastUpdated < getModificationTime(dir)) { From 43e270f8e7af738f3ae81c7c17df81c44e6e80f6 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 3 Jun 2014 10:34:47 -0700 Subject: [PATCH 09/12] Cosmetic change to listing header. 
--- .../org/apache/spark/deploy/history/HistoryPage.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 4a89af6519200..7f1a9fe6f7d53 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -30,20 +30,19 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { def render(request: HttpServletRequest): Seq[Node] = { val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt val requestedFirst = (requestedPage - 1) * pageSize - val (apps, actualFirst, count) = parent.getApplicationList(requestedFirst, pageSize) + val (apps, actualFirst, totalCount) = parent.getApplicationList(requestedFirst, pageSize) val actualPage = (actualFirst / pageSize) + 1 - val last = Math.min(actualFirst + pageSize, count) - 1 - val pageCount = count / pageSize + (if (count % pageSize > 0) 1 else 0) + val last = Math.min(actualFirst + pageSize, totalCount) - 1 + val pageCount = totalCount / pageSize + (if (totalCount % pageSize > 0) 1 else 0) val appTable = UIUtils.listingTable(appHeader, appRow, apps) val content =
      <div class="row-fluid">
        <div class="span12">
          {
-            if (count > 0) {
+            if (totalCount > 0) {
              <h4>
- Showing {actualFirst + 1}-{last + 1} of {count} - Application{if (last - actualFirst > 1) "s" else ""} + Showing {actualFirst + 1}-{last + 1} of {totalCount} {if (actualPage > 1) <} {if (actualPage < pageCount) >} From e0437005af34d7eb8d82d4f630594125f0d1090f Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 3 Jun 2014 10:58:31 -0700 Subject: [PATCH 10/12] Make class package-private. --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index e4851479a9f3a..85da47bb55212 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -29,7 +29,7 @@ import org.apache.spark.scheduler._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.Utils -class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider +private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider with Logging { // Interval between each check for event log updates From 9b11205514ac200b7543c1f277b0578cb4d387ef Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 22 May 2014 13:41:54 -0700 Subject: [PATCH 11/12] Make event logger use a single file. Currently the event logger uses a directory and several files to describe an app's event log, all but one of which are empty. This is not very HDFS-friendly, since creating lots of nodes in HDFS (especially when they don't contain any data) is frowned upon due to the node metadata being kept in the NameNode's memory. Instead, all the metadata needed for the app log file can be encoded in the file name itself. (HDFS is adding extended attributes which could be used for this, but we need to support older versions.) This change implements that approach, and also gets rid of FileLogger, which was only used by EventLoggingListener and the little functionality it provided can be much more concisely implemented inside the listener itself. With the new approach, aside from reducing the load on the NN, there's also a lot less remote calls needed when reading the log directory. 
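As a rough illustration of the "metadata in the file name" idea, the sketch below encodes and parses an app log name; the separator, regex and helper names are assumptions for illustration, not necessarily the exact format this patch ships:

    object LogNameSketch {
      // <sanitized app name>-<timestamp>-<spark version>[-<codec>][.inprogress]
      private val LogName = """(.+)-(\d+)-([^-]+?)(?:-([^-.]+))?(\.inprogress)?""".r

      def encode(appName: String, ts: Long, version: String, codec: Option[String]): String = {
        val base = s"${appName.replaceAll("[ :/]", "-").toLowerCase}-$ts-$version"
        codec.map(c => s"$base-$c").getOrElse(base)
      }

      def main(args: Array[String]): Unit = {
        encode("Spark Pi", 1401820800000L, "1.0.0", Some("lzf")) match {
          case LogName(app, ts, ver, codec, inProgress) =>
            println(s"app=$app ts=$ts version=$ver codec=$codec finished=${inProgress == null}")
          case other =>
            println(s"unparseable: $other")
        }
      }
    }

Everything a reader of the log directory needs (version, codec, completion state) can then be recovered from a single listStatus() call, with no extra empty marker files.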
--- .../spark/deploy/ApplicationDescription.scala | 2 +- .../deploy/history/FsHistoryProvider.scala | 49 ++--- .../apache/spark/deploy/master/Master.scala | 41 ++-- .../scheduler/EventLoggingListener.scala | 189 +++++++++-------- .../spark/scheduler/ReplayListenerBus.scala | 56 +++-- .../cluster/SparkDeploySchedulerBackend.scala | 2 +- .../org/apache/spark/util/FileLogger.scala | 193 ----------------- .../scheduler/EventLoggingListenerSuite.scala | 196 +++++++----------- .../spark/scheduler/ReplayListenerSuite.scala | 20 +- .../apache/spark/util/FileLoggerSuite.scala | 168 --------------- 10 files changed, 246 insertions(+), 670 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/util/FileLogger.scala delete mode 100644 core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 86305d2ea8a09..3607b9a3706c4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -24,7 +24,7 @@ private[spark] class ApplicationDescription( val command: Command, val sparkHome: Option[String], var appUiUrl: String, - val eventLogDir: Option[String] = None) + val eventLogFile: Option[String] = None) extends Serializable { val user = System.getProperty("user.name", "") diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 85da47bb55212..e6717b1010908 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -111,12 +111,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis lastLogCheckTimeMs = getMonotonicTime() logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs)) try { - val logStatus = fs.listStatus(new Path(logDir)) - val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() - val logInfos = logDirs + val matcher = EventLoggingListener.LOG_FILE_NAME_REGEX + val logInfos = fs.listStatus(new Path(logDir)) .filter { - dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE)) - } + entry => entry.isFile() && matcher.pattern.matcher(entry.getPath()getName()).matches() } var currentApps = Map[String, ApplicationHistoryInfo]( appList.get().map(app => (app.id -> app)):_*) @@ -124,14 +122,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis // For any application that either (i) is not listed or (ii) has changed since the last time // the listing was created (defined by the log dir's modification time), load the app's info. // Otherwise just reuse what's already in memory. 
- val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size) - for (dir <- logInfos) { - val curr = currentApps.getOrElse(dir.getPath().getName(), null) - if (curr == null || curr.lastUpdated < getModificationTime(dir)) { + val newApps = new mutable.ListBuffer[ApplicationHistoryInfo] + for (log <- logInfos) { + val curr = currentApps.getOrElse(log.getPath().getName(), null) + if (curr == null || curr.lastUpdated < log.getModificationTime()) { try { - newApps += loadAppInfo(dir, false) + newApps += loadAppInfo(log, false) } catch { - case e: Exception => logError(s"Failed to load app info from directory $dir.") + case e: Exception => logError(s"Failed to load app info from directory $log.") } } else { newApps += curr @@ -148,11 +146,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis * Parse the application's logs to find out the information we need to build the * listing page. */ - private def loadAppInfo(logDir: FileStatus, renderUI: Boolean): ApplicationHistoryInfo = { - val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs) - val path = logDir.getPath - val appId = path.getName - val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec) + private def loadAppInfo(log: FileStatus, renderUI: Boolean): ApplicationHistoryInfo = { + val elogInfo = EventLoggingListener.parseLoggingInfo(log.getPath()) + val appId = elogInfo.path.getName + val replayBus = new ReplayListenerBus(elogInfo.path, fs, elogInfo.compressionCodec) val appListener = new ApplicationEventListener replayBus.addListener(appListener) @@ -170,33 +167,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val sparkUser = appListener.sparkUser val startTime = appListener.startTime val endTime = appListener.endTime - val lastUpdated = getModificationTime(logDir) + val lastUpdated = log.getModificationTime() ApplicationHistoryInfo(appId, appListener.appName, appListener.startTime, appListener.endTime, - getModificationTime(logDir), + lastUpdated, appListener.sparkUser, if (renderUI) appListener.viewAcls else null, ui) } - /** Return when this directory was last modified. */ - private def getModificationTime(dir: FileStatus): Long = { - try { - val logFiles = fs.listStatus(dir.getPath) - if (logFiles != null && !logFiles.isEmpty) { - logFiles.map(_.getModificationTime).max - } else { - dir.getModificationTime - } - } catch { - case t: Throwable => - logError("Exception in accessing modification time of %s".format(dir.getPath), t) - -1L - } - } - /** Returns the system's mononotically increasing time. 
*/ private def getMonotonicTime() = System.nanoTime() / (1000 * 1000) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index c6dec305bffcb..f6619f34d5410 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -30,7 +30,7 @@ import akka.actor._ import akka.pattern.ask import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} import akka.serialization.SerializationExtension -import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState} @@ -668,27 +668,26 @@ private[spark] class Master( */ def rebuildSparkUI(app: ApplicationInfo): Boolean = { val appName = app.desc.name - val eventLogDir = app.desc.eventLogDir.getOrElse { return false } - val fileSystem = Utils.getHadoopFileSystem(eventLogDir) - val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem) - val eventLogPaths = eventLogInfo.logPaths + val eventLogFile = app.desc.eventLogFile.getOrElse { return false } + val eventLogInfo = EventLoggingListener.parseLoggingInfo(new Path(eventLogFile)) + if (eventLogInfo == null) { + return false + } + val compressionCodec = eventLogInfo.compressionCodec - if (!eventLogPaths.isEmpty) { - try { - val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec) - val ui = new SparkUI( - new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id) - replayBus.replay() - app.desc.appUiUrl = ui.basePath - appIdToUI(app.id) = ui - webUi.attachSparkUI(ui) - return true - } catch { - case e: Exception => - logError("Exception in replaying log for application %s (%s)".format(appName, app.id), e) - } - } else { - logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir)) + try { + val fileSystem = Utils.getHadoopFileSystem(eventLogFile) + val replayBus = new ReplayListenerBus(eventLogInfo.path, fileSystem, compressionCodec) + val ui = new SparkUI( + new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id) + replayBus.replay() + app.desc.appUiUrl = ui.basePath + appIdToUI(app.id) = ui + webUi.attachSparkUI(ui) + return true + } catch { + case e: Exception => + logError("Exception in replaying log for application %s (%s)".format(appName, app.id), e) } false } diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index a90b0d475c04e..0f860ce533a43 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -17,11 +17,14 @@ package org.apache.spark.scheduler +import java.io.{BufferedOutputStream, FileOutputStream, IOException, PrintWriter} +import java.net.URI + import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} import org.apache.hadoop.fs.permission.FsPermission import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods._ @@ -29,7 +32,7 @@ import org.json4s.jackson.JsonMethods._ import 
org.apache.spark.{Logging, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec -import org.apache.spark.util.{FileLogger, JsonProtocol} +import org.apache.spark.util.{JsonProtocol, Utils} /** * A SparkListener that logs events to persistent storage. @@ -54,38 +57,93 @@ private[spark] class EventLoggingListener( private val testing = sparkConf.getBoolean("spark.eventLog.testing", false) private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024 private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/") - private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis - val logDir = logBaseDir + "/" + name + private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir)) + private lazy val compressionCodec = CompressionCodec.createCodec(sparkConf) + + // Only defined if the file system scheme is not local + private var hadoopDataStream: Option[FSDataOutputStream] = None - protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize, - shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS)) + private var writer: Option[PrintWriter] = None // For testing. Keep track of all JSON serialized events that have been logged. private[scheduler] val loggedEvents = new ArrayBuffer[JValue] + val logPath = { + val sb = new StringBuilder() + .append(logBaseDir) + .append("/") + .append(appName.replaceAll("[ :/]", "-").toLowerCase()) + .append("-") + .append(System.currentTimeMillis()) + .append("-") + .append(SparkContext.SPARK_VERSION) + if (shouldCompress) { + val codec = + sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC) + sb.append("-").append(codec) + } + sb.toString() + } + /** - * Begin logging events. - * If compression is used, log a file that indicates which compression library is used. + * Creates the log file in the configured log directory. + * + * The file name contains some metadata about its contents. It has the following + * format: + * + * {{{ + * {app name}-{timestamp}-{spark version}[-{compression codec}][.inprogress] + * }}} + * + * Where: + * - "app name" is a fs-friendly version of the application's name, in lower case + * - "timestamp" is a timestamp generated by this logger + * - "spark version" is the version of spark that generated the logs + * - "compression codec" is an optional string with the name of the compression codec + * used to write the file + * - ".inprogress" will be present while the log file is still being written to, and + * removed after the application is finished. */ def start() { - logger.start() - logInfo("Logging events to %s".format(logDir)) - if (shouldCompress) { - val codec = - sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC) - logger.newFile(COMPRESSION_CODEC_PREFIX + codec) + if (!fileSystem.isDirectory(new Path(logBaseDir))) { + throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.") } - logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION) - logger.newFile(LOG_PREFIX + logger.fileIndex) + + val workingPath = logPath + IN_PROGRESS + val uri = new URI(workingPath) + val path = new Path(workingPath) + val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme + val isDefaultLocal = defaultFs == null || defaultFs == "file" + + /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
+ * Therefore, for local files, use FileOutputStream instead. */ + val dstream = + if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { + // Second parameter is whether to append + new FileOutputStream(uri.getPath) + } else { + hadoopDataStream = Some(fileSystem.create(path)) + hadoopDataStream.get + } + + fileSystem.setPermission(path, LOG_FILE_PERMISSIONS) + val bstream = new BufferedOutputStream(dstream, outputBufferSize) + val cstream = if (shouldCompress) compressionCodec.compressedOutputStream(bstream) else bstream + writer = Some(new PrintWriter(cstream)) + + logInfo("Logging events to %s".format(logPath)) } /** Log the event as JSON. */ private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) { val eventJson = JsonProtocol.sparkEventToJson(event) - logger.logLine(compact(render(eventJson))) + + writer.foreach(_.println(compact(render(eventJson)))) if (flushLogger) { - logger.flush() + writer.foreach(_.flush()) + hadoopDataStream.foreach(_.sync()) } + if (testing) { loggedEvents += eventJson } @@ -126,50 +184,28 @@ private[spark] class EventLoggingListener( * In addition, create an empty special file to indicate application completion. */ def stop() = { - logger.newFile(APPLICATION_COMPLETE) - logger.stop() + writer.foreach(_.close()) + + val target = new Path(logPath) + if (fileSystem.exists(target)) { + throw new IOException("Target log file already exists (%s)".format(logPath)) + } + fileSystem.rename(new Path(logPath + IN_PROGRESS), target) } + } private[spark] object EventLoggingListener extends Logging { val DEFAULT_LOG_DIR = "/tmp/spark-events" - val LOG_PREFIX = "EVENT_LOG_" - val SPARK_VERSION_PREFIX = "SPARK_VERSION_" - val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_" - val APPLICATION_COMPLETE = "APPLICATION_COMPLETE" + val IN_PROGRESS = ".inprogress" val LOG_FILE_PERMISSIONS = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort) + // Regex for parsing log file names. See description of log file name format in start(). + val LOG_FILE_NAME_REGEX = s".+-[0-9]+-([0-9](?:\\.[0-9])*)(?:-(.+?))?(\\$IN_PROGRESS)?".r + // A cache for compression codecs to avoid creating the same codec many times private val codecMap = new mutable.HashMap[String, CompressionCodec] - def isEventLogFile(fileName: String): Boolean = { - fileName.startsWith(LOG_PREFIX) - } - - def isSparkVersionFile(fileName: String): Boolean = { - fileName.startsWith(SPARK_VERSION_PREFIX) - } - - def isCompressionCodecFile(fileName: String): Boolean = { - fileName.startsWith(COMPRESSION_CODEC_PREFIX) - } - - def isApplicationCompleteFile(fileName: String): Boolean = { - fileName == APPLICATION_COMPLETE - } - - def parseSparkVersion(fileName: String): String = { - if (isSparkVersionFile(fileName)) { - fileName.replaceAll(SPARK_VERSION_PREFIX, "") - } else "" - } - - def parseCompressionCodec(fileName: String): String = { - if (isCompressionCodecFile(fileName)) { - fileName.replaceAll(COMPRESSION_CODEC_PREFIX, "") - } else "" - } - /** * Parse the event logging information associated with the logs in the given directory. * @@ -177,47 +213,24 @@ private[spark] object EventLoggingListener extends Logging { * codec file (if event logs are compressed), and the application completion file (if the * application has run to completion). 
*/ - def parseLoggingInfo(logDir: Path, fileSystem: FileSystem): EventLoggingInfo = { + def parseLoggingInfo(log: Path): EventLoggingInfo = { try { - val fileStatuses = fileSystem.listStatus(logDir) - val filePaths = - if (fileStatuses != null) { - fileStatuses.filter(!_.isDir).map(_.getPath).toSeq + val LOG_FILE_NAME_REGEX(version, codecName, inprogress) = log.getName() + val codec: Option[CompressionCodec] = if (codecName != null) { + val conf = new SparkConf() + conf.set("spark.io.compression.codec", codecName) + Some(CompressionCodec.createCodec(conf)) } else { - Seq[Path]() + None } - if (filePaths.isEmpty) { - logWarning("No files found in logging directory %s".format(logDir)) - } - EventLoggingInfo( - logPaths = filePaths.filter { path => isEventLogFile(path.getName) }, - sparkVersion = filePaths - .find { path => isSparkVersionFile(path.getName) } - .map { path => parseSparkVersion(path.getName) } - .getOrElse(""), - compressionCodec = filePaths - .find { path => isCompressionCodecFile(path.getName) } - .map { path => - val codec = EventLoggingListener.parseCompressionCodec(path.getName) - val conf = new SparkConf - conf.set("spark.io.compression.codec", codec) - codecMap.getOrElseUpdate(codec, CompressionCodec.createCodec(conf)) - }, - applicationComplete = filePaths.exists { path => isApplicationCompleteFile(path.getName) } - ) + EventLoggingInfo(log, version, codec, inprogress == null) } catch { case e: Exception => - logError("Exception in parsing logging info from directory %s".format(logDir), e) - EventLoggingInfo.empty + logError("Exception in parsing logging info from file %s".format(log), e) + null } } - /** - * Parse the event logging information associated with the logs in the given directory. - */ - def parseLoggingInfo(logDir: String, fileSystem: FileSystem): EventLoggingInfo = { - parseLoggingInfo(new Path(logDir), fileSystem) - } } @@ -225,11 +238,7 @@ private[spark] object EventLoggingListener extends Logging { * Information needed to process the event logs associated with an application. */ private[spark] case class EventLoggingInfo( - logPaths: Seq[Path], + path: Path, sparkVersion: String, compressionCodec: Option[CompressionCodec], applicationComplete: Boolean = false) - -private[spark] object EventLoggingInfo { - def empty = EventLoggingInfo(Seq[Path](), "", None, applicationComplete = false) -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index f89724d4ea196..2f6c4f408e65c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -35,51 +35,47 @@ import org.apache.spark.util.JsonProtocol * exactly one SparkListenerEvent. */ private[spark] class ReplayListenerBus( - logPaths: Seq[Path], + logPath: Path, fileSystem: FileSystem, compressionCodec: Option[CompressionCodec]) extends SparkListenerBus with Logging { private var replayed = false - if (logPaths.length == 0) { - logWarning("Log path provided contains no log files.") - } - /** * Replay each event in the order maintained in the given logs. * This should only be called exactly once. 
*/ def replay() { assert(!replayed, "ReplayListenerBus cannot replay events more than once") - logPaths.foreach { path => - // Keep track of input streams at all levels to close them later - // This is necessary because an exception can occur in between stream initializations - var fileStream: Option[InputStream] = None - var bufferedStream: Option[InputStream] = None - var compressStream: Option[InputStream] = None - var currentLine = "" - try { - fileStream = Some(fileSystem.open(path)) - bufferedStream = Some(new BufferedInputStream(fileStream.get)) - compressStream = Some(wrapForCompression(bufferedStream.get)) - // Parse each line as an event and post the event to all attached listeners - val lines = Source.fromInputStream(compressStream.get).getLines() - lines.foreach { line => - currentLine = line - postToAll(JsonProtocol.sparkEventFromJson(parse(line))) - } - } catch { - case e: Exception => - logError("Exception in parsing Spark event log %s".format(path), e) - logError("Malformed line: %s\n".format(currentLine)) - } finally { - fileStream.foreach(_.close()) - bufferedStream.foreach(_.close()) - compressStream.foreach(_.close()) + // Keep track of input streams at all levels to close them later + // This is necessary because an exception can occur in between stream initializations + var fileStream: Option[InputStream] = None + var bufferedStream: Option[InputStream] = None + var compressStream: Option[InputStream] = None + var currentLine = "" + try { + fileStream = Some(fileSystem.open(logPath)) + bufferedStream = Some(new BufferedInputStream(fileStream.get)) + compressStream = Some(wrapForCompression(bufferedStream.get)) + + // Parse each line as an event and post the event to all attached listeners + val lines = Source.fromInputStream(compressStream.get).getLines() + lines.foreach { line => + currentLine = line + postToAll(JsonProtocol.sparkEventFromJson(parse(line))) } + } catch { + case e: Exception => + logError("Exception in parsing Spark event log %s".format(logPath), e) + logError("Malformed line: %s\n".format(currentLine)) + } finally { + fileStream.foreach(_.close()) + bufferedStream.foreach(_.close()) + compressStream.foreach(_.close()) } + replayed = true } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 9c07b3f7b695a..27d01bfe3c7da 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -59,7 +59,7 @@ private[spark] class SparkDeploySchedulerBackend( classPathEntries, libraryPathEntries, extraJavaOpts) val sparkHome = sc.getSparkHome() val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir)) + sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logPath)) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala deleted file mode 100644 index 0e6d21b22023a..0000000000000 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import java.io.{BufferedOutputStream, FileOutputStream, IOException, PrintWriter} -import java.net.URI -import java.text.SimpleDateFormat -import java.util.Date - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} -import org.apache.hadoop.fs.permission.FsPermission - -import org.apache.spark.{Logging, SparkConf} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.io.CompressionCodec - -/** - * A generic class for logging information to file. - * - * @param logDir Path to the directory in which files are logged - * @param outputBufferSize The buffer size to use when writing to an output stream in bytes - * @param compress Whether to compress output - * @param overwrite Whether to overwrite existing files - */ -private[spark] class FileLogger( - logDir: String, - sparkConf: SparkConf, - hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(), - outputBufferSize: Int = 8 * 1024, // 8 KB - compress: Boolean = false, - overwrite: Boolean = true, - dirPermissions: Option[FsPermission] = None) - extends Logging { - - private val dateFormat = new ThreadLocal[SimpleDateFormat]() { - override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") - } - - private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) - var fileIndex = 0 - - // Only used if compression is enabled - private lazy val compressionCodec = CompressionCodec.createCodec(sparkConf) - - // Only defined if the file system scheme is not local - private var hadoopDataStream: Option[FSDataOutputStream] = None - - private var writer: Option[PrintWriter] = None - - /** - * Start this logger by creating the logging directory. - */ - def start() { - createLogDir() - } - - /** - * Create a logging directory with the given path. - */ - private def createLogDir() { - val path = new Path(logDir) - if (fileSystem.exists(path)) { - if (overwrite) { - logWarning("Log directory %s already exists. Overwriting...".format(logDir)) - // Second parameter is whether to delete recursively - fileSystem.delete(path, true) - } else { - throw new IOException("Log directory %s already exists!".format(logDir)) - } - } - if (!fileSystem.mkdirs(path)) { - throw new IOException("Error in creating log directory: %s".format(logDir)) - } - if (dirPermissions.isDefined) { - val fsStatus = fileSystem.getFileStatus(path) - if (fsStatus.getPermission.toShort != dirPermissions.get.toShort) { - fileSystem.setPermission(path, dirPermissions.get) - } - } - } - - /** - * Create a new writer for the file identified by the given path. - * If the permissions are not passed in, it will default to use the permissions - * (dirPermissions) used when class was instantiated. 
- */ - private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = { - val logPath = logDir + "/" + fileName - val uri = new URI(logPath) - val path = new Path(logPath) - val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme - val isDefaultLocal = defaultFs == null || defaultFs == "file" - - /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). - * Therefore, for local files, use FileOutputStream instead. */ - val dstream = - if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { - // Second parameter is whether to append - new FileOutputStream(uri.getPath, !overwrite) - } else { - hadoopDataStream = Some(fileSystem.create(path, overwrite)) - hadoopDataStream.get - } - - perms.orElse(dirPermissions).foreach { p => fileSystem.setPermission(path, p) } - val bstream = new BufferedOutputStream(dstream, outputBufferSize) - val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream - new PrintWriter(cstream) - } - - /** - * Log the message to the given writer. - * @param msg The message to be logged - * @param withTime Whether to prepend message with a timestamp - */ - def log(msg: String, withTime: Boolean = false) { - val writeInfo = if (!withTime) { - msg - } else { - val date = new Date(System.currentTimeMillis) - dateFormat.get.format(date) + ": " + msg - } - writer.foreach(_.print(writeInfo)) - } - - /** - * Log the message to the given writer as a new line. - * @param msg The message to be logged - * @param withTime Whether to prepend message with a timestamp - */ - def logLine(msg: String, withTime: Boolean = false) = log(msg + "\n", withTime) - - /** - * Flush the writer to disk manually. - * - * If the Hadoop FileSystem is used, the underlying FSDataOutputStream (r1.0.4) must be - * sync()'ed manually as it does not support flush(), which is invoked by when higher - * level streams are flushed. - */ - def flush() { - writer.foreach(_.flush()) - hadoopDataStream.foreach(_.sync()) - } - - /** - * Close the writer. Any subsequent calls to log or flush will have no effect. - */ - def close() { - writer.foreach(_.close()) - writer = None - } - - /** - * Start a writer for a new file, closing the existing one if it exists. - * @param fileName Name of the new file, defaulting to the file index if not provided. - * @param perms Permissions to put on the new file. - */ - def newFile(fileName: String = "", perms: Option[FsPermission] = None) { - fileIndex += 1 - writer.foreach(_.close()) - val name = fileName match { - case "" => fileIndex.toString - case _ => fileName - } - writer = Some(createWriter(name, perms)) - } - - /** - * Close all open writers, streams, and file systems. Any subsequent uses of this FileLogger - * instance will throw exceptions. 
- */ - def stop() { - hadoopDataStream.foreach(_.close()) - writer.foreach(_.close()) - fileSystem.close() - } -} diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 21e3db34b8b7a..1cce75cb779ac 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -25,8 +25,8 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.json4s.jackson.JsonMethods._ import org.scalatest.{BeforeAndAfter, FunSuite} -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.io.CompressionCodec +import org.apache.spark.{Logging, SparkConf, SparkContext} +import org.apache.spark.io._ import org.apache.spark.util.{JsonProtocol, Utils} import java.io.File @@ -34,40 +34,40 @@ import java.io.File /** * Test whether EventLoggingListener logs events properly. * - * This tests whether EventLoggingListener actually creates special files while logging events, - * whether the parsing of these special files is correct, and whether the logged events can be - * read and deserialized into actual SparkListenerEvents. + * This tests whether EventLoggingListener actually log files with expected name patterns while + * logging events, whether the parsing of the file names is correct, and whether the logged events + * can be read and deserialized into actual SparkListenerEvents. */ -class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { +class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Logging { private val fileSystem = Utils.getHadoopFileSystem("/") private val allCompressionCodecs = Seq[String]( "org.apache.spark.io.LZFCompressionCodec", "org.apache.spark.io.SnappyCompressionCodec" ) private var testDir: File = _ - private var logDirPath: Path = _ + private var testDirPath: Path = _ before { testDir = Files.createTempDir() testDir.deleteOnExit() - logDirPath = Utils.getFilePath(testDir, "spark-events") + testDirPath = new Path(testDir.getAbsolutePath()) } after { Utils.deleteRecursively(testDir) } - test("Parse names of special files") { + test("Parse names of log files") { testParsingFileName() } - test("Verify special files exist") { - testSpecialFilesExist() + test("Verify log file exist") { + testLogFileExists() } - test("Verify special files exist with compression") { + test("Verify log file names contain compression codec info") { allCompressionCodecs.foreach { codec => - testSpecialFilesExist(compressionCodec = Some(codec)) + testLogFileExists(compressionCodec = Some(codec)) } } @@ -109,79 +109,59 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { import EventLoggingListenerSuite._ /** - * Test whether names of special files are correctly identified and parsed. + * Test whether names of log files are correctly identified and parsed. 
*/ private def testParsingFileName() { - val logPrefix = EventLoggingListener.LOG_PREFIX - val sparkVersionPrefix = EventLoggingListener.SPARK_VERSION_PREFIX - val compressionCodecPrefix = EventLoggingListener.COMPRESSION_CODEC_PREFIX - val applicationComplete = EventLoggingListener.APPLICATION_COMPLETE - assert(EventLoggingListener.isEventLogFile(logPrefix + "0")) - assert(EventLoggingListener.isEventLogFile(logPrefix + "100")) - assert(EventLoggingListener.isEventLogFile(logPrefix + "ANYTHING")) - assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "0.9.1")) - assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "1.0.0")) - assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "ANYTHING")) - assert(EventLoggingListener.isApplicationCompleteFile(applicationComplete)) - allCompressionCodecs.foreach { codec => - assert(EventLoggingListener.isCompressionCodecFile(compressionCodecPrefix + codec)) - } - - // Negatives - assert(!EventLoggingListener.isEventLogFile("The greatest man of all mankind")) - assert(!EventLoggingListener.isSparkVersionFile("Will never falter in the face of death!")) - assert(!EventLoggingListener.isCompressionCodecFile("Unless he chooses to leave behind")) - assert(!EventLoggingListener.isApplicationCompleteFile("The very treasure he calls Macbeth")) - - // Verify that parsing is correct - assert(EventLoggingListener.parseSparkVersion(sparkVersionPrefix + "1.0.0") === "1.0.0") - allCompressionCodecs.foreach { codec => - assert(EventLoggingListener.parseCompressionCodec(compressionCodecPrefix + codec) === codec) - } + var tests = List( + // Log file name, Spark version, Compression codec, in progress + ("app1-1234-1.0.inprogress", "1.0", None, true), + ("app2-1234-0.9.1", "0.9.1", None, false), + ("app3-1234-0.9-org.apache.spark.io.LZFCompressionCodec", "0.9", + Some(classOf[LZFCompressionCodec]), false), + ("app-123456-1234-0.8-org.apache.spark.io.SnappyCompressionCodec.inprogress", "0.8", + Some(classOf[SnappyCompressionCodec]), true) + ) + + tests.foreach({ case (fname, version, codec, inProgress) => + logWarning(s"Testing: $fname") + val info = EventLoggingListener.parseLoggingInfo(new Path(fname)) + assert(info != null) + assert(version === info.sparkVersion) + assert(!inProgress === info.applicationComplete) + + val actualCodec = if (!info.compressionCodec.isEmpty) { + info.compressionCodec.get.getClass() + } else null + assert(codec.getOrElse(null) === actualCodec) + }) + + var invalid = List("app1", "app1-1.0", "app1-1234", "app1-abc-1.0", + "app1-1234-1.0-org.invalid.compression.Codec", + "app1-1234-1.0.not_in_progress") + + invalid.foreach(fname => assert(EventLoggingListener.parseLoggingInfo(new Path(fname)) == null)) } /** - * Test whether the special files produced by EventLoggingListener exist. - * - * There should be exactly one event log and one spark version file throughout the entire - * execution. If a compression codec is specified, then the compression codec file should - * also exist. Only after the application has completed does the test expect the application - * completed file to be present. + * Test whether the log file produced by EventLoggingListener exists and matches the expected + * name pattern. 
*/ - private def testSpecialFilesExist(compressionCodec: Option[String] = None) { - - def assertFilesExist(logFiles: Array[FileStatus], loggerStopped: Boolean) { - val numCompressionCodecFiles = if (compressionCodec.isDefined) 1 else 0 - val numApplicationCompleteFiles = if (loggerStopped) 1 else 0 - assert(logFiles.size === 2 + numCompressionCodecFiles + numApplicationCompleteFiles) - assert(eventLogsExist(logFiles)) - assert(sparkVersionExists(logFiles)) - assert(compressionCodecExists(logFiles) === compressionCodec.isDefined) - assert(applicationCompleteExists(logFiles) === loggerStopped) - assertSparkVersionIsValid(logFiles) - compressionCodec.foreach { codec => - assertCompressionCodecIsValid(logFiles, codec) - } - } - + private def testLogFileExists(compressionCodec: Option[String] = None) { // Verify logging directory exists - val conf = getLoggingConf(logDirPath, compressionCodec) + val conf = getLoggingConf(testDirPath, compressionCodec) val eventLogger = new EventLoggingListener("test", conf) eventLogger.start() - val logPath = new Path(eventLogger.logDir) - assert(fileSystem.exists(logPath)) - val logDir = fileSystem.getFileStatus(logPath) - assert(logDir.isDir) - // Verify special files are as expected before stop() - var logFiles = fileSystem.listStatus(logPath) - assert(logFiles != null) - assertFilesExist(logFiles, loggerStopped = false) + val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS) + assert(fileSystem.exists(logPath)) + val logStatus = fileSystem.getFileStatus(logPath) + assert(logStatus.isFile) + assert(EventLoggingListener.LOG_FILE_NAME_REGEX.pattern.matcher( + logPath.getName()).matches()) - // Verify special files are as expected after stop() + // Verify log is renamed after stop() eventLogger.stop() - logFiles = fileSystem.listStatus(logPath) - assertFilesExist(logFiles, loggerStopped = true) + assert(fileSystem.getFileStatus(new Path(eventLogger.logPath)).isFile()) } /** @@ -193,7 +173,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { private def testParsingLogInfo(compressionCodec: Option[String] = None) { def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) { - assert(info.logPaths.size > 0) + assert(info != null) assert(info.sparkVersion === SparkContext.SPARK_VERSION) assert(info.compressionCodec.isDefined === compressionCodec.isDefined) info.compressionCodec.foreach { codec => @@ -205,15 +185,16 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { } // Verify that all information is correctly parsed before stop() - val conf = getLoggingConf(logDirPath, compressionCodec) + val conf = getLoggingConf(testDirPath, compressionCodec) val eventLogger = new EventLoggingListener("test", conf) eventLogger.start() - var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem) + var eventLoggingInfo = EventLoggingListener.parseLoggingInfo( + new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)) assertInfoCorrect(eventLoggingInfo, loggerStopped = false) // Verify that all information is correctly parsed after stop() eventLogger.stop() - eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem) + eventLoggingInfo = EventLoggingListener.parseLoggingInfo(new Path(eventLogger.logPath)) assertInfoCorrect(eventLoggingInfo, loggerStopped = true) } @@ -224,7 +205,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { * exactly these two events are logged in the expected file. 
*/ private def testEventLogging(compressionCodec: Option[String] = None) { - val conf = getLoggingConf(logDirPath, compressionCodec) + val conf = getLoggingConf(testDirPath, compressionCodec) val eventLogger = new EventLoggingListener("test", conf) val listenerBus = new LiveListenerBus val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey") @@ -236,17 +217,17 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { listenerBus.addListener(eventLogger) listenerBus.postToAll(applicationStart) listenerBus.postToAll(applicationEnd) + eventLogger.stop() // Verify file contains exactly the two events logged - val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem) - assert(eventLoggingInfo.logPaths.size > 0) - val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec) + val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(new Path(eventLogger.logPath)) + assert(eventLoggingInfo != null) + val lines = readFileLines(eventLoggingInfo.path, eventLoggingInfo.compressionCodec) assert(lines.size === 2) assert(lines(0).contains("SparkListenerApplicationStart")) assert(lines(1).contains("SparkListenerApplicationEnd")) assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart) assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd) - eventLogger.stop() } /** @@ -254,12 +235,12 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { * This runs a simple Spark job and asserts that the expected events are logged when expected. */ private def testApplicationEventLogging(compressionCodec: Option[String] = None) { - val conf = getLoggingConf(logDirPath, compressionCodec) + val conf = getLoggingConf(testDirPath, compressionCodec) val sc = new SparkContext("local", "test", conf) assert(sc.eventLogger.isDefined) val eventLogger = sc.eventLogger.get - val expectedLogDir = logDirPath.toString - assert(eventLogger.logDir.startsWith(expectedLogDir)) + val expectedLogDir = testDir.getAbsolutePath() + assert(eventLogger.logPath.startsWith(expectedLogDir + "/")) // Begin listening for events that trigger asserts val eventExistenceListener = new EventExistenceListener(eventLogger) @@ -275,11 +256,15 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { /** * Assert that all of the specified events are logged by the given EventLoggingListener. + * + * This is done while the application is still running, so the log file contains the + * IN_PROGRESS suffix. 
*/ private def assertEventsExist(eventLogger: EventLoggingListener, events: Seq[String]) { - val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem) - assert(eventLoggingInfo.logPaths.size > 0) - val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec) + val eventLoggingInfo = EventLoggingListener.parseLoggingInfo( + new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)) + assert(eventLoggingInfo != null) + val lines = readFileLines(eventLoggingInfo.path, eventLoggingInfo.compressionCodec) val eventSet = mutable.Set(events: _*) lines.foreach { line => eventSet.foreach { event => @@ -354,39 +339,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { } } - - /* -------------------------------------------------------- * - * Helper methods for validating state of the special files * - * -------------------------------------------------------- */ - - private def eventLogsExist(logFiles: Array[FileStatus]): Boolean = { - logFiles.map(_.getPath.getName).exists(EventLoggingListener.isEventLogFile) - } - - private def sparkVersionExists(logFiles: Array[FileStatus]): Boolean = { - logFiles.map(_.getPath.getName).exists(EventLoggingListener.isSparkVersionFile) - } - - private def compressionCodecExists(logFiles: Array[FileStatus]): Boolean = { - logFiles.map(_.getPath.getName).exists(EventLoggingListener.isCompressionCodecFile) - } - - private def applicationCompleteExists(logFiles: Array[FileStatus]): Boolean = { - logFiles.map(_.getPath.getName).exists(EventLoggingListener.isApplicationCompleteFile) - } - - private def assertSparkVersionIsValid(logFiles: Array[FileStatus]) { - val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isSparkVersionFile) - assert(file.isDefined) - assert(EventLoggingListener.parseSparkVersion(file.get) === SparkContext.SPARK_VERSION) - } - - private def assertCompressionCodecIsValid(logFiles: Array[FileStatus], compressionCodec: String) { - val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isCompressionCodecFile) - assert(file.isDefined) - assert(EventLoggingListener.parseCompressionCodec(file.get) === compressionCodec) - } - } diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index d81499ac6abef..eeaa05b0022ab 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler import java.io.{File, PrintWriter} import com.google.common.io.Files +import org.apache.hadoop.fs.Path import org.json4s.jackson.JsonMethods._ import org.scalatest.{BeforeAndAfter, FunSuite} @@ -89,7 +90,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart)))) writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd)))) writer.close() - val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec) + val replayer = new ReplayListenerBus(logFilePath, fileSystem, codec) val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath, codecName) val eventMonster = new EventMonster(conf) replayer.addListener(eventMonster) @@ -108,6 +109,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { */ private def testApplicationReplay(codecName: Option[String] = None) { val 
logDirPath = Utils.getFilePath(testDir, "test-replay") + fileSystem.mkdirs(logDirPath) + val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName) val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf) @@ -122,16 +125,11 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { val codec = codecName.map(getCompressionCodec) val applications = fileSystem.listStatus(logDirPath) assert(applications != null && applications.size > 0) - val eventLogDir = applications.sortBy(_.getAccessTime).last - assert(eventLogDir.isDir) - val logFiles = fileSystem.listStatus(eventLogDir.getPath) - assert(logFiles != null && logFiles.size > 0) - val logFile = logFiles.find(_.getPath.getName.startsWith("EVENT_LOG_")) - assert(logFile.isDefined) - val logFilePath = logFile.get.getPath + val eventLog = applications.sortBy(_.getAccessTime).last + assert(eventLog.isFile) // Replay events - val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec) + val replayer = new ReplayListenerBus(eventLog.getPath(), fileSystem, codec) val eventMonster = new EventMonster(conf) replayer.addListener(eventMonster) replayer.replay() @@ -156,7 +154,9 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { * log the events. */ private class EventMonster(conf: SparkConf) extends EventLoggingListener("test", conf) { - logger.close() + + override def start() { } + } private def getCompressionCodec(codecName: String) = { diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala deleted file mode 100644 index 44332fc8dbc23..0000000000000 --- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import java.io.{File, IOException} - -import scala.io.Source - -import com.google.common.io.Files -import org.apache.hadoop.fs.Path -import org.scalatest.{BeforeAndAfter, FunSuite} - -import org.apache.spark.SparkConf -import org.apache.spark.io.CompressionCodec - -/** - * Test writing files through the FileLogger. 
- */ -class FileLoggerSuite extends FunSuite with BeforeAndAfter { - private val fileSystem = Utils.getHadoopFileSystem("/") - private val allCompressionCodecs = Seq[String]( - "org.apache.spark.io.LZFCompressionCodec", - "org.apache.spark.io.SnappyCompressionCodec" - ) - private var testDir: File = _ - private var logDirPath: Path = _ - private var logDirPathString: String = _ - - before { - testDir = Files.createTempDir() - logDirPath = Utils.getFilePath(testDir, "test-file-logger") - logDirPathString = logDirPath.toString - } - - after { - Utils.deleteRecursively(testDir) - } - - test("Simple logging") { - testSingleFile() - } - - test ("Simple logging with compression") { - allCompressionCodecs.foreach { codec => - testSingleFile(Some(codec)) - } - } - - test("Logging multiple files") { - testMultipleFiles() - } - - test("Logging multiple files with compression") { - allCompressionCodecs.foreach { codec => - testMultipleFiles(Some(codec)) - } - } - - test("Logging when directory already exists") { - // Create the logging directory multiple times - new FileLogger(logDirPathString, new SparkConf, overwrite = true).start() - new FileLogger(logDirPathString, new SparkConf, overwrite = true).start() - new FileLogger(logDirPathString, new SparkConf, overwrite = true).start() - - // If overwrite is not enabled, an exception should be thrown - intercept[IOException] { - new FileLogger(logDirPathString, new SparkConf, overwrite = false).start() - } - } - - - /* ----------------- * - * Actual test logic * - * ----------------- */ - - /** - * Test logging to a single file. - */ - private def testSingleFile(codecName: Option[String] = None) { - val conf = getLoggingConf(codecName) - val codec = codecName.map { c => CompressionCodec.createCodec(conf) } - val logger = - if (codecName.isDefined) { - new FileLogger(logDirPathString, conf, compress = true) - } else { - new FileLogger(logDirPathString, conf) - } - logger.start() - assert(fileSystem.exists(logDirPath)) - assert(fileSystem.getFileStatus(logDirPath).isDir) - assert(fileSystem.listStatus(logDirPath).size === 0) - - logger.newFile() - val files = fileSystem.listStatus(logDirPath) - assert(files.size === 1) - val firstFile = files.head - val firstFilePath = firstFile.getPath - - logger.log("hello") - logger.flush() - assert(readFileContent(firstFilePath, codec) === "hello") - - logger.log(" world") - logger.close() - assert(readFileContent(firstFilePath, codec) === "hello world") - } - - /** - * Test logging to multiple files. 
- */ - private def testMultipleFiles(codecName: Option[String] = None) { - val conf = getLoggingConf(codecName) - val codec = codecName.map { c => CompressionCodec.createCodec(conf) } - val logger = - if (codecName.isDefined) { - new FileLogger(logDirPathString, conf, compress = true) - } else { - new FileLogger(logDirPathString, conf) - } - logger.start() - logger.newFile("Jean_Valjean") - logger.logLine("Who am I?") - logger.logLine("Destiny?") - logger.newFile("John_Valjohn") - logger.logLine("One") - logger.logLine("Two three...") - logger.newFile("Wolverine") - logger.logLine("There was a time") - logger.logLine("A time when our enemies knew honor.") - logger.close() - assert(readFileContent(new Path(logDirPath, "Jean_Valjean"), codec) === "Who am I?\nDestiny?") - assert(readFileContent(new Path(logDirPath, "John_Valjohn"), codec) === "One\nTwo three...") - assert(readFileContent(new Path(logDirPath, "Wolverine"), codec) === - "There was a time\nA time when our enemies knew honor.") - } - - /** - * Read the content of the file specified by the given path. - * If a compression codec is specified, use it to read the file. - */ - private def readFileContent(logPath: Path, codec: Option[CompressionCodec] = None): String = { - val fstream = fileSystem.open(logPath) - val cstream = codec.map(_.compressedInputStream(fstream)).getOrElse(fstream) - Source.fromInputStream(cstream).getLines().mkString("\n") - } - - private def getLoggingConf(codecName: Option[String]) = { - val conf = new SparkConf - codecName.foreach { c => conf.set("spark.io.compression.codec", c) } - conf - } - -} From 9dcfab975d7f88d4ebb8033dab4b6db835fab91c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 23 May 2014 13:12:13 -0700 Subject: [PATCH 12/12] Make history server parse old-style log directories. Spark 1.0 will generate log directories instead of single log files for applications; so it's nice to have the history server understand both styles. --- .../deploy/history/FsHistoryProvider.scala | 107 +++++++++++-- .../history/FsHistoryProviderSuite.scala | 150 ++++++++++++++++++ 2 files changed, 248 insertions(+), 9 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index e6717b1010908..afd2e83ccb519 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -25,6 +25,7 @@ import scala.collection.mutable import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.io.CompressionCodec import org.apache.spark.scheduler._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.Utils @@ -44,6 +45,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis // List of applications, in order from newest to oldest. private val appList = new AtomicReference[Seq[ApplicationHistoryInfo]](Nil) + // Constants used to parse Spark 1.0.0 log directories. + private[history] val LOG_PREFIX = "EVENT_LOG_" + private[history] val SPARK_VERSION_PREFIX = "SPARK_VERSION_" + private[history] val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_" + private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE" + /** * A background thread that periodically checks for event log updates on disk. 
* @@ -83,8 +90,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } checkForLogs() - logCheckingThread.setDaemon(true) - logCheckingThread.start() + + // Treat 0 as "disable the background thread", mostly for testing. + if (UPDATE_INTERVAL_MS > 0) { + logCheckingThread.setDaemon(true) + logCheckingThread.start() + } } override def getListing(offset: Int, count: Int) = { @@ -107,14 +118,24 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis * Tries to reuse as much of the data already in memory as possible, but not reading * applications that hasn't been updated since last time the logs were checked. */ - def checkForLogs() = synchronized { + private[history] def checkForLogs() = synchronized { lastLogCheckTimeMs = getMonotonicTime() logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs)) try { val matcher = EventLoggingListener.LOG_FILE_NAME_REGEX val logInfos = fs.listStatus(new Path(logDir)) - .filter { - entry => entry.isFile() && matcher.pattern.matcher(entry.getPath()getName()).matches() } + .filter { entry => + if (entry.isDir()) { + fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE)) + } else { + try { + val matcher(version, codecName, inprogress) = entry.getPath().getName() + inprogress == null + } catch { + case e: Exception => false + } + } + } var currentApps = Map[String, ApplicationHistoryInfo]( appList.get().map(app => (app.id -> app)):_*) @@ -127,7 +148,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val curr = currentApps.getOrElse(log.getPath().getName(), null) if (curr == null || curr.lastUpdated < log.getModificationTime()) { try { - newApps += loadAppInfo(log, false) + val info = loadAppInfo(log, false) + if (info != null) { + newApps += info + } } catch { case e: Exception => logError(s"Failed to load app info from directory $log.") } @@ -147,9 +171,29 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis * listing page. */ private def loadAppInfo(log: FileStatus, renderUI: Boolean): ApplicationHistoryInfo = { - val elogInfo = EventLoggingListener.parseLoggingInfo(log.getPath()) + val elogInfo = if (log.isFile()) { + EventLoggingListener.parseLoggingInfo(log.getPath()) + } else { + loadOldLoggingInfo(log.getPath()) + } + + if (elogInfo == null) { + return null + } + + + val (logFile, lastUpdated) = if (log.isFile()) { + (elogInfo.path, log.getModificationTime()) + } else { + // For old-style log directories, need to find the actual log file. 
+ val status = fs.listStatus(elogInfo.path) + .filter(e => e.getPath().getName().startsWith(LOG_PREFIX))(0) + (status.getPath(), status.getModificationTime()) + } + val appId = elogInfo.path.getName - val replayBus = new ReplayListenerBus(elogInfo.path, fs, elogInfo.compressionCodec) + + val replayBus = new ReplayListenerBus(logFile, fs, elogInfo.compressionCodec) val appListener = new ApplicationEventListener replayBus.addListener(appListener) @@ -167,7 +211,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val sparkUser = appListener.sparkUser val startTime = appListener.startTime val endTime = appListener.endTime - val lastUpdated = log.getModificationTime() ApplicationHistoryInfo(appId, appListener.appName, appListener.startTime, @@ -178,6 +221,52 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis ui) } + /** + * Load the app log information from a Spark 1.0.0 log directory, for backwards compatibility. + * This assumes that the log directory contains a single event log file, which is the case for + * directories generated by the code in that release. + */ + private[history] def loadOldLoggingInfo(dir: Path): EventLoggingInfo = { + val children = fs.listStatus(dir) + var eventLogPath: Path = null + var sparkVersion: String = null + var codecName: String = null + var applicationCompleted: Boolean = false + + children.foreach(child => child.getPath().getName() match { + case name if name.startsWith(LOG_PREFIX) => + eventLogPath = child.getPath() + + case ver if ver.startsWith(SPARK_VERSION_PREFIX) => + sparkVersion = ver.substring(SPARK_VERSION_PREFIX.length()) + + case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) => + codecName = codec.substring(COMPRESSION_CODEC_PREFIX.length()) + + case complete if complete == APPLICATION_COMPLETE => + applicationCompleted = true + + case _ => + }) + + val codec = try { + if (codecName != null) { + Some(CompressionCodec.createCodec(conf, codecName)) + } else None + } catch { + case e: Exception => + logError(s"Unknown compression codec $codecName.") + return null + } + + if (eventLogPath == null || sparkVersion == null) { + logInfo(s"$dir is not a Spark application log directory.") + return null + } + + EventLoggingInfo(dir, sparkVersion, codec, applicationCompleted) + } + /** Returns the system's mononotically increasing time. */ private def getMonotonicTime() = System.nanoTime() / (1000 * 1000) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala new file mode 100644 index 0000000000000..ca47671a62452 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.{File, FileOutputStream, OutputStreamWriter} + +import com.google.common.io.Files +import org.apache.hadoop.fs.Path +import org.json4s.jackson.JsonMethods._ +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.matchers.ShouldMatchers + +import org.apache.spark.SparkConf +import org.apache.spark.io._ +import org.apache.spark.scheduler._ +import org.apache.spark.util.{JsonProtocol, Utils} + +class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with ShouldMatchers { + + private var testDir: File = null + + private var provider: FsHistoryProvider = null + + before { + testDir = Files.createTempDir() + provider = new FsHistoryProvider(new SparkConf() + .set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) + .set("spark.history.fs.updateInterval", "0")) + } + + after { + Utils.getHadoopFileSystem("/").delete(new Path(testDir.getAbsolutePath()), true) + } + + test("Parse new and old application logs") { + val conf = new SparkConf() + .set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) + .set("spark.history.fs.updateInterval", "0") + val provider = new FsHistoryProvider(conf) + + // Write a new-style application log. + val logFile1 = new File(testDir, "app1-1-1.0") + writeFile(logFile1, + SparkListenerApplicationStart("app1", 1L, "test"), + SparkListenerApplicationEnd(2L) + ) + + // Write an unfinished app, new-style. + writeFile(new File(testDir, "app2-1-1.0.inprogress"), + SparkListenerApplicationStart("app2", 1L, "test") + ) + + // Wait so that the files' modification times will be different, ensuring the ordering + // the test expects. + Thread.sleep(1000) + + // Write an old-style application log. + val oldLog = new File(testDir, "app3-1234") + oldLog.mkdir() + writeFile(new File(oldLog, provider.SPARK_VERSION_PREFIX + "1.0")) + writeFile(new File(oldLog, provider.LOG_PREFIX + "1"), + SparkListenerApplicationStart("app3", 2L, "test"), + SparkListenerApplicationEnd(3L) + ) + writeFile(new File(oldLog, provider.APPLICATION_COMPLETE)) + + // Write an unfinished app, old-style. + val oldLog2 = new File(testDir, "app4-1234") + oldLog2.mkdir() + writeFile(new File(oldLog2, provider.SPARK_VERSION_PREFIX + "1.0")) + writeFile(new File(oldLog2, provider.LOG_PREFIX + "1"), + SparkListenerApplicationStart("app2", 2L, "test") + ) + + // Force a reload of data from the log directory, and check that both logs are loaded. + // Take the opportunity to check that the offset checks work as expected. + provider.checkForLogs() + + val (list, offset, count) = provider.getListing(4, 100) + list should not be (null) + offset should be (0) + count should be (2) + + list(0) should be (ApplicationHistoryInfo(oldLog.getName(), "app3", 2L, 3L, + oldLog.lastModified(), "test", null, null)) + list(1) should be (ApplicationHistoryInfo(logFile1.getName(), "app1", 1L, 2L, + logFile1.lastModified(), "test", null, null)) + + // Make sure the UI can be rendered. 
+ list.foreach { case info => + val fullInfo = provider.getAppInfo(info.id) + fullInfo should not be null + fullInfo.ui should not be null + } + } + + test("Parse logs with compression codec set") { + val testCodecs = List((classOf[LZFCompressionCodec].getName(), true), + (classOf[SnappyCompressionCodec].getName(), true), + ("invalid.codec", false)) + + testCodecs.foreach { case (codec, valid) => + // Use a separate directory per codec so marker files from earlier iterations do not interfere. + val logDir = new File(testDir, "test-" + codec) + logDir.mkdir() + writeFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0")) + writeFile(new File(logDir, provider.LOG_PREFIX + "1"), + SparkListenerApplicationStart("app2", 2L, "test"), + SparkListenerApplicationEnd(3L) + ) + writeFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codec)) + + val logPath = new Path(logDir.getAbsolutePath()) + val info = provider.loadOldLoggingInfo(logPath) + if (valid) { + info.path.toUri().getPath() should be (logPath.toUri().getPath()) + info.sparkVersion should be ("1.0") + info.compressionCodec should not be (None) + info.compressionCodec.get.getClass().getName() should be (codec) + info.applicationComplete should be (false) + } else { + info should be (null) + } + } + } + + private def writeFile(file: File, events: SparkListenerEvent*) = { + val out = new OutputStreamWriter(new FileOutputStream(file), "UTF-8") + try { + events.foreach(e => out.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n")) + } finally { + out.close() + } + } + +}
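
The single-file naming scheme used by EventLoggingListener in this series ({app name}-{timestamp}-{spark version}[-{compression codec}][.inprogress]) can be exercised outside of Spark. Below is a minimal, self-contained sketch, not part of the patch: the regex mirrors EventLoggingListener.LOG_FILE_NAME_REGEX from the diff above, while the object name, sample file names, and the main method are hypothetical and only added for illustration.

    import scala.util.matching.Regex

    object EventLogNameSketch {
      // Mirrors EventLoggingListener.LOG_FILE_NAME_REGEX:
      //   {app name}-{timestamp}-{spark version}[-{compression codec}][.inprogress]
      val LogName: Regex = """.+-[0-9]+-([0-9](?:\.[0-9])*)(?:-(.+?))?(\.inprogress)?""".r

      // Group 1 is the Spark version, group 2 the optional codec class name,
      // group 3 the optional ".inprogress" suffix (null once the app has finished).
      def describe(fileName: String): String = fileName match {
        case LogName(version, codec, inprogress) =>
          val codecStr = Option(codec).getOrElse("none")
          val state = if (inprogress == null) "complete" else "in progress"
          s"version=$version codec=$codecStr state=$state"
        case _ =>
          "not an event log file name"
      }

      def main(args: Array[String]): Unit = {
        Seq(
          "my-app-1400000000000-1.0.0",
          "my-app-1400000000000-1.0.0-org.apache.spark.io.LZFCompressionCodec",
          "my-app-1400000000000-1.0.0.inprogress",
          "not-a-log"
        ).foreach(name => println(s"$name -> ${describe(name)}"))
      }
    }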
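
For the backwards-compatibility path added in PATCH 12, loadOldLoggingInfo expects the Spark 1.0.0 directory layout exercised by the suite above. The following sketch summarizes that layout using plain java.io.File calls; it only approximates the provider's logic (which goes through Hadoop's FileSystem and builds an EventLoggingInfo), and the object name, method name, and example directory name are made up.

    import java.io.File

    object OldStyleLogDirSketch {
      // Marker-file names mirrored from FsHistoryProvider in this patch.
      val LogPrefix = "EVENT_LOG_"
      val VersionPrefix = "SPARK_VERSION_"
      val CodecPrefix = "COMPRESSION_CODEC_"
      val ApplicationComplete = "APPLICATION_COMPLETE"

      // A Spark 1.0.0-style log directory looks roughly like:
      //   my-app-1400000000000/
      //     EVENT_LOG_1                    <- the JSON event log itself
      //     SPARK_VERSION_1.0.0            <- empty marker file
      //     COMPRESSION_CODEC_<class name> <- optional empty marker file
      //     APPLICATION_COMPLETE           <- present once the app finished
      def summarize(dir: File): String = {
        val names = Option(dir.list()).getOrElse(Array.empty[String]).toSeq
        val eventLog = names.find(_.startsWith(LogPrefix))
        val version = names.find(_.startsWith(VersionPrefix)).map(_.stripPrefix(VersionPrefix))
        val codec = names.find(_.startsWith(CodecPrefix)).map(_.stripPrefix(CodecPrefix))
        val complete = names.contains(ApplicationComplete)
        s"eventLog=$eventLog version=$version codec=$codec complete=$complete"
      }
    }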