Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package org.apache.spark.deploy.history

import java.io.{File, FileNotFoundException, IOException}
import java.util.{Date, ServiceLoader, UUID}
import java.util.{Date, ServiceLoader}
import java.util.concurrent.{ExecutorService, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.io.Source
import scala.util.Try
import scala.xml.Node

Expand Down Expand Up @@ -58,10 +59,10 @@ import org.apache.spark.util.kvstore._
*
* == How new and updated attempts are detected ==
*
* - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any
* entries in the log dir whose modification time is greater than the last scan time
* are considered new or updated. These are replayed to create a new attempt info entry
* and update or create a matching application info element in the list of applications.
* - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any entries in the
* log dir whose size changed since the last scan time are considered new or updated. These are
* replayed to create a new attempt info entry and update or create a matching application info
* element in the list of applications.
* - Updated attempts are also found in [[checkForLogs]] -- if the attempt's log file has grown, the
* attempt is replaced by another one with a larger log size.
*
Expand Down Expand Up @@ -125,6 +126,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class comment is no longer correct when it talks about finding new attempts based on modification time. You should have a listing entry for every file in the dir


private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)

// Visible for testing.
private[history] val listing: KVStore = storePath.map { path =>
Expand Down Expand Up @@ -402,13 +404,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
*/
private[history] def checkForLogs(): Unit = {
try {
val newLastScanTime = getNewLastScanTime()
val newLastScanTime = clock.getTimeMillis()
logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")

val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
.filter { entry =>
!entry.isDirectory() &&
// FsHistoryProvider generates a hidden file which can't be read. Accidentally
// FsHistoryProvider used to generate a hidden file which can't be read. Accidentally
// reading a garbage file is safe, but we would log an error which can be scary to
// the end-user.
!entry.getPath().getName().startsWith(".") &&
Expand All @@ -417,15 +419,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
.filter { entry =>
try {
val info = listing.read(classOf[LogInfo], entry.getPath().toString())
if (info.fileSize < entry.getLen()) {
// Log size has changed, it should be parsed.
true
} else {

if (info.appId.isDefined) {
// If the SHS view has a valid application, update the time the file was last seen so
// that the entry is not deleted from the SHS listing.
if (info.appId.isDefined) {
listing.write(info.copy(lastProcessed = newLastScanTime))
// that the entry is not deleted from the SHS listing. Also update the file size, in
// case the code below decides we don't need to parse the log.
listing.write(info.copy(lastProcessed = newLastScanTime, fileSize = entry.getLen()))
}

if (info.fileSize < entry.getLen()) {
if (info.appId.isDefined && fastInProgressParsing) {
// When fast in-progress parsing is on, we don't need to re-parse when the
// size changes, but we do need to invalidate any existing UIs.
invalidateUI(info.appId.get, info.attemptId)
false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need to update info.fileSize here too? Because you skip the re-parse, you don't update it in mergeApplicationListings. So i think once you hit this condition once, you'll always invalidate the UI on every iteration.

} else {
true
}
} else {
false
}
} catch {
Expand All @@ -449,7 +460,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val tasks = updated.map { entry =>
try {
replayExecutor.submit(new Runnable {
override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
})
} catch {
// let the iteration over the updated entries break, since an exception on
Expand Down Expand Up @@ -542,25 +553,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

private[history] def getNewLastScanTime(): Long = {
val fileName = "." + UUID.randomUUID().toString
val path = new Path(logDir, fileName)
val fos = fs.create(path)

try {
fos.close()
fs.getFileStatus(path).getModificationTime
} catch {
case e: Exception =>
logError("Exception encountered when attempting to update last scan time", e)
lastScanTime.get()
} finally {
if (!fs.delete(path, true)) {
logWarning(s"Error deleting ${path}")
}
}
}

override def writeEventLogs(
appId: String,
attemptId: Option[String],
Expand Down Expand Up @@ -607,7 +599,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
/**
* Replay the given log file, saving the application in the listing db.
*/
protected def mergeApplicationListing(fileStatus: FileStatus, scanTime: Long): Unit = {
protected def mergeApplicationListing(
fileStatus: FileStatus,
scanTime: Long,
enableOptimizations: Boolean): Unit = {
val eventsFilter: ReplayEventsFilter = { eventString =>
eventString.startsWith(APPL_START_EVENT_PREFIX) ||
eventString.startsWith(APPL_END_EVENT_PREFIX) ||
Expand All @@ -616,32 +611,118 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

val logPath = fileStatus.getPath()
val appCompleted = isCompleted(logPath.getName())
val reparseChunkSize = conf.get(END_EVENT_REPARSE_CHUNK_SIZE)

// Enable halt support in listener if:
// - app in progress && fast parsing enabled
// - skipping to end event is enabled (regardless of in-progress state)
val shouldHalt = enableOptimizations &&
((!appCompleted && fastInProgressParsing) || reparseChunkSize > 0)

val bus = new ReplayListenerBus()
val listener = new AppListingListener(fileStatus, clock)
val listener = new AppListingListener(fileStatus, clock, shouldHalt)
bus.addListener(listener)
replay(fileStatus, bus, eventsFilter = eventsFilter)

val (appId, attemptId) = listener.applicationInfo match {
case Some(app) =>
// Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a
// discussion on the UI lifecycle.
synchronized {
activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui =>
ui.invalidate()
ui.ui.store.close()

logInfo(s"Parsing $logPath for listing data...")
Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
bus.replay(in, logPath.toString, !appCompleted, eventsFilter)
}

// If enabled above, the listing listener will halt parsing when there's enough information to
// create a listing entry. When the app is completed, or fast parsing is disabled, we still need
// to replay until the end of the log file to try to find the app end event. Instead of reading
// and parsing line by line, this code skips bytes from the underlying stream so that it is
// positioned somewhere close to the end of the log file.
//
// Because the application end event is written while some Spark subsystems such as the
// scheduler are still active, there is no guarantee that the end event will be the last
// in the log. So, to be safe, the code uses a configurable chunk to be re-parsed at
// the end of the file, and retries parsing the whole log later if the needed data is
// still not found.
//
// Note that skipping bytes in compressed files is still not cheap, but there are still some
// minor gains over the normal log parsing done by the replay bus.
//
// This code re-opens the file so that it knows where it's skipping to. This isn't as cheap as
// just skipping from the current position, but there isn't a a good way to detect what the
// current position is, since the replay listener bus buffers data internally.
val lookForEndEvent = shouldHalt && (appCompleted || !fastInProgressParsing)
if (lookForEndEvent && listener.applicationInfo.isDefined) {
Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
val target = fileStatus.getLen() - reparseChunkSize
if (target > 0) {
logInfo(s"Looking for end event; skipping $target bytes from $logPath...")
var skipped = 0L
while (skipped < target) {
skipped += in.skip(target - skipped)
}
}

val source = Source.fromInputStream(in).getLines()

// Because skipping may leave the stream in the middle of a line, read the next line
// before replaying.
if (target > 0) {
source.next()
}

bus.replay(source, logPath.toString, !appCompleted, eventsFilter)
}
}

logInfo(s"Finished parsing $logPath")

listener.applicationInfo match {
case Some(app) if !lookForEndEvent || app.attempts.head.info.completed =>
// In this case, we either didn't care about the end event, or we found it. So the
// listing data is good.
invalidateUI(app.info.id, app.attempts.head.info.attemptId)
addListing(app)
(Some(app.info.id), app.attempts.head.info.attemptId)
listing.write(LogInfo(logPath.toString(), scanTime, Some(app.info.id),
app.attempts.head.info.attemptId, fileStatus.getLen()))

// For a finished log, remove the corresponding "in progress" entry from the listing DB if
// the file is really gone.
if (appCompleted) {
val inProgressLog = logPath.toString() + EventLoggingListener.IN_PROGRESS
try {
// Fetch the entry first to avoid an RPC when it's already removed.
listing.read(classOf[LogInfo], inProgressLog)
if (!fs.isFile(new Path(inProgressLog))) {
listing.delete(classOf[LogInfo], inProgressLog)
}
} catch {
case _: NoSuchElementException =>
}
}

case Some(_) =>
// In this case, the attempt is still not marked as finished but was expected to. This can
// mean the end event is before the configured threshold, so call the method again to
// re-parse the whole log.
logInfo(s"Reparsing $logPath since end event was not found.")
mergeApplicationListing(fileStatus, scanTime, false)

case _ =>
// If the app hasn't written down its app ID to the logs, still record the entry in the
// listing db, with an empty ID. This will make the log eligible for deletion if the app
// does not make progress after the configured max log age.
(None, None)
listing.write(LogInfo(logPath.toString(), scanTime, None, None, fileStatus.getLen()))
}
}

/**
* Invalidate an existing UI for a given app attempt. See LoadedAppUI for a discussion on the
* UI lifecycle.
*/
private def invalidateUI(appId: String, attemptId: Option[String]): Unit = {
synchronized {
activeUIs.get((appId, attemptId)).foreach { ui =>
ui.invalidate()
ui.ui.store.close()
}
}
listing.write(LogInfo(logPath.toString(), scanTime, appId, attemptId, fileStatus.getLen()))
}

/**
Expand Down Expand Up @@ -696,29 +777,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

/**
* Replays the events in the specified log file on the supplied `ReplayListenerBus`.
* `ReplayEventsFilter` determines what events are replayed.
*/
private def replay(
eventLog: FileStatus,
bus: ReplayListenerBus,
eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
val logPath = eventLog.getPath()
val isCompleted = !logPath.getName().endsWith(EventLoggingListener.IN_PROGRESS)
logInfo(s"Replaying log path: $logPath")
// Note that the eventLog may have *increased* in size since when we grabbed the filestatus,
// and when we read the file here. That is OK -- it may result in an unnecessary refresh
// when there is no update, but will not result in missing an update. We *must* prevent
// an error the other way -- if we report a size bigger (ie later) than the file that is
// actually read, we may never refresh the app. FileStatus is guaranteed to be static
// after it's created, so we get a file size that is no bigger than what is actually read.
Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
bus.replay(in, logPath.toString, !isCompleted, eventsFilter)
logInfo(s"Finished parsing $logPath")
}
}

/**
* Rebuilds the application state store from its event log.
*/
Expand All @@ -741,8 +799,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
} replayBus.addListener(listener)

try {
replay(eventLog, replayBus)
val path = eventLog.getPath()
logInfo(s"Parsing $path to re-build UI...")
Utils.tryWithResource(EventLoggingListener.openEventLog(path, fs)) { in =>
replayBus.replay(in, path.toString(), maybeTruncated = !isCompleted(path.toString()))
}
trackingStore.close(false)
logInfo(s"Finished parsing $path")
} catch {
case e: Exception =>
Utils.tryLogNonFatalError {
Expand Down Expand Up @@ -881,6 +944,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

private def isCompleted(name: String): Boolean = {
!name.endsWith(EventLoggingListener.IN_PROGRESS)
}

}

private[history] object FsHistoryProvider {
Expand Down Expand Up @@ -945,11 +1012,17 @@ private[history] class ApplicationInfoWrapper(

}

private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener {
private[history] class AppListingListener(
log: FileStatus,
clock: Clock,
haltEnabled: Boolean) extends SparkListener {

private val app = new MutableApplicationInfo()
private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen())

private var gotEnvUpdate = false
private var halted = false

override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
app.id = event.appId.orNull
app.name = event.appName
Expand All @@ -958,6 +1031,8 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
attempt.startTime = new Date(event.time)
attempt.lastUpdated = new Date(clock.getTimeMillis())
attempt.sparkUser = event.sparkUser

checkProgress()
}

override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
Expand All @@ -968,11 +1043,18 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
}

override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
val allProperties = event.environmentDetails("Spark Properties").toMap
attempt.viewAcls = allProperties.get("spark.ui.view.acls")
attempt.adminAcls = allProperties.get("spark.admin.acls")
attempt.viewAclsGroups = allProperties.get("spark.ui.view.acls.groups")
attempt.adminAclsGroups = allProperties.get("spark.admin.acls.groups")
// Only parse the first env update, since any future changes don't have any effect on
// the ACLs set for the UI.
if (!gotEnvUpdate) {
val allProperties = event.environmentDetails("Spark Properties").toMap
attempt.viewAcls = allProperties.get("spark.ui.view.acls")
attempt.adminAcls = allProperties.get("spark.admin.acls")
attempt.viewAclsGroups = allProperties.get("spark.ui.view.acls.groups")
attempt.adminAclsGroups = allProperties.get("spark.admin.acls.groups")

gotEnvUpdate = true
checkProgress()
}
}

override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
Expand All @@ -989,6 +1071,17 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
}
}

/**
* Throws a halt exception to stop replay if enough data to create the app listing has been
* read.
*/
private def checkProgress(): Unit = {
if (haltEnabled && !halted && app.id != null && gotEnvUpdate) {
halted = true
throw new HaltReplayException()
}
}

private class MutableApplicationInfo {
var id: String = null
var name: String = null
Expand Down
Loading