Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SecurityManager, SparkConf}


/**
* A class that provides application history from event logs stored in the file system.
* This provider checks for new finished applications in the background periodically and
Expand Down Expand Up @@ -76,6 +75,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
= new mutable.LinkedHashMap()

// List of applications to be deleted by event log cleaner.
private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]

// Constants used to parse Spark 1.0.0 log directories.
private[history] val LOG_PREFIX = "EVENT_LOG_"
private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
Expand Down Expand Up @@ -266,34 +268,40 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
*/
private def cleanLogs(): Unit = {
try {
val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
.getOrElse(Seq[FileStatus]())
val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000

val now = System.currentTimeMillis()
val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()

// Scan all logs from the log directory.
// Only completed applications older than the specified max age will be deleted.
applications.values.foreach { info =>
if (now - info.lastUpdated <= maxAge) {
if (now - info.lastUpdated <= maxAge || !info.completed) {
appsToRetain += (info.id -> info)
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This else block needs to stay after line 297 (applications = appsToRetain). That ensures that the code will make a best effort at only deleting app data after they have been removed from the app list.

A user may still may be looking at an old version of the list, and thus may still be able to click on an invalid link, but at least the HS's internal state is consistent.

appsToClean += info
}
}

applications = appsToRetain

// Scan all logs from the log directory.
// Only directories older than the specified max age will be deleted
statusList.foreach { dir =>
val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
appsToClean.foreach { info =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like this would probably be more efficient:

while (appsToClean.nonEmpty) {
  val info = appsToClean.last
  try {
    ...
    appsToClean.remove(appsToClean.size - 1)
  } catch {
    ...
  }
}

But probably not a big deal in this context.

One thing to note is that if someone adds logs with the wrong permissions, this code will never be able to delete them, so those logs will forever be in the appsToClean list. It might be worth it to treat AccessControlException especially here and just give up trying to clean up logs with the wrong permissions.

try {
if (now - dir.getModificationTime() > maxAge) {
// if path is a directory and set to true,
// the directory is deleted else throws an exception
fs.delete(dir.getPath, true)
val path = new Path(logDir, info.logPath)
if (fs.exists(path)) {
fs.delete(path, true)
}
} catch {
case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
case e: AccessControlException =>
logInfo(s"No permission to delete ${info.logPath}, ignoring.")
case t: IOException =>
logError(s"IOException in cleaning logs of ${info.logPath}", t)
leftToClean += info
}
}

appsToClean = leftToClean
} catch {
case t: Exception => logError("Exception in cleaning logs", t)
}
Expand Down