core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -24,7 +24,7 @@ private[spark] class ApplicationDescription(
val command: Command,
val sparkHome: Option[String],
var appUiUrl: String,
- val eventLogDir: Option[String] = None)
+ val eventLogFile: Option[String] = None)
extends Serializable {

val user = System.getProperty("user.name", "<unknown>")
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala (new file)
@@ -0,0 +1,273 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.history

import java.io.FileNotFoundException
import java.util.concurrent.atomic.AtomicReference

import scala.collection.mutable

import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils

private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
with Logging {

// Interval between each check for event log updates. The config value is in
// seconds and is converted to milliseconds here.
private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval", 10) * 1000

private val logDir = conf.get("spark.history.fs.logDirectory")
private val fs = Utils.getHadoopFileSystem(logDir)

// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L

// List of applications, in order from newest to oldest.
private val appList = new AtomicReference[Seq[ApplicationHistoryInfo]](Nil)

// Constants used to parse Spark 1.0.0 log directories.
private[history] val LOG_PREFIX = "EVENT_LOG_"
private[history] val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
private[history] val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
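
// For illustration (names assumed), a Spark 1.0.0 log directory parsed by
// loadOldLoggingInfo() below might look like:
//   app-20140601120000-0001/
//     EVENT_LOG_1
//     SPARK_VERSION_1.0.0
//     COMPRESSION_CODEC_<codec name>
//     APPLICATION_COMPLETE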

/**
* A background thread that periodically checks for event log updates on disk.
*
* If a log check is invoked manually in the middle of a period, this thread re-adjusts the
* time at which it performs the next log check to maintain the same period as before.
*
* TODO: Add a mechanism to update manually.
*/
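// Worked example (numbers assumed): with a 10s update interval, a manual
// check performed 4s ago lands in the `else` branch below and sleeps the
// remaining 6s before the next automatic check.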
private val logCheckingThread = new Thread("LogCheckingThread") {
override def run() = Utils.logUncaughtExceptions {
while (true) {
val now = getMonotonicTime()
if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
Thread.sleep(UPDATE_INTERVAL_MS)
} else {
// If the user has manually checked for logs recently, wait until
// UPDATE_INTERVAL_MS after the last check time
Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
}
checkForLogs()
}
}
}

initialize()

private def initialize() {
// Validate the log directory.
val path = new Path(logDir)
if (!fs.exists(path)) {
throw new IllegalArgumentException(
"Logging directory specified does not exist: %s".format(logDir))
}
if (!fs.getFileStatus(path).isDir) {
throw new IllegalArgumentException(
"Logging directory specified is not a directory: %s".format(logDir))
}

checkForLogs()

// Treat 0 as "disable the background thread", mostly for testing.
if (UPDATE_INTERVAL_MS > 0) {
logCheckingThread.setDaemon(true)
logCheckingThread.start()
}
}

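/**
 * Returns one page of the application list: the requested slice, the offset
 * that was actually used (falls back to 0 when out of range), and the total
 * number of applications.
 */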
override def getListing(offset: Int, count: Int) = {
val list = appList.get()
val theOffset = if (offset < list.size) offset else 0
(list.slice(theOffset, Math.min(theOffset + count, list.size)), theOffset, list.size)
}

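/** Returns info for the given app, or null if its event log cannot be found. */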
override def getAppInfo(appId: String): ApplicationHistoryInfo = {
try {
val appLogDir = fs.getFileStatus(new Path(logDir, appId))
loadAppInfo(appLogDir, true)
} catch {
case e: FileNotFoundException => null
}
}

/**
* Builds the application list based on the current contents of the log directory.
* Tries to reuse as much of the data already in memory as possible, by not reading
* applications that haven't been updated since the last time the logs were checked.
*/
private[history] def checkForLogs() = synchronized {
lastLogCheckTimeMs = getMonotonicTime()
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
try {
val matcher = EventLoggingListener.LOG_FILE_NAME_REGEX
val logInfos = fs.listStatus(new Path(logDir))
.filter { entry =>
if (entry.isDir()) {
fs.exists(new Path(entry.getPath(), APPLICATION_COMPLETE))
} else {
try {
val matcher(version, codecName, inprogress) = entry.getPath().getName()
inprogress == null
} catch {
case e: Exception => false
}
}
}

val currentApps = Map[String, ApplicationHistoryInfo](
appList.get().map(app => (app.id -> app)):_*)

// For any application that either (i) is not listed or (ii) has changed since the last time
// the listing was created (defined by the log dir's modification time), load the app's info.
// Otherwise just reuse what's already in memory.
val newApps = new mutable.ListBuffer[ApplicationHistoryInfo]
for (log <- logInfos) {
val curr = currentApps.getOrElse(log.getPath().getName(), null)
if (curr == null || curr.lastUpdated < log.getModificationTime()) {
try {
val info = loadAppInfo(log, false)
if (info != null) {
newApps += info
}
} catch {
case e: Exception => logError(s"Failed to load app info from ${log.getPath()}.", e)
}
} else {
newApps += curr
}
}

appList.set(newApps.sortBy { info => -info.lastUpdated })
} catch {
case t: Throwable => logError("Exception in checking for event log updates", t)
}
}

/**
* Parse the application's logs to find out the information we need to build the
* listing page.
*/
private def loadAppInfo(log: FileStatus, renderUI: Boolean): ApplicationHistoryInfo = {
val elogInfo = if (log.isFile()) {
EventLoggingListener.parseLoggingInfo(log.getPath())
} else {
loadOldLoggingInfo(log.getPath())
}

if (elogInfo == null) {
return null
}

val (logFile, lastUpdated) = if (log.isFile()) {
(elogInfo.path, log.getModificationTime())
} else {
// For old-style log directories, need to find the actual log file.
val status = fs.listStatus(elogInfo.path)
.filter(e => e.getPath().getName().startsWith(LOG_PREFIX))(0)
(status.getPath(), status.getModificationTime())
}

val appId = elogInfo.path.getName

val replayBus = new ReplayListenerBus(logFile, fs, elogInfo.compressionCodec)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)

val ui: SparkUI = if (renderUI) {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
// Do not call ui.bind() to avoid creating a new server for each application
} else {
null
}

replayBus.replay()
ApplicationHistoryInfo(appId,
appListener.appName,
appListener.startTime,
appListener.endTime,
lastUpdated,
appListener.sparkUser,
if (renderUI) appListener.viewAcls else null,
ui)
}

/**
* Load the app log information from a Spark 1.0.0 log directory, for backwards compatibility.
* This assumes that the log directory contains a single event log file, which is the case for
* directories generated by the code in that release.
*/
private[history] def loadOldLoggingInfo(dir: Path): EventLoggingInfo = {
val children = fs.listStatus(dir)
var eventLogPath: Path = null
var sparkVersion: String = null
var codecName: String = null
var applicationCompleted: Boolean = false

children.foreach(child => child.getPath().getName() match {
case name if name.startsWith(LOG_PREFIX) =>
eventLogPath = child.getPath()

case ver if ver.startsWith(SPARK_VERSION_PREFIX) =>
sparkVersion = ver.substring(SPARK_VERSION_PREFIX.length())

case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
codecName = codec.substring(COMPRESSION_CODEC_PREFIX.length())

case complete if complete == APPLICATION_COMPLETE =>
applicationCompleted = true

case _ =>
})

val codec = try {
if (codecName != null) {
Some(CompressionCodec.createCodec(conf, codecName))
} else None
} catch {
case e: Exception =>
logError(s"Unknown compression codec $codecName.")
return null
}

if (eventLogPath == null || sparkVersion == null) {
logInfo(s"$dir is not a Spark application log directory.")
return null
}

EventLoggingInfo(dir, sparkVersion, codec, applicationCompleted)
}

/** Returns the system's monotonically increasing time, in milliseconds. */
private def getMonotonicTime() = System.nanoTime() / (1000 * 1000)

}
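
A minimal sketch of wiring up and querying the provider above (assumptions: the caller lives inside org.apache.spark.deploy.history, since the class is private[history], and the directory value is illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.history.fs.logDirectory", "hdfs:///user/spark/eventLogs")
      .set("spark.history.fs.updateInterval", "10") // seconds; 0 disables the background thread

    val provider = new FsHistoryProvider(conf)
    // First page of up to 20 apps: (slice, offset actually used, total count).
    val (apps, offset, total) = provider.getListing(0, 20)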
core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -25,20 +25,28 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}

private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {

+ val pageSize = 20
+
def render(request: HttpServletRequest): Seq[Node] = {
- val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
- val appTable = UIUtils.listingTable(appHeader, appRow, appRows)
+ val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
+ val requestedFirst = (requestedPage - 1) * pageSize
+ val (apps, actualFirst, totalCount) = parent.getApplicationList(requestedFirst, pageSize)
+ val actualPage = (actualFirst / pageSize) + 1
+ val last = Math.min(actualFirst + pageSize, totalCount) - 1
+ val pageCount = totalCount / pageSize + (if (totalCount % pageSize > 0) 1 else 0)

+ val appTable = UIUtils.listingTable(appHeader, appRow, apps)
val content =
<div class="row-fluid">
<div class="span12">
<ul class="unstyled">
<li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
</ul>
{
- if (parent.appIdToInfo.size > 0) {
+ if (totalCount > 0) {
<h4>
- Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
- Completed Application{if (parent.getNumApplications > 1) "s" else ""}
+ Showing {actualFirst + 1}-{last + 1} of {totalCount}
+ <span style="float: right">
+ {if (actualPage > 1) <a href={"/?page=" + (actualPage - 1)}>&lt;</a>}
+ {if (actualPage < pageCount) <a href={"/?page=" + (actualPage + 1)}>&gt;</a>}
+ </span>
</h4> ++
appTable
} else {
@@ -56,26 +64,23 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Completed",
"Duration",
"Spark User",
"Log Directory",
"Last Updated")

private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
- val appName = if (info.started) info.name else info.logDirPath.getName
- val uiAddress = parent.getAddress + info.ui.basePath
+ val appName = if (info.started) info.name else info.id
+ val uiAddress = "/history/" + info.id
val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started"
val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed"
val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---"
val sparkUser = if (info.started) info.sparkUser else "Unknown user"
- val logDirectory = info.logDirPath.getName
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{appName}</a></td>
<td>{startTime}</td>
<td>{endTime}</td>
<td>{duration}</td>
<td>{sparkUser}</td>
- <td>{logDirectory}</td>
<td>{lastUpdated}</td>
</tr>
}
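
A worked example of the pagination arithmetic introduced above (numbers assumed): with pageSize = 20 and 45 applications in total, a request for page=3 gives:

    val requestedFirst = (3 - 1) * 20                       // 40
    val last = Math.min(40 + 20, 45) - 1                    // 44
    val pageCount = 45 / 20 + (if (45 % 20 > 0) 1 else 0)   // 3

so the header reads "Showing 41-45 of 45", and only the "<" link back to page 2 is rendered, since page 3 is the last page.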