diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 86305d2ea8a09..3607b9a3706c4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -24,7 +24,7 @@ private[spark] class ApplicationDescription( val command: Command, val sparkHome: Option[String], var appUiUrl: String, - val eventLogDir: Option[String] = None) + val eventLogFile: Option[String] = None) extends Serializable { val user = System.getProperty("user.name", "") diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala new file mode 100644 index 0000000000000..afd2e83ccb519 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.FileNotFoundException +import java.util.concurrent.atomic.AtomicReference + +import scala.collection.mutable + +import org.apache.hadoop.fs.{FileStatus, Path} + +import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.io.CompressionCodec +import org.apache.spark.scheduler._ +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils + +private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider + with Logging { + + // Interval between each check for event log updates + private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval", 10) * 1000 + + private val logDir = conf.get("spark.history.fs.logDirectory") + private val fs = Utils.getHadoopFileSystem(logDir) + + // A timestamp of when the disk was last accessed to check for log updates + private var lastLogCheckTimeMs = -1L + + // List of applications, in order from newest to oldest. + private val appList = new AtomicReference[Seq[ApplicationHistoryInfo]](Nil) + + // Constants used to parse Spark 1.0.0 log directories. + private[history] val LOG_PREFIX = "EVENT_LOG_" + private[history] val SPARK_VERSION_PREFIX = "SPARK_VERSION_" + private[history] val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_" + private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE" + + /** + * A background thread that periodically checks for event log updates on disk. + * + * If a log check is invoked manually in the middle of a period, this thread re-adjusts the + * time at which it performs the next log check to maintain the same period as before. + * + * TODO: Add a mechanism to update manually. 
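+   * For example, with the default 10s interval, if a manual check resets
+   * lastLogCheckTimeMs to t = 4s just before the thread computes its next sleep at
+   * t = 10s, the thread sleeps 4 more seconds and checks again at t = 14s.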
+   */
+  private val logCheckingThread = new Thread("LogCheckingThread") {
+    override def run() = Utils.logUncaughtExceptions {
+      while (true) {
+        val now = getMonotonicTime()
+        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
+          Thread.sleep(UPDATE_INTERVAL_MS)
+        } else {
+          // If the user has manually checked for logs recently, wait until
+          // UPDATE_INTERVAL_MS after the last check time
+          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
+        }
+        checkForLogs()
+      }
+    }
+  }
+
+  initialize()
+
+  private def initialize() {
+    // Validate the log directory.
+    val path = new Path(logDir)
+    if (!fs.exists(path)) {
+      throw new IllegalArgumentException(
+        "Logging directory specified does not exist: %s".format(logDir))
+    }
+    if (!fs.getFileStatus(path).isDir) {
+      throw new IllegalArgumentException(
+        "Logging directory specified is not a directory: %s".format(logDir))
+    }
+
+    checkForLogs()
+
+    // Treat 0 as "disable the background thread", mostly for testing.
+    if (UPDATE_INTERVAL_MS > 0) {
+      logCheckingThread.setDaemon(true)
+      logCheckingThread.start()
+    }
+  }
+
+  override def getListing(offset: Int, count: Int) = {
+    val list = appList.get()
+    val theOffset = if (offset < list.size) offset else 0
+    (list.slice(theOffset, Math.min(theOffset + count, list.size)), theOffset, list.size)
+  }
+
+  override def getAppInfo(appId: String): ApplicationHistoryInfo = {
+    try {
+      val appLogDir = fs.getFileStatus(new Path(logDir, appId))
+      loadAppInfo(appLogDir, true)
+    } catch {
+      case e: FileNotFoundException => null
+    }
+  }
+
+  /**
+   * Builds the application list based on the current contents of the log directory.
+   * Tries to reuse as much of the data already in memory as possible, by skipping
+   * applications that haven't been updated since the last time the logs were checked.
+   */
+  private[history] def checkForLogs() = synchronized {
+    lastLogCheckTimeMs = getMonotonicTime()
+    logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
+    try {
+      val matcher = EventLoggingListener.LOG_FILE_NAME_REGEX
+      val logInfos = fs.listStatus(new Path(logDir))
+        .filter { entry =>
+          if (entry.isDir()) {
+            fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
+          } else {
+            try {
+              val matcher(version, codecName, inprogress) = entry.getPath().getName()
+              inprogress == null
+            } catch {
+              case e: Exception => false
+            }
+          }
+        }
+
+      val currentApps = Map[String, ApplicationHistoryInfo](
+        appList.get().map(app => (app.id -> app)):_*)
+
+      // For any application that either (i) is not listed or (ii) has changed since the last time
+      // the listing was created (defined by the log's modification time), load the app's info.
+      // Otherwise just reuse what's already in memory.
+      val newApps = new mutable.ListBuffer[ApplicationHistoryInfo]
+      for (log <- logInfos) {
+        val curr = currentApps.getOrElse(log.getPath().getName(), null)
+        if (curr == null || curr.lastUpdated < log.getModificationTime()) {
+          try {
+            val info = loadAppInfo(log, false)
+            if (info != null) {
+              newApps += info
+            }
+          } catch {
+            case e: Exception => logError(s"Failed to load app info from $log.", e)
+          }
+        } else {
+          newApps += curr
+        }
+      }
+
+      appList.set(newApps.sortBy { info => -info.lastUpdated })
+    } catch {
+      case t: Throwable => logError("Exception in checking for event log updates", t)
+    }
+  }
+
+  /**
+   * Parse the application's logs to find out the information we need to build the
+   * listing page.
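+   * When renderUI is true, a SparkUI is also rebuilt from the replayed events (used
+   * by getAppInfo); the listing path passes renderUI = false to skip that cost.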
+   */
+  private def loadAppInfo(log: FileStatus, renderUI: Boolean): ApplicationHistoryInfo = {
+    val elogInfo = if (log.isFile()) {
+        EventLoggingListener.parseLoggingInfo(log.getPath())
+      } else {
+        loadOldLoggingInfo(log.getPath())
+      }
+
+    if (elogInfo == null) {
+      return null
+    }
+
+    val (logFile, lastUpdated) = if (log.isFile()) {
+        (elogInfo.path, log.getModificationTime())
+      } else {
+        // For old-style log directories, need to find the actual log file.
+        val status = fs.listStatus(elogInfo.path)
+          .filter(e => e.getPath().getName().startsWith(LOG_PREFIX))(0)
+        (status.getPath(), status.getModificationTime())
+      }
+
+    val appId = elogInfo.path.getName
+
+    val replayBus = new ReplayListenerBus(logFile, fs, elogInfo.compressionCodec)
+    val appListener = new ApplicationEventListener
+    replayBus.addListener(appListener)
+
+    val ui: SparkUI = if (renderUI) {
+        val conf = this.conf.clone()
+        val appSecManager = new SecurityManager(conf)
+        new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
+        // Do not call ui.bind() to avoid creating a new server for each application
+      } else {
+        null
+      }
+
+    replayBus.replay()
+    ApplicationHistoryInfo(appId,
+      appListener.appName,
+      appListener.startTime,
+      appListener.endTime,
+      lastUpdated,
+      appListener.sparkUser,
+      if (renderUI) appListener.viewAcls else null,
+      ui)
+  }
+
+  /**
+   * Load the app log information from a Spark 1.0.0 log directory, for backwards compatibility.
+   * This assumes that the log directory contains a single event log file, which is the case for
+   * directories generated by the code in that release.
+   */
+  private[history] def loadOldLoggingInfo(dir: Path): EventLoggingInfo = {
+    val children = fs.listStatus(dir)
+    var eventLogPath: Path = null
+    var sparkVersion: String = null
+    var codecName: String = null
+    var applicationCompleted: Boolean = false
+
+    children.foreach(child => child.getPath().getName() match {
+      case name if name.startsWith(LOG_PREFIX) =>
+        eventLogPath = child.getPath()
+
+      case ver if ver.startsWith(SPARK_VERSION_PREFIX) =>
+        sparkVersion = ver.substring(SPARK_VERSION_PREFIX.length())
+
+      case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
+        codecName = codec.substring(COMPRESSION_CODEC_PREFIX.length())
+
+      case complete if complete == APPLICATION_COMPLETE =>
+        applicationCompleted = true
+
+      case _ =>
+    })
+
+    val codec = try {
+        if (codecName != null) {
+          Some(CompressionCodec.createCodec(conf, codecName))
+        } else None
+      } catch {
+        case e: Exception =>
+          logError(s"Unknown compression codec $codecName.")
+          return null
+      }
+
+    if (eventLogPath == null || sparkVersion == null) {
+      logInfo(s"$dir is not a Spark application log directory.")
+      return null
+    }
+
+    EventLoggingInfo(dir, sparkVersion, codec, applicationCompleted)
+  }
+
+  /** Returns the system's monotonically increasing time.
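+   * Based on System.nanoTime, so it is unaffected by wall-clock adjustments.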
*/ + private def getMonotonicTime() = System.nanoTime() / (1000 * 1000) + +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 180c853ce3096..7f1a9fe6f7d53 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -25,20 +25,28 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { + val pageSize = 20 + def render(request: HttpServletRequest): Seq[Node] = { - val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated } - val appTable = UIUtils.listingTable(appHeader, appRow, appRows) + val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt + val requestedFirst = (requestedPage - 1) * pageSize + val (apps, actualFirst, totalCount) = parent.getApplicationList(requestedFirst, pageSize) + val actualPage = (actualFirst / pageSize) + 1 + val last = Math.min(actualFirst + pageSize, totalCount) - 1 + val pageCount = totalCount / pageSize + (if (totalCount % pageSize > 0) 1 else 0) + + val appTable = UIUtils.listingTable(appHeader, appRow, apps) val content =
      <div class="row-fluid">
        <div class="span12">
          <ul class="unstyled">
-           <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
          </ul>
          {
-           if (parent.appIdToInfo.size > 0) {
+           if (totalCount > 0) {
              <h4>
-               Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
-               Completed Application{if (parent.getNumApplications > 1) "s" else ""}
+               Showing {actualFirst + 1}-{last + 1} of {totalCount}
+               <span style="float: right">
+                 {if (actualPage > 1) <a href={"/?page=" + (actualPage - 1)}>&lt;</a>}
+                 {if (actualPage < pageCount) <a href={"/?page=" + (actualPage + 1)}>&gt;</a>}
+               </span>
              </h4>
++ appTable } else { @@ -56,18 +64,16 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { "Completed", "Duration", "Spark User", - "Log Directory", "Last Updated") private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { - val appName = if (info.started) info.name else info.logDirPath.getName - val uiAddress = parent.getAddress + info.ui.basePath + val appName = if (info.started) info.name else info.id + val uiAddress = "/history/" + info.id val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started" val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed" val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---" val sparkUser = if (info.started) info.sparkUser else "Unknown user" - val logDirectory = info.logDirPath.getName val lastUpdated = UIUtils.formatDate(info.lastUpdated) {appName} @@ -75,7 +81,6 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { {endTime} {duration} {sparkUser} - {logDirectory} {lastUpdated} } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index a9c11dca5678e..73b827d8b6055 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -17,14 +17,15 @@ package org.apache.spark.deploy.history -import scala.collection.mutable +import java.util.NoSuchElementException +import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} -import org.apache.hadoop.fs.{FileStatus, Path} +import com.google.common.cache._ +import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.scheduler._ -import org.apache.spark.ui.{WebUI, SparkUI} +import org.apache.spark.ui.{WebUI, SparkUI, UIUtils} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.Utils @@ -38,56 +39,75 @@ import org.apache.spark.util.Utils * application's event logs are maintained in the application's own sub-directory. This * is the same structure as maintained in the event log write code path in * EventLoggingListener. 
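+ *
+ * Rendered application UIs are loaded on demand into a Guava cache bounded by
+ * spark.history.retainedApplications; UIs evicted from the cache are detached from
+ * the server.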
- * - * @param baseLogDir The base directory in which event logs are found */ class HistoryServer( - val baseLogDir: String, + conf: SparkConf, + provider: ApplicationHistoryProvider, securityManager: SecurityManager, - conf: SparkConf) - extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging { + port: Int) + extends WebUI(securityManager, port, conf) with Logging { + + // How many applications to retain + private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50) - import HistoryServer._ + // set whether to enable or disable view acls for all applications + private val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) - private val fileSystem = Utils.getHadoopFileSystem(baseLogDir) private val localHost = Utils.localHostName() private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost) - // A timestamp of when the disk was last accessed to check for log updates - private var lastLogCheckTime = -1L + private val appLoader = new CacheLoader[String, SparkUI] { + override def load(key: String): SparkUI = { + val info = provider.getAppInfo(key) + if (info == null) { + throw new NoSuchElementException() + } + info.ui.getSecurityManager.setUIAcls(uiAclsEnabled) + info.ui.getSecurityManager.setViewAcls(info.sparkUser, info.viewAcls) + attachSparkUI(info.ui) + info.ui + } + } - // Number of completed applications found in this directory - private var numCompletedApplications = 0 + private val appCache = CacheBuilder.newBuilder() + .maximumSize(retainedApplications) + .removalListener(new RemovalListener[String, SparkUI] { + override def onRemoval(rm: RemovalNotification[String, SparkUI]) = { + detachSparkUI(rm.getValue()) + } + }) + .build(appLoader) + + private val loaderServlet = new HttpServlet { + protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = { + val parts = req.getPathInfo().split("/") + if (parts.length < 2) { + res.setStatus(HttpServletResponse.SC_BAD_REQUEST) + return + } - @volatile private var stopped = false + var appId = parts(1) - /** - * A background thread that periodically checks for event log updates on disk. - * - * If a log check is invoked manually in the middle of a period, this thread re-adjusts the - * time at which it performs the next log check to maintain the same period as before. - * - * TODO: Add a mechanism to update manually. - */ - private val logCheckingThread = new Thread { - override def run(): Unit = Utils.logUncaughtExceptions { - while (!stopped) { - val now = System.currentTimeMillis - if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) { - checkForLogs() - Thread.sleep(UPDATE_INTERVAL_MS) - } else { - // If the user has manually checked for logs recently, wait until - // UPDATE_INTERVAL_MS after the last check time - Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now) + // Note we don't use the UI retrieved from the cache; the cache loader above will register + // the app's UI, and all we need to do is redirect the user to the same URI that was + // requested, and the proper data should be served at that point. + try { + appCache.get(appId) + res.sendRedirect(res.encodeRedirectURL(req.getRequestURI())) + } catch { + case e: Exception => e.getCause() match { + case nsee: NoSuchElementException => + val msg =
<div class="row-fluid">Application {appId} not found.</div>
+ res.setStatus(HttpServletResponse.SC_NOT_FOUND) + UIUtils.basicSparkPage(msg, "Not Found").foreach( + n => res.getWriter().write(n.toString)) + + case cause: Exception => throw cause } } } } - // A mapping of application ID to its history information, which includes the rendered UI - val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]() - initialize() /** @@ -98,108 +118,23 @@ class HistoryServer( */ def initialize() { attachPage(new HistoryPage(this)) - attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static")) + attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) + + val contextHandler = new ServletContextHandler + contextHandler.setContextPath("/history") + contextHandler.addServlet(new ServletHolder(loaderServlet), "/*") + attachHandler(contextHandler) } /** Bind to the HTTP server behind this web interface. */ override def bind() { super.bind() - logCheckingThread.start() - } - - /** - * Check for any updates to event logs in the base directory. This is only effective once - * the server has been bound. - * - * If a new completed application is found, the server renders the associated SparkUI - * from the application's event logs, attaches this UI to itself, and stores metadata - * information for this application. - * - * If the logs for an existing completed application are no longer found, the server - * removes all associated information and detaches the SparkUI. - */ - def checkForLogs() = synchronized { - if (serverInfo.isDefined) { - lastLogCheckTime = System.currentTimeMillis - logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime)) - try { - val logStatus = fileSystem.listStatus(new Path(baseLogDir)) - val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() - val logInfos = logDirs - .sortBy { dir => getModificationTime(dir) } - .map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) } - .filter { case (dir, info) => info.applicationComplete } - - // Logging information for applications that should be retained - val retainedLogInfos = logInfos.takeRight(RETAINED_APPLICATIONS) - val retainedAppIds = retainedLogInfos.map { case (dir, _) => dir.getPath.getName } - - // Remove any applications that should no longer be retained - appIdToInfo.foreach { case (appId, info) => - if (!retainedAppIds.contains(appId)) { - detachSparkUI(info.ui) - appIdToInfo.remove(appId) - } - } - - // Render the application's UI if it is not already there - retainedLogInfos.foreach { case (dir, info) => - val appId = dir.getPath.getName - if (!appIdToInfo.contains(appId)) { - renderSparkUI(dir, info) - } - } - - // Track the total number of completed applications observed this round - numCompletedApplications = logInfos.size - - } catch { - case e: Exception => logError("Exception in checking for event log updates", e) - } - } else { - logWarning("Attempted to check for event log updates before binding the server.") - } - } - - /** - * Render a new SparkUI from the event logs if the associated application is completed. - * - * HistoryServer looks for a special file that indicates application completion in the given - * directory. If this file exists, the associated application is regarded to be completed, in - * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing. 
- */ - private def renderSparkUI(logDir: FileStatus, elogInfo: EventLoggingInfo) { - val path = logDir.getPath - val appId = path.getName - val replayBus = new ReplayListenerBus(elogInfo.logPaths, fileSystem, elogInfo.compressionCodec) - val appListener = new ApplicationEventListener - replayBus.addListener(appListener) - val appConf = conf.clone() - val appSecManager = new SecurityManager(appConf) - val ui = new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId) - - // Do not call ui.bind() to avoid creating a new server for each application - replayBus.replay() - if (appListener.applicationStarted) { - appSecManager.setUIAcls(HISTORY_UI_ACLS_ENABLED) - appSecManager.setViewAcls(appListener.sparkUser, appListener.viewAcls) - attachSparkUI(ui) - val appName = appListener.appName - val sparkUser = appListener.sparkUser - val startTime = appListener.startTime - val endTime = appListener.endTime - val lastUpdated = getModificationTime(logDir) - ui.setAppName(appName + " (completed)") - appIdToInfo(appId) = ApplicationHistoryInfo(appId, appName, startTime, endTime, - lastUpdated, sparkUser, path, ui) - } } /** Stop the server and close the file system. */ override def stop() { super.stop() - stopped = true - fileSystem.close() + provider.stop() } /** Attach a reconstructed UI to this server. Only valid after bind(). */ @@ -215,27 +150,18 @@ class HistoryServer( ui.getHandlers.foreach(detachHandler) } - /** Return the address of this server. */ - def getAddress: String = "http://" + publicHost + ":" + boundPort - - /** Return the number of completed applications found, whether or not the UI is rendered. */ - def getNumApplications: Int = numCompletedApplications - - /** Return when this directory was last modified. */ - private def getModificationTime(dir: FileStatus): Long = { - try { - val logFiles = fileSystem.listStatus(dir.getPath) - if (logFiles != null && !logFiles.isEmpty) { - logFiles.map(_.getModificationTime).max - } else { - dir.getModificationTime - } - } catch { - case e: Exception => - logError("Exception in accessing modification time of %s".format(dir.getPath), e) - -1L - } + /** + * Returns a list of available applications, in descending order according to their last + * updated time. + * + * @param offset Starting offset for returned objects. + * @param count Max number of objects to return. 
+ * @return 3-tuple (requested app list, adjusted offset, count of all available apps) + */ + def getApplicationList(offset: Int, count: Int) = { + provider.getListing(offset, count) } + } /** @@ -251,30 +177,31 @@ class HistoryServer( object HistoryServer { private val conf = new SparkConf - // Interval between each check for event log updates - val UPDATE_INTERVAL_MS = conf.getInt("spark.history.updateInterval", 10) * 1000 - - // How many applications to retain - val RETAINED_APPLICATIONS = conf.getInt("spark.history.retainedApplications", 250) - - // The port to which the web UI is bound - val WEB_UI_PORT = conf.getInt("spark.history.ui.port", 18080) - - // set whether to enable or disable view acls for all applications - val HISTORY_UI_ACLS_ENABLED = conf.getBoolean("spark.history.ui.acls.enable", false) - - val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR - def main(argStrings: Array[String]) { initSecurity() - val args = new HistoryServerArguments(argStrings) + parse(argStrings.toList) val securityManager = new SecurityManager(conf) - val server = new HistoryServer(args.logDir, securityManager, conf) + + val providerName = conf.getOption("spark.history.provider") + .getOrElse(classOf[FsHistoryProvider].getName()) + val provider = Class.forName(providerName) + .getConstructor(classOf[SparkConf]) + .newInstance(conf) + .asInstanceOf[ApplicationHistoryProvider] + + val port = conf.getInt("spark.history.ui.port", 18080) + + val server = new HistoryServer(conf, provider, securityManager, port) server.bind() + Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") { + override def run() = { + server.stop() + } + }) + // Wait until the end of the world... or if the HistoryServer process is manually stopped while(true) { Thread.sleep(Int.MaxValue) } - server.stop() } def initSecurity() { @@ -290,8 +217,88 @@ object HistoryServer { } } + private def parse(args: List[String]): Unit = { + args match { + case ("--dir" | "-d") :: value :: tail => + set("fs.logDirectory", value) + parse(tail) + + case ("--port" | "-p") :: value :: tail => + set("ui.port", value) + parse(tail) + + case ("-D") :: opt :: value :: tail => + set(opt, value) + parse(tail) + + case ("--help" | "-h") :: tail => + printUsageAndExit(0) + + case Nil => + + case _ => + printUsageAndExit(1) + } + } + + private def set(name: String, value: String) = { + conf.set("spark.history." + name, value) + } + + private def printUsageAndExit(exitCode: Int) { + System.err.println( + """ + |Usage: HistoryServer [options] + | + |Options are set by passing "-D option value" command line arguments to the class. + |Command line options will override the Spark configuration file and system properties. + |History Server options are always available; additional options depend on the provider. + | + |History Server options: + | + | ui.port Port where server will listen for connections (default 18080) + | ui.acls.enable Whether to enable view acls for all applications (default false) + | provider Name of history provider class (defaults to file system-based provider) + | + |FsHistoryProvider options: + | + | fs.logDirectory Directory where app logs are stored (required) + | fs.updateInterval How often to reload log data from storage (seconds, default 10) + |""".stripMargin) + System.exit(exitCode) + } + } +private[spark] abstract class ApplicationHistoryProvider { + + /** + * This method should return a list of applications available for the history server to + * show. The listing is assumed to be in descending time order. 
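+   * For example, a provider holding 45 applications should answer getListing(40, 20)
+   * with the last 5 entries, offset 40, and a total count of 45.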
+ * + * An adjusted offset should be returned if the app list has changed and the request + * references an invalid start offset. Otherwise, the provided offset should be returned. + * + * @param offset Starting offset for returned objects. + * @param count Max number of objects to return. + * @return 3-tuple (requested app list, adjusted offset, count of all available apps) + */ + def getListing(offset: Int, count: Int): (Seq[ApplicationHistoryInfo], Int, Int) + + /** + * This method should return the application information, including a rendered SparkUI. + * + * @param appId The application ID. + * @return The app info, or null if not found. + */ + def getAppInfo(appId: String): ApplicationHistoryInfo + + /** + * Called when the server is shutting down. + */ + def stop(): Unit = { } + +} private[spark] case class ApplicationHistoryInfo( id: String, @@ -300,7 +307,7 @@ private[spark] case class ApplicationHistoryInfo( endTime: Long, lastUpdated: Long, sparkUser: String, - logDirPath: Path, + viewAcls: String, ui: SparkUI) { def started = startTime != -1 def completed = endTime != -1 diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala deleted file mode 100644 index 943c061743dbd..0000000000000 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.history - -import java.net.URI - -import org.apache.hadoop.fs.Path - -import org.apache.spark.util.Utils - -/** - * Command-line parser for the master. 
- */ -private[spark] class HistoryServerArguments(args: Array[String]) { - var logDir = "" - - parse(args.toList) - - private def parse(args: List[String]): Unit = { - args match { - case ("--dir" | "-d") :: value :: tail => - logDir = value - parse(tail) - - case ("--help" | "-h") :: tail => - printUsageAndExit(0) - - case Nil => - - case _ => - printUsageAndExit(1) - } - validateLogDir() - } - - private def validateLogDir() { - if (logDir == "") { - System.err.println("Logging directory must be specified.") - printUsageAndExit(1) - } - val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) - val path = new Path(logDir) - if (!fileSystem.exists(path)) { - System.err.println("Logging directory specified does not exist: %s".format(logDir)) - printUsageAndExit(1) - } - if (!fileSystem.getFileStatus(path).isDir) { - System.err.println("Logging directory specified is not a directory: %s".format(logDir)) - printUsageAndExit(1) - } - } - - private def printUsageAndExit(exitCode: Int) { - System.err.println( - "Usage: HistoryServer [options]\n" + - "\n" + - "Options:\n" + - " -d DIR, --dir DIR Location of event log files") - System.exit(exitCode) - } -} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index c6dec305bffcb..f6619f34d5410 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -30,7 +30,7 @@ import akka.actor._ import akka.pattern.ask import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} import akka.serialization.SerializationExtension -import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState} @@ -668,27 +668,26 @@ private[spark] class Master( */ def rebuildSparkUI(app: ApplicationInfo): Boolean = { val appName = app.desc.name - val eventLogDir = app.desc.eventLogDir.getOrElse { return false } - val fileSystem = Utils.getHadoopFileSystem(eventLogDir) - val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem) - val eventLogPaths = eventLogInfo.logPaths + val eventLogFile = app.desc.eventLogFile.getOrElse { return false } + val eventLogInfo = EventLoggingListener.parseLoggingInfo(new Path(eventLogFile)) + if (eventLogInfo == null) { + return false + } + val compressionCodec = eventLogInfo.compressionCodec - if (!eventLogPaths.isEmpty) { - try { - val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec) - val ui = new SparkUI( - new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id) - replayBus.replay() - app.desc.appUiUrl = ui.basePath - appIdToUI(app.id) = ui - webUi.attachSparkUI(ui) - return true - } catch { - case e: Exception => - logError("Exception in replaying log for application %s (%s)".format(appName, app.id), e) - } - } else { - logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir)) + try { + val fileSystem = Utils.getHadoopFileSystem(eventLogFile) + val replayBus = new ReplayListenerBus(eventLogInfo.path, fileSystem, compressionCodec) + val ui = new SparkUI( + new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id) + replayBus.replay() + app.desc.appUiUrl = ui.basePath + appIdToUI(app.id) = ui + webUi.attachSparkUI(ui) + return true + } catch { + case e: 
Exception => + logError("Exception in replaying log for application %s (%s)".format(appName, app.id), e) } false } diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index a90b0d475c04e..0f860ce533a43 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -17,11 +17,14 @@ package org.apache.spark.scheduler +import java.io.{BufferedOutputStream, FileOutputStream, IOException, PrintWriter} +import java.net.URI + import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataOutputStream, Path} import org.apache.hadoop.fs.permission.FsPermission import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods._ @@ -29,7 +32,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.{Logging, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec -import org.apache.spark.util.{FileLogger, JsonProtocol} +import org.apache.spark.util.{JsonProtocol, Utils} /** * A SparkListener that logs events to persistent storage. @@ -54,38 +57,93 @@ private[spark] class EventLoggingListener( private val testing = sparkConf.getBoolean("spark.eventLog.testing", false) private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024 private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/") - private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis - val logDir = logBaseDir + "/" + name + private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir)) + private lazy val compressionCodec = CompressionCodec.createCodec(sparkConf) + + // Only defined if the file system scheme is not local + private var hadoopDataStream: Option[FSDataOutputStream] = None - protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize, - shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS)) + private var writer: Option[PrintWriter] = None // For testing. Keep track of all JSON serialized events that have been logged. private[scheduler] val loggedEvents = new ArrayBuffer[JValue] + val logPath = { + val sb = new StringBuilder() + .append(logBaseDir) + .append("/") + .append(appName.replaceAll("[ :/]", "-").toLowerCase()) + .append("-") + .append(System.currentTimeMillis()) + .append("-") + .append(SparkContext.SPARK_VERSION) + if (shouldCompress) { + val codec = + sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC) + sb.append("-").append(codec) + } + sb.toString() + } + /** - * Begin logging events. - * If compression is used, log a file that indicates which compression library is used. + * Creates the log file in the configured log directory. + * + * The file name contains some metadata about its contents. 
It follows this format:
+   *
+   * {{{
+   *   {app name}-{timestamp}-{spark version}[-{compression codec}][.inprogress]
+   * }}}
+   *
+   * Where:
+   *   - "app name" is a fs-friendly version of the application's name, in lower case
+   *   - "timestamp" is a timestamp generated by this logger
+   *   - "spark version" is the version of Spark that generated the log
+   *   - "compression codec" is an optional string with the name of the compression codec
+   *     used to write the file
+   *   - ".inprogress" is present while the log file is still being written to, and is
+   *     removed once the application finishes.
+   */
   def start() {
-    logger.start()
-    logInfo("Logging events to %s".format(logDir))
-    if (shouldCompress) {
-      val codec =
-        sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
-      logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
+    if (!fileSystem.isDirectory(new Path(logBaseDir))) {
+      throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.")
     }
-    logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
-    logger.newFile(LOG_PREFIX + logger.fileIndex)
+
+    val workingPath = logPath + IN_PROGRESS
+    val uri = new URI(workingPath)
+    val path = new Path(workingPath)
+    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
+    val isDefaultLocal = defaultFs == null || defaultFs == "file"
+
+    /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
+     * Therefore, for local files, use FileOutputStream instead. */
+    val dstream =
+      if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
+        // Second parameter is whether to append
+        new FileOutputStream(uri.getPath)
+      } else {
+        hadoopDataStream = Some(fileSystem.create(path))
+        hadoopDataStream.get
+      }
+
+    fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
+    val bstream = new BufferedOutputStream(dstream, outputBufferSize)
+    val cstream = if (shouldCompress) compressionCodec.compressedOutputStream(bstream) else bstream
+    writer = Some(new PrintWriter(cstream))
+
+    logInfo("Logging events to %s".format(logPath))
   }
 
   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
     val eventJson = JsonProtocol.sparkEventToJson(event)
-    logger.logLine(compact(render(eventJson)))
+
+    writer.foreach(_.println(compact(render(eventJson))))
     if (flushLogger) {
-      logger.flush()
+      writer.foreach(_.flush())
+      hadoopDataStream.foreach(_.sync())
     }
+
     if (testing) {
       loggedEvents += eventJson
     }
@@ -126,50 +184,28 @@
-   * In addition, create an empty special file to indicate application completion.
+   * In addition, rename the file from its ".inprogress" name to its final name to
+   * mark the application as finished.
    */
   def stop() = {
-    logger.newFile(APPLICATION_COMPLETE)
-    logger.stop()
+    writer.foreach(_.close())
+
+    val target = new Path(logPath)
+    if (fileSystem.exists(target)) {
+      throw new IOException("Target log file already exists (%s)".format(logPath))
+    }
+    fileSystem.rename(new Path(logPath + IN_PROGRESS), target)
   }
+
 }
 
 private[spark] object EventLoggingListener extends Logging {
   val DEFAULT_LOG_DIR = "/tmp/spark-events"
 
-  val LOG_PREFIX = "EVENT_LOG_"
-  val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
-  val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
-  val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
+  val IN_PROGRESS = ".inprogress"
   val LOG_FILE_PERMISSIONS = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
 
+  // Regex for parsing log file names. See description of log file name format in start().
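+  // Note that each version component must match a single digit, and that the codec
+  // group is non-greedy. For example, the (hypothetical) name
+  //   "my-app-1400000000000-1.0.0-org.apache.spark.io.LZFCompressionCodec.inprogress"
+  // parses to version "1.0.0", the LZF codec class name, and a non-null in-progress group.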
+ val LOG_FILE_NAME_REGEX = s".+-[0-9]+-([0-9](?:\\.[0-9])*)(?:-(.+?))?(\\$IN_PROGRESS)?".r + // A cache for compression codecs to avoid creating the same codec many times private val codecMap = new mutable.HashMap[String, CompressionCodec] - def isEventLogFile(fileName: String): Boolean = { - fileName.startsWith(LOG_PREFIX) - } - - def isSparkVersionFile(fileName: String): Boolean = { - fileName.startsWith(SPARK_VERSION_PREFIX) - } - - def isCompressionCodecFile(fileName: String): Boolean = { - fileName.startsWith(COMPRESSION_CODEC_PREFIX) - } - - def isApplicationCompleteFile(fileName: String): Boolean = { - fileName == APPLICATION_COMPLETE - } - - def parseSparkVersion(fileName: String): String = { - if (isSparkVersionFile(fileName)) { - fileName.replaceAll(SPARK_VERSION_PREFIX, "") - } else "" - } - - def parseCompressionCodec(fileName: String): String = { - if (isCompressionCodecFile(fileName)) { - fileName.replaceAll(COMPRESSION_CODEC_PREFIX, "") - } else "" - } - /** * Parse the event logging information associated with the logs in the given directory. * @@ -177,47 +213,24 @@ private[spark] object EventLoggingListener extends Logging { * codec file (if event logs are compressed), and the application completion file (if the * application has run to completion). */ - def parseLoggingInfo(logDir: Path, fileSystem: FileSystem): EventLoggingInfo = { + def parseLoggingInfo(log: Path): EventLoggingInfo = { try { - val fileStatuses = fileSystem.listStatus(logDir) - val filePaths = - if (fileStatuses != null) { - fileStatuses.filter(!_.isDir).map(_.getPath).toSeq + val LOG_FILE_NAME_REGEX(version, codecName, inprogress) = log.getName() + val codec: Option[CompressionCodec] = if (codecName != null) { + val conf = new SparkConf() + conf.set("spark.io.compression.codec", codecName) + Some(CompressionCodec.createCodec(conf)) } else { - Seq[Path]() + None } - if (filePaths.isEmpty) { - logWarning("No files found in logging directory %s".format(logDir)) - } - EventLoggingInfo( - logPaths = filePaths.filter { path => isEventLogFile(path.getName) }, - sparkVersion = filePaths - .find { path => isSparkVersionFile(path.getName) } - .map { path => parseSparkVersion(path.getName) } - .getOrElse(""), - compressionCodec = filePaths - .find { path => isCompressionCodecFile(path.getName) } - .map { path => - val codec = EventLoggingListener.parseCompressionCodec(path.getName) - val conf = new SparkConf - conf.set("spark.io.compression.codec", codec) - codecMap.getOrElseUpdate(codec, CompressionCodec.createCodec(conf)) - }, - applicationComplete = filePaths.exists { path => isApplicationCompleteFile(path.getName) } - ) + EventLoggingInfo(log, version, codec, inprogress == null) } catch { case e: Exception => - logError("Exception in parsing logging info from directory %s".format(logDir), e) - EventLoggingInfo.empty + logError("Exception in parsing logging info from file %s".format(log), e) + null } } - /** - * Parse the event logging information associated with the logs in the given directory. - */ - def parseLoggingInfo(logDir: String, fileSystem: FileSystem): EventLoggingInfo = { - parseLoggingInfo(new Path(logDir), fileSystem) - } } @@ -225,11 +238,7 @@ private[spark] object EventLoggingListener extends Logging { * Information needed to process the event logs associated with an application. 
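+ * For new-style logs, `path` points at the event log file itself; for old-style
+ * (Spark 1.0.0) logs, it points at the log directory.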
*/ private[spark] case class EventLoggingInfo( - logPaths: Seq[Path], + path: Path, sparkVersion: String, compressionCodec: Option[CompressionCodec], applicationComplete: Boolean = false) - -private[spark] object EventLoggingInfo { - def empty = EventLoggingInfo(Seq[Path](), "", None, applicationComplete = false) -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index f89724d4ea196..2f6c4f408e65c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -35,51 +35,47 @@ import org.apache.spark.util.JsonProtocol * exactly one SparkListenerEvent. */ private[spark] class ReplayListenerBus( - logPaths: Seq[Path], + logPath: Path, fileSystem: FileSystem, compressionCodec: Option[CompressionCodec]) extends SparkListenerBus with Logging { private var replayed = false - if (logPaths.length == 0) { - logWarning("Log path provided contains no log files.") - } - /** * Replay each event in the order maintained in the given logs. * This should only be called exactly once. */ def replay() { assert(!replayed, "ReplayListenerBus cannot replay events more than once") - logPaths.foreach { path => - // Keep track of input streams at all levels to close them later - // This is necessary because an exception can occur in between stream initializations - var fileStream: Option[InputStream] = None - var bufferedStream: Option[InputStream] = None - var compressStream: Option[InputStream] = None - var currentLine = "" - try { - fileStream = Some(fileSystem.open(path)) - bufferedStream = Some(new BufferedInputStream(fileStream.get)) - compressStream = Some(wrapForCompression(bufferedStream.get)) - // Parse each line as an event and post the event to all attached listeners - val lines = Source.fromInputStream(compressStream.get).getLines() - lines.foreach { line => - currentLine = line - postToAll(JsonProtocol.sparkEventFromJson(parse(line))) - } - } catch { - case e: Exception => - logError("Exception in parsing Spark event log %s".format(path), e) - logError("Malformed line: %s\n".format(currentLine)) - } finally { - fileStream.foreach(_.close()) - bufferedStream.foreach(_.close()) - compressStream.foreach(_.close()) + // Keep track of input streams at all levels to close them later + // This is necessary because an exception can occur in between stream initializations + var fileStream: Option[InputStream] = None + var bufferedStream: Option[InputStream] = None + var compressStream: Option[InputStream] = None + var currentLine = "" + try { + fileStream = Some(fileSystem.open(logPath)) + bufferedStream = Some(new BufferedInputStream(fileStream.get)) + compressStream = Some(wrapForCompression(bufferedStream.get)) + + // Parse each line as an event and post the event to all attached listeners + val lines = Source.fromInputStream(compressStream.get).getLines() + lines.foreach { line => + currentLine = line + postToAll(JsonProtocol.sparkEventFromJson(parse(line))) } + } catch { + case e: Exception => + logError("Exception in parsing Spark event log %s".format(logPath), e) + logError("Malformed line: %s\n".format(currentLine)) + } finally { + fileStream.foreach(_.close()) + bufferedStream.foreach(_.close()) + compressStream.foreach(_.close()) } + replayed = true } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 9c07b3f7b695a..27d01bfe3c7da 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -59,7 +59,7 @@ private[spark] class SparkDeploySchedulerBackend( classPathEntries, libraryPathEntries, extraJavaOpts) val sparkHome = sc.getSparkHome() val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir)) + sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logPath)) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index b08f308fda1dd..856273e1d4e21 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -51,6 +51,7 @@ private[spark] abstract class WebUI( def getTabs: Seq[WebUITab] = tabs.toSeq def getHandlers: Seq[ServletContextHandler] = handlers.toSeq + def getSecurityManager: SecurityManager = securityManager /** Attach a tab to this UI, along with all of its attached pages. */ def attachTab(tab: WebUITab) { diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala deleted file mode 100644 index 0e6d21b22023a..0000000000000 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import java.io.{BufferedOutputStream, FileOutputStream, IOException, PrintWriter} -import java.net.URI -import java.text.SimpleDateFormat -import java.util.Date - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} -import org.apache.hadoop.fs.permission.FsPermission - -import org.apache.spark.{Logging, SparkConf} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.io.CompressionCodec - -/** - * A generic class for logging information to file. 
- * - * @param logDir Path to the directory in which files are logged - * @param outputBufferSize The buffer size to use when writing to an output stream in bytes - * @param compress Whether to compress output - * @param overwrite Whether to overwrite existing files - */ -private[spark] class FileLogger( - logDir: String, - sparkConf: SparkConf, - hadoopConf: Configuration = SparkHadoopUtil.get.newConfiguration(), - outputBufferSize: Int = 8 * 1024, // 8 KB - compress: Boolean = false, - overwrite: Boolean = true, - dirPermissions: Option[FsPermission] = None) - extends Logging { - - private val dateFormat = new ThreadLocal[SimpleDateFormat]() { - override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") - } - - private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir)) - var fileIndex = 0 - - // Only used if compression is enabled - private lazy val compressionCodec = CompressionCodec.createCodec(sparkConf) - - // Only defined if the file system scheme is not local - private var hadoopDataStream: Option[FSDataOutputStream] = None - - private var writer: Option[PrintWriter] = None - - /** - * Start this logger by creating the logging directory. - */ - def start() { - createLogDir() - } - - /** - * Create a logging directory with the given path. - */ - private def createLogDir() { - val path = new Path(logDir) - if (fileSystem.exists(path)) { - if (overwrite) { - logWarning("Log directory %s already exists. Overwriting...".format(logDir)) - // Second parameter is whether to delete recursively - fileSystem.delete(path, true) - } else { - throw new IOException("Log directory %s already exists!".format(logDir)) - } - } - if (!fileSystem.mkdirs(path)) { - throw new IOException("Error in creating log directory: %s".format(logDir)) - } - if (dirPermissions.isDefined) { - val fsStatus = fileSystem.getFileStatus(path) - if (fsStatus.getPermission.toShort != dirPermissions.get.toShort) { - fileSystem.setPermission(path, dirPermissions.get) - } - } - } - - /** - * Create a new writer for the file identified by the given path. - * If the permissions are not passed in, it will default to use the permissions - * (dirPermissions) used when class was instantiated. - */ - private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = { - val logPath = logDir + "/" + fileName - val uri = new URI(logPath) - val path = new Path(logPath) - val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme - val isDefaultLocal = defaultFs == null || defaultFs == "file" - - /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). - * Therefore, for local files, use FileOutputStream instead. */ - val dstream = - if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") { - // Second parameter is whether to append - new FileOutputStream(uri.getPath, !overwrite) - } else { - hadoopDataStream = Some(fileSystem.create(path, overwrite)) - hadoopDataStream.get - } - - perms.orElse(dirPermissions).foreach { p => fileSystem.setPermission(path, p) } - val bstream = new BufferedOutputStream(dstream, outputBufferSize) - val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream - new PrintWriter(cstream) - } - - /** - * Log the message to the given writer. 
- * @param msg The message to be logged - * @param withTime Whether to prepend message with a timestamp - */ - def log(msg: String, withTime: Boolean = false) { - val writeInfo = if (!withTime) { - msg - } else { - val date = new Date(System.currentTimeMillis) - dateFormat.get.format(date) + ": " + msg - } - writer.foreach(_.print(writeInfo)) - } - - /** - * Log the message to the given writer as a new line. - * @param msg The message to be logged - * @param withTime Whether to prepend message with a timestamp - */ - def logLine(msg: String, withTime: Boolean = false) = log(msg + "\n", withTime) - - /** - * Flush the writer to disk manually. - * - * If the Hadoop FileSystem is used, the underlying FSDataOutputStream (r1.0.4) must be - * sync()'ed manually as it does not support flush(), which is invoked by when higher - * level streams are flushed. - */ - def flush() { - writer.foreach(_.flush()) - hadoopDataStream.foreach(_.sync()) - } - - /** - * Close the writer. Any subsequent calls to log or flush will have no effect. - */ - def close() { - writer.foreach(_.close()) - writer = None - } - - /** - * Start a writer for a new file, closing the existing one if it exists. - * @param fileName Name of the new file, defaulting to the file index if not provided. - * @param perms Permissions to put on the new file. - */ - def newFile(fileName: String = "", perms: Option[FsPermission] = None) { - fileIndex += 1 - writer.foreach(_.close()) - val name = fileName match { - case "" => fileIndex.toString - case _ => fileName - } - writer = Some(createWriter(name, perms)) - } - - /** - * Close all open writers, streams, and file systems. Any subsequent uses of this FileLogger - * instance will throw exceptions. - */ - def stop() { - hadoopDataStream.foreach(_.close()) - writer.foreach(_.close()) - fileSystem.close() - } -} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala new file mode 100644 index 0000000000000..ca47671a62452 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import java.io.{File, FileOutputStream, OutputStreamWriter} + +import com.google.common.io.Files +import org.apache.hadoop.fs.Path +import org.json4s.jackson.JsonMethods._ +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.matchers.ShouldMatchers + +import org.apache.spark.SparkConf +import org.apache.spark.io._ +import org.apache.spark.scheduler._ +import org.apache.spark.util.{JsonProtocol, Utils} + +class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with ShouldMatchers { + + private var testDir: File = null + + private var provider: FsHistoryProvider = null + + before { + testDir = Files.createTempDir() + provider = new FsHistoryProvider(new SparkConf() + .set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) + .set("spark.history.fs.updateInterval", "0")) + } + + after { + Utils.getHadoopFileSystem("/").delete(new Path(testDir.getAbsolutePath()), true) + } + + test("Parse new and old application logs") { + val conf = new SparkConf() + .set("spark.history.fs.logDirectory", testDir.getAbsolutePath()) + .set("spark.history.fs.updateInterval", "0") + val provider = new FsHistoryProvider(conf) + + // Write a new-style application log. + val logFile1 = new File(testDir, "app1-1-1.0") + writeFile(logFile1, + SparkListenerApplicationStart("app1", 1L, "test"), + SparkListenerApplicationEnd(2L) + ) + + // Write an unfinished app, new-style. + writeFile(new File(testDir, "app2-1-1.0.inprogress"), + SparkListenerApplicationStart("app2", 1L, "test") + ) + + // Wait so that the files' modification times will be different, ensuring the ordering + // the test expects. + Thread.sleep(1000) + + // Write an old-style application log. + val oldLog = new File(testDir, "app3-1234") + oldLog.mkdir() + writeFile(new File(oldLog, provider.SPARK_VERSION_PREFIX + "1.0")) + writeFile(new File(oldLog, provider.LOG_PREFIX + "1"), + SparkListenerApplicationStart("app3", 2L, "test"), + SparkListenerApplicationEnd(3L) + ) + writeFile(new File(oldLog, provider.APPLICATION_COMPLETE)) + + // Write an unfinished app, old-style. + val oldLog2 = new File(testDir, "app4-1234") + oldLog2.mkdir() + writeFile(new File(oldLog2, provider.SPARK_VERSION_PREFIX + "1.0")) + writeFile(new File(oldLog2, provider.LOG_PREFIX + "1"), + SparkListenerApplicationStart("app2", 2L, "test") + ) + + // Force a reload of data from the log directory, and check that both logs are loaded. + // Take the opportunity to check that the offset checks work as expected. + provider.checkForLogs() + + val (list, offset, count) = provider.getListing(4, 100) + list should not be (null) + offset should be (0) + count should be (2) + + list(0) should be (ApplicationHistoryInfo(oldLog.getName(), "app3", 2L, 3L, + oldLog.lastModified(), "test", null, null)) + list(1) should be (ApplicationHistoryInfo(logFile1.getName(), "app1", 1L, 2L, + logFile1.lastModified(), "test", null, null)) + + // Make sure the UI can be rendered. 
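+      // getAppInfo() loads the application with renderUI = true, unlike the listing path.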
+    list.foreach { case info =>
+      val fullInfo = provider.getAppInfo(info.id)
+      fullInfo should not be null
+      fullInfo.ui should not be null
+    }
+  }
+
+  test("Parse logs with compression codec set") {
+    val testCodecs = List((classOf[LZFCompressionCodec].getName(), true),
+      (classOf[SnappyCompressionCodec].getName(), true),
+      ("invalid.codec", false))
+
+    testCodecs.foreach { case (codec, valid) =>
+      // Use a fresh directory per codec so that files left over from a previous
+      // iteration cannot interfere with the current check.
+      val logDir = new File(testDir, codec)
+      logDir.mkdir()
+      writeFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
+      writeFile(new File(logDir, provider.LOG_PREFIX + "1"),
+        SparkListenerApplicationStart("app2", 2L, "test"),
+        SparkListenerApplicationEnd(3L)
+      )
+      writeFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codec))
+
+      val logPath = new Path(logDir.getAbsolutePath())
+      val info = provider.loadOldLoggingInfo(logPath)
+      if (valid) {
+        info.path.toUri().getPath() should be (logPath.toUri().getPath())
+        info.sparkVersion should be ("1.0")
+        info.compressionCodec should not be (None)
+        info.compressionCodec.get.getClass().getName() should be (codec)
+        info.applicationComplete should be (false)
+      } else {
+        info should be (null)
+      }
+    }
+  }
+
+  private def writeFile(file: File, events: SparkListenerEvent*) = {
+    val out = new OutputStreamWriter(new FileOutputStream(file), "UTF-8")
+    try {
+      events.foreach(e => out.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))
+    } finally {
+      out.close()
+    }
+  }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 21e3db34b8b7a..1cce75cb779ac 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -25,8 +25,8 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.io.CompressionCodec
+import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.io._
 import org.apache.spark.util.{JsonProtocol, Utils}
 
 import java.io.File
@@ -34,40 +34,40 @@ import java.io.File
 
 /**
  * Test whether EventLoggingListener logs events properly.
  *
- * This tests whether EventLoggingListener actually creates special files while logging events,
- * whether the parsing of these special files is correct, and whether the logged events can be
- * read and deserialized into actual SparkListenerEvents.
+ * This tests whether EventLoggingListener actually creates log files with the expected name
+ * patterns while logging events, whether the parsing of those file names is correct, and
+ * whether the logged events can be read and deserialized into actual SparkListenerEvents.
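+ *
+ * A log file name is assumed (from the test data below) to look like
+ * "<app>-<timestamp>-<sparkVersion>", optionally followed by "-<codec class name>",
+ * with an ".inprogress" suffix appended while the application is still running.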
 */
-class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
+class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Logging {
   private val fileSystem = Utils.getHadoopFileSystem("/")
   private val allCompressionCodecs = Seq[String](
     "org.apache.spark.io.LZFCompressionCodec",
     "org.apache.spark.io.SnappyCompressionCodec"
   )
   private var testDir: File = _
-  private var logDirPath: Path = _
+  private var testDirPath: Path = _
 
   before {
     testDir = Files.createTempDir()
     testDir.deleteOnExit()
-    logDirPath = Utils.getFilePath(testDir, "spark-events")
+    testDirPath = new Path(testDir.getAbsolutePath())
   }
 
   after {
     Utils.deleteRecursively(testDir)
   }
 
-  test("Parse names of special files") {
+  test("Parse names of log files") {
    testParsingFileName()
   }
 
-  test("Verify special files exist") {
-    testSpecialFilesExist()
+  test("Verify log file exists") {
+    testLogFileExists()
   }
 
-  test("Verify special files exist with compression") {
+  test("Verify log file names contain compression codec info") {
     allCompressionCodecs.foreach { codec =>
-      testSpecialFilesExist(compressionCodec = Some(codec))
+      testLogFileExists(compressionCodec = Some(codec))
     }
   }
 
@@ -109,79 +109,59 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   import EventLoggingListenerSuite._
 
   /**
-   * Test whether names of special files are correctly identified and parsed.
+   * Test whether names of log files are correctly identified and parsed.
    */
   private def testParsingFileName() {
-    val logPrefix = EventLoggingListener.LOG_PREFIX
-    val sparkVersionPrefix = EventLoggingListener.SPARK_VERSION_PREFIX
-    val compressionCodecPrefix = EventLoggingListener.COMPRESSION_CODEC_PREFIX
-    val applicationComplete = EventLoggingListener.APPLICATION_COMPLETE
-    assert(EventLoggingListener.isEventLogFile(logPrefix + "0"))
-    assert(EventLoggingListener.isEventLogFile(logPrefix + "100"))
-    assert(EventLoggingListener.isEventLogFile(logPrefix + "ANYTHING"))
-    assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "0.9.1"))
-    assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "1.0.0"))
-    assert(EventLoggingListener.isSparkVersionFile(sparkVersionPrefix + "ANYTHING"))
-    assert(EventLoggingListener.isApplicationCompleteFile(applicationComplete))
-    allCompressionCodecs.foreach { codec =>
-      assert(EventLoggingListener.isCompressionCodecFile(compressionCodecPrefix + codec))
-    }
-
-    // Negatives
-    assert(!EventLoggingListener.isEventLogFile("The greatest man of all mankind"))
-    assert(!EventLoggingListener.isSparkVersionFile("Will never falter in the face of death!"))
-    assert(!EventLoggingListener.isCompressionCodecFile("Unless he chooses to leave behind"))
-    assert(!EventLoggingListener.isApplicationCompleteFile("The very treasure he calls Macbeth"))
-
-    // Verify that parsing is correct
-    assert(EventLoggingListener.parseSparkVersion(sparkVersionPrefix + "1.0.0") === "1.0.0")
-    allCompressionCodecs.foreach { codec =>
-      assert(EventLoggingListener.parseCompressionCodec(compressionCodecPrefix + codec) === codec)
-    }
+    val tests = List(
+      // Log file name, Spark version, Compression codec, in progress
+      ("app1-1234-1.0.inprogress", "1.0", None, true),
+      ("app2-1234-0.9.1", "0.9.1", None, false),
+      ("app3-1234-0.9-org.apache.spark.io.LZFCompressionCodec", "0.9",
+        Some(classOf[LZFCompressionCodec]), false),
+      ("app-123456-1234-0.8-org.apache.spark.io.SnappyCompressionCodec.inprogress", "0.8",
+        Some(classOf[SnappyCompressionCodec]), true)
+    )
+
+    tests.foreach({ case (fname, version, codec, inProgress) =>
+      logWarning(s"Testing: $fname")
+      val info = EventLoggingListener.parseLoggingInfo(new Path(fname))
+      assert(info != null)
+      assert(version === info.sparkVersion)
+      assert(!inProgress === info.applicationComplete)
+
+      val actualCodec = info.compressionCodec.map(_.getClass).orNull
+      assert(codec.orNull === actualCodec)
+    })
+
+    val invalid = List("app1", "app1-1.0", "app1-1234", "app1-abc-1.0",
+      "app1-1234-1.0-org.invalid.compression.Codec",
+      "app1-1234-1.0.not_in_progress")
+
+    invalid.foreach(fname => assert(EventLoggingListener.parseLoggingInfo(new Path(fname)) == null))
   }
 
   /**
-   * Test whether the special files produced by EventLoggingListener exist.
-   *
-   * There should be exactly one event log and one spark version file throughout the entire
-   * execution. If a compression codec is specified, then the compression codec file should
-   * also exist. Only after the application has completed does the test expect the application
-   * completed file to be present.
+   * Test whether the log file produced by EventLoggingListener exists and matches the expected
+   * name pattern.
    */
-  private def testSpecialFilesExist(compressionCodec: Option[String] = None) {
-
-    def assertFilesExist(logFiles: Array[FileStatus], loggerStopped: Boolean) {
-      val numCompressionCodecFiles = if (compressionCodec.isDefined) 1 else 0
-      val numApplicationCompleteFiles = if (loggerStopped) 1 else 0
-      assert(logFiles.size === 2 + numCompressionCodecFiles + numApplicationCompleteFiles)
-      assert(eventLogsExist(logFiles))
-      assert(sparkVersionExists(logFiles))
-      assert(compressionCodecExists(logFiles) === compressionCodec.isDefined)
-      assert(applicationCompleteExists(logFiles) === loggerStopped)
-      assertSparkVersionIsValid(logFiles)
-      compressionCodec.foreach { codec =>
-        assertCompressionCodecIsValid(logFiles, codec)
-      }
-    }
-
+  private def testLogFileExists(compressionCodec: Option[String] = None) {
     // Verify logging directory exists
-    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val conf = getLoggingConf(testDirPath, compressionCodec)
     val eventLogger = new EventLoggingListener("test", conf)
     eventLogger.start()
 
-    val logPath = new Path(eventLogger.logDir)
-    assert(fileSystem.exists(logPath))
-    val logDir = fileSystem.getFileStatus(logPath)
-    assert(logDir.isDir)
-
-    // Verify special files are as expected before stop()
-    var logFiles = fileSystem.listStatus(logPath)
-    assert(logFiles != null)
-    assertFilesExist(logFiles, loggerStopped = false)
+    val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
+    assert(fileSystem.exists(logPath))
+    val logStatus = fileSystem.getFileStatus(logPath)
+    assert(logStatus.isFile)
+    assert(EventLoggingListener.LOG_FILE_NAME_REGEX.pattern.matcher(
+      logPath.getName()).matches())
 
-    // Verify special files are as expected after stop()
+    // Verify log is renamed after stop()
    eventLogger.stop()
-    logFiles = fileSystem.listStatus(logPath)
-    assertFilesExist(logFiles, loggerStopped = true)
+    assert(fileSystem.getFileStatus(new Path(eventLogger.logPath)).isFile())
   }
 
   /**
@@ -193,7 +173,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   private def testParsingLogInfo(compressionCodec: Option[String] = None) {
 
     def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) {
-      assert(info.logPaths.size > 0)
+      assert(info != null)
       assert(info.sparkVersion === SparkContext.SPARK_VERSION)
       assert(info.compressionCodec.isDefined === compressionCodec.isDefined)
       info.compressionCodec.foreach { codec =>
@@ -205,15 +185,16 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     }
 
     // Verify that all information is correctly parsed before stop()
-    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val conf = getLoggingConf(testDirPath, compressionCodec)
     val eventLogger = new EventLoggingListener("test", conf)
     eventLogger.start()
-    var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+    var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(
+      new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS))
     assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
 
     // Verify that all information is correctly parsed after stop()
     eventLogger.stop()
-    eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
+    eventLoggingInfo = EventLoggingListener.parseLoggingInfo(new Path(eventLogger.logPath))
     assertInfoCorrect(eventLoggingInfo, loggerStopped = true)
   }
 
@@ -224,7 +205,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    * exactly these two events are logged in the expected file.
    */
   private def testEventLogging(compressionCodec: Option[String] = None) {
-    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val conf = getLoggingConf(testDirPath, compressionCodec)
     val eventLogger = new EventLoggingListener("test", conf)
     val listenerBus = new LiveListenerBus
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
@@ -236,17 +217,17 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     listenerBus.addListener(eventLogger)
     listenerBus.postToAll(applicationStart)
     listenerBus.postToAll(applicationEnd)
+    eventLogger.stop()
 
     // Verify file contains exactly the two events logged
-    val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
-    assert(eventLoggingInfo.logPaths.size > 0)
-    val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
+    val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(new Path(eventLogger.logPath))
+    assert(eventLoggingInfo != null)
+    val lines = readFileLines(eventLoggingInfo.path, eventLoggingInfo.compressionCodec)
     assert(lines.size === 2)
     assert(lines(0).contains("SparkListenerApplicationStart"))
     assert(lines(1).contains("SparkListenerApplicationEnd"))
     assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart)
     assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd)
-    eventLogger.stop()
   }
 
   /**
@@ -254,12 +235,12 @@
    * This runs a simple Spark job and asserts that the expected events are logged when expected.
    */
   private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
-    val conf = getLoggingConf(logDirPath, compressionCodec)
+    val conf = getLoggingConf(testDirPath, compressionCodec)
     val sc = new SparkContext("local", "test", conf)
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
-    val expectedLogDir = logDirPath.toString
-    assert(eventLogger.logDir.startsWith(expectedLogDir))
+    val expectedLogDir = testDir.getAbsolutePath()
+    assert(eventLogger.logPath.startsWith(expectedLogDir + "/"))
 
     // Begin listening for events that trigger asserts
     val eventExistenceListener = new EventExistenceListener(eventLogger)
@@ -275,11 +256,15 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
 
   /**
    * Assert that all of the specified events are logged by the given EventLoggingListener.
+   *
+   * This is done while the application is still running, so the log file contains the
+   * IN_PROGRESS suffix.
    */
   private def assertEventsExist(eventLogger: EventLoggingListener, events: Seq[String]) {
-    val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
-    assert(eventLoggingInfo.logPaths.size > 0)
-    val lines = readFileLines(eventLoggingInfo.logPaths.head, eventLoggingInfo.compressionCodec)
+    val eventLoggingInfo = EventLoggingListener.parseLoggingInfo(
+      new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS))
+    assert(eventLoggingInfo != null)
+    val lines = readFileLines(eventLoggingInfo.path, eventLoggingInfo.compressionCodec)
     val eventSet = mutable.Set(events: _*)
     lines.foreach { line =>
       eventSet.foreach { event =>
@@ -354,39 +339,6 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     }
   }
 
-
-  /* -------------------------------------------------------- *
-   * Helper methods for validating state of the special files *
-   * -------------------------------------------------------- */
-
-  private def eventLogsExist(logFiles: Array[FileStatus]): Boolean = {
-    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isEventLogFile)
-  }
-
-  private def sparkVersionExists(logFiles: Array[FileStatus]): Boolean = {
-    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isSparkVersionFile)
-  }
-
-  private def compressionCodecExists(logFiles: Array[FileStatus]): Boolean = {
-    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isCompressionCodecFile)
-  }
-
-  private def applicationCompleteExists(logFiles: Array[FileStatus]): Boolean = {
-    logFiles.map(_.getPath.getName).exists(EventLoggingListener.isApplicationCompleteFile)
-  }
-
-  private def assertSparkVersionIsValid(logFiles: Array[FileStatus]) {
-    val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isSparkVersionFile)
-    assert(file.isDefined)
-    assert(EventLoggingListener.parseSparkVersion(file.get) === SparkContext.SPARK_VERSION)
-  }
-
-  private def assertCompressionCodecIsValid(logFiles: Array[FileStatus], compressionCodec: String) {
-    val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isCompressionCodecFile)
-    assert(file.isDefined)
-    assert(EventLoggingListener.parseCompressionCodec(file.get) === compressionCodec)
-  }
-
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index d81499ac6abef..eeaa05b0022ab 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import java.io.{File, PrintWriter}
 
 import com.google.common.io.Files
+import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
@@ -89,7 +90,7 @@
     writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
     writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
     writer.close()
-    val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
+    val replayer = new ReplayListenerBus(logFilePath, fileSystem, codec)
     val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath, codecName)
     val eventMonster = new EventMonster(conf)
     replayer.addListener(eventMonster)
@@ -108,6 +109,8 @@
    */
   private def testApplicationReplay(codecName: Option[String] = None) {
     val logDirPath = Utils.getFilePath(testDir, "test-replay")
+    fileSystem.mkdirs(logDirPath)
+
     val conf = EventLoggingListenerSuite.getLoggingConf(logDirPath, codecName)
     val sc = new SparkContext("local-cluster[2,1,512]", "Test replay", conf)
@@ -122,16 +125,11 @@
     val codec = codecName.map(getCompressionCodec)
     val applications = fileSystem.listStatus(logDirPath)
     assert(applications != null && applications.size > 0)
-    val eventLogDir = applications.sortBy(_.getAccessTime).last
-    assert(eventLogDir.isDir)
-    val logFiles = fileSystem.listStatus(eventLogDir.getPath)
-    assert(logFiles != null && logFiles.size > 0)
-    val logFile = logFiles.find(_.getPath.getName.startsWith("EVENT_LOG_"))
-    assert(logFile.isDefined)
-    val logFilePath = logFile.get.getPath
+    val eventLog = applications.sortBy(_.getAccessTime).last
+    assert(eventLog.isFile)
 
     // Replay events
-    val replayer = new ReplayListenerBus(Seq(logFilePath), fileSystem, codec)
+    val replayer = new ReplayListenerBus(eventLog.getPath(), fileSystem, codec)
     val eventMonster = new EventMonster(conf)
     replayer.addListener(eventMonster)
     replayer.replay()
@@ -156,7 +154,9 @@
    * log the events.
    */
   private class EventMonster(conf: SparkConf) extends EventLoggingListener("test", conf) {
-    logger.close()
+
+    override def start() { }
+
   }
 
   private def getCompressionCodec(codecName: String) = {
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
deleted file mode 100644
index 44332fc8dbc23..0000000000000
--- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.io.{File, IOException}
-
-import scala.io.Source
-
-import com.google.common.io.Files
-import org.apache.hadoop.fs.Path
-import org.scalatest.{BeforeAndAfter, FunSuite}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.io.CompressionCodec
-
-/**
- * Test writing files through the FileLogger.
- */
-class FileLoggerSuite extends FunSuite with BeforeAndAfter {
-  private val fileSystem = Utils.getHadoopFileSystem("/")
-  private val allCompressionCodecs = Seq[String](
-    "org.apache.spark.io.LZFCompressionCodec",
-    "org.apache.spark.io.SnappyCompressionCodec"
-  )
-  private var testDir: File = _
-  private var logDirPath: Path = _
-  private var logDirPathString: String = _
-
-  before {
-    testDir = Files.createTempDir()
-    logDirPath = Utils.getFilePath(testDir, "test-file-logger")
-    logDirPathString = logDirPath.toString
-  }
-
-  after {
-    Utils.deleteRecursively(testDir)
-  }
-
-  test("Simple logging") {
-    testSingleFile()
-  }
-
-  test ("Simple logging with compression") {
-    allCompressionCodecs.foreach { codec =>
-      testSingleFile(Some(codec))
-    }
-  }
-
-  test("Logging multiple files") {
-    testMultipleFiles()
-  }
-
-  test("Logging multiple files with compression") {
-    allCompressionCodecs.foreach { codec =>
-      testMultipleFiles(Some(codec))
-    }
-  }
-
-  test("Logging when directory already exists") {
-    // Create the logging directory multiple times
-    new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
-    new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
-    new FileLogger(logDirPathString, new SparkConf, overwrite = true).start()
-
-    // If overwrite is not enabled, an exception should be thrown
-    intercept[IOException] {
-      new FileLogger(logDirPathString, new SparkConf, overwrite = false).start()
-    }
-  }
-
-
-  /* ----------------- *
-   * Actual test logic *
-   * ----------------- */
-
-  /**
-   * Test logging to a single file.
-   */
-  private def testSingleFile(codecName: Option[String] = None) {
-    val conf = getLoggingConf(codecName)
-    val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
-    val logger =
-      if (codecName.isDefined) {
-        new FileLogger(logDirPathString, conf, compress = true)
-      } else {
-        new FileLogger(logDirPathString, conf)
-      }
-    logger.start()
-    assert(fileSystem.exists(logDirPath))
-    assert(fileSystem.getFileStatus(logDirPath).isDir)
-    assert(fileSystem.listStatus(logDirPath).size === 0)
-
-    logger.newFile()
-    val files = fileSystem.listStatus(logDirPath)
-    assert(files.size === 1)
-    val firstFile = files.head
-    val firstFilePath = firstFile.getPath
-
-    logger.log("hello")
-    logger.flush()
-    assert(readFileContent(firstFilePath, codec) === "hello")
-
-    logger.log(" world")
-    logger.close()
-    assert(readFileContent(firstFilePath, codec) === "hello world")
-  }
-
-  /**
-   * Test logging to multiple files.
-   */
-  private def testMultipleFiles(codecName: Option[String] = None) {
-    val conf = getLoggingConf(codecName)
-    val codec = codecName.map { c => CompressionCodec.createCodec(conf) }
-    val logger =
-      if (codecName.isDefined) {
-        new FileLogger(logDirPathString, conf, compress = true)
-      } else {
-        new FileLogger(logDirPathString, conf)
-      }
-    logger.start()
-    logger.newFile("Jean_Valjean")
-    logger.logLine("Who am I?")
-    logger.logLine("Destiny?")
-    logger.newFile("John_Valjohn")
-    logger.logLine("One")
-    logger.logLine("Two three...")
-    logger.newFile("Wolverine")
-    logger.logLine("There was a time")
-    logger.logLine("A time when our enemies knew honor.")
-    logger.close()
-    assert(readFileContent(new Path(logDirPath, "Jean_Valjean"), codec) === "Who am I?\nDestiny?")
-    assert(readFileContent(new Path(logDirPath, "John_Valjohn"), codec) === "One\nTwo three...")
-    assert(readFileContent(new Path(logDirPath, "Wolverine"), codec) ===
-      "There was a time\nA time when our enemies knew honor.")
-  }
-
-  /**
-   * Read the content of the file specified by the given path.
-   * If a compression codec is specified, use it to read the file.
-   */
-  private def readFileContent(logPath: Path, codec: Option[CompressionCodec] = None): String = {
-    val fstream = fileSystem.open(logPath)
-    val cstream = codec.map(_.compressedInputStream(fstream)).getOrElse(fstream)
-    Source.fromInputStream(cstream).getLines().mkString("\n")
-  }
-
-  private def getLoggingConf(codecName: Option[String]) = {
-    val conf = new SparkConf
-    codecName.foreach { c => conf.set("spark.io.compression.codec", c) }
-    conf
-  }
-
-}
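For reference, a minimal, self-contained sketch of the single-file name convention the suites above exercise: "<app>-<timestamp>-<sparkVersion>", optionally "-<codec class name>", plus an ".inprogress" suffix while the application runs. This is not the patch's implementation (that is EventLoggingListener.parseLoggingInfo backed by LOG_FILE_NAME_REGEX, which additionally validates the codec class); the object name LogNameSketch and the string-splitting approach are illustrative assumptions derived only from the test data:

import org.apache.hadoop.fs.Path

// Illustrative stand-in only; not part of the patch.
object LogNameSketch {
  private val InProgress = ".inprogress"
  private val VersionPattern = """\d+(\.\d+)*"""

  /** Returns (app, sparkVersion, codecClassName, applicationComplete), or None if malformed. */
  def parse(path: Path): Option[(String, String, Option[String], Boolean)] = {
    val name = path.getName
    val complete = !name.endsWith(InProgress)
    val parts = name.stripSuffix(InProgress).split("-")
    // The last dash-separated field is either the Spark version ("1.0", "0.9.1")
    // or a codec class name; codec class names contain dots but no dashes.
    val (codec, rest) =
      if (parts.last.matches(VersionPattern)) (None, parts)
      else (Some(parts.last), parts.dropRight(1))
    // What remains must be <app>-<timestamp>-<version>; the app part may itself
    // contain dashes (e.g. "app-123456" in the test data above). Unlike the real
    // parser, this sketch does not instantiate the codec, so an unknown codec
    // class name would still parse here.
    if (rest.length >= 3 && rest(rest.length - 2).forall(_.isDigit) &&
        rest.last.matches(VersionPattern)) {
      Some((rest.dropRight(2).mkString("-"), rest.last, codec, complete))
    } else {
      None // e.g. "app1-1234" or "app1-abc-1.0" from the invalid list above
    }
  }
}

Against the fourth tuple in the parsing test, this yields Some(("app-123456", "0.8", Some("org.apache.spark.io.SnappyCompressionCodec"), false)), and it rejects every entry in the invalid list except the unknown-codec name, which only the real parser (by trying to load the codec) can rule out.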