From fdef4d6c496575033cb84894c993997cf967c594 Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Mon, 13 Apr 2015 20:30:51 +0800
Subject: [PATCH 1/7] check if app is completed before clean it up

---
 .../deploy/history/FsHistoryProvider.scala       | 27 +++++++------------
 1 file changed, 10 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 9d40d8c8fd7a..2dd204b7ad17 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -273,35 +273,28 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private def cleanLogs(): Unit = {
     try {
-      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
-        .getOrElse(Seq[FileStatus]())
       val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
         DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
 
       val now = System.currentTimeMillis()
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
 
+      // Scan all logs from the log directory.
+      // Only completed applications older than the specified max age will be deleted.
       applications.values.foreach { info =>
-        if (now - info.lastUpdated <= maxAge) {
+        if (now - info.lastUpdated <= maxAge || !info.completed) {
           appsToRetain += (info.id -> info)
+        } else {
+          try {
+            fs.delete(new Path(info.logPath), true)
+          } catch {
+            case t: IOException => logError(s"IOException in cleaning logs of ${info.logPath}", t)
+              appsToRetain += (info.id -> info)
+          }
         }
       }
 
       applications = appsToRetain
-
-      // Scan all logs from the log directory.
-      // Only directories older than the specified max age will be deleted
-      statusList.foreach { dir =>
-        try {
-          if (now - dir.getModificationTime() > maxAge) {
-            // if path is a directory and set to true,
-            // the directory is deleted else throws an exception
-            fs.delete(dir.getPath, true)
-          }
-        } catch {
-          case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
-        }
-      }
     } catch {
       case t: Exception => logError("Exception in cleaning logs", t)
     }

From 9872a9d5c9b0417a0c8d714a832af082e49ac958 Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Mon, 13 Apr 2015 21:41:06 +0800
Subject: [PATCH 2/7] use the right path

---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 2dd204b7ad17..b33dfd9b63ca 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -286,7 +286,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
           appsToRetain += (info.id -> info)
         } else {
           try {
-            fs.delete(new Path(info.logPath), true)
+            fs.delete(new Path(logDir + "/" + info.logPath), true)
           } catch {
             case t: IOException => logError(s"IOException in cleaning logs of ${info.logPath}", t)
               appsToRetain += (info.id -> info)
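After patches 1 and 2, selection and deletion both happen inside the retain loop. For readability, here is a condensed sketch of cleanLogs() as it stands at this point, assuming the surrounding FsHistoryProvider members visible in the diff context (conf, fs, logDir, applications, DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) — a reading aid, not a verbatim copy of the file:

private def cleanLogs(): Unit = {
  try {
    val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
      DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
    val now = System.currentTimeMillis()
    val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()

    applications.values.foreach { info =>
      // Keep young-enough apps, and never delete logs of incomplete apps.
      if (now - info.lastUpdated <= maxAge || !info.completed) {
        appsToRetain += (info.id -> info)
      } else {
        try {
          // info.logPath is relative to the log directory (patch 2's fix).
          fs.delete(new Path(logDir + "/" + info.logPath), true)
        } catch {
          case t: IOException =>
            logError(s"IOException in cleaning logs of ${info.logPath}", t)
            // Deletion failed: keep the entry so the next run retries it.
            appsToRetain += (info.id -> info)
        }
      }
    }

    applications = appsToRetain
  } catch {
    case t: Exception => logError("Exception in cleaning logs", t)
  }
}

Patch 3 next separates the two concerns, so a failed delete no longer has to patch up appsToRetain mid-scan.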
From 94adfe17001e6aa65719f398fd61047494247c5c Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Tue, 14 Apr 2015 11:49:01 +0800
Subject: [PATCH 3/7] leave expired apps alone to be deleted

---
 .../deploy/history/FsHistoryProvider.scala       | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index b33dfd9b63ca..5566199f819b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -278,6 +278,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       val now = System.currentTimeMillis()
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+      val appsToClean = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
 
       // Scan all logs from the log directory.
       // Only completed applications older than the specified max age will be deleted.
@@ -285,16 +286,21 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         if (now - info.lastUpdated <= maxAge || !info.completed) {
           appsToRetain += (info.id -> info)
         } else {
-          try {
-            fs.delete(new Path(logDir + "/" + info.logPath), true)
-          } catch {
-            case t: IOException => logError(s"IOException in cleaning logs of ${info.logPath}", t)
-              appsToRetain += (info.id -> info)
-          }
+          appsToClean += (info.id -> info)
         }
       }
 
       applications = appsToRetain
+
+      appsToClean.values.foreach { info =>
+        try {
+          fs.delete(new Path(logDir + "/" + info.logPath), true)
+        } catch {
+          case t: IOException =>
+            logError(s"IOException in cleaning logs of ${info.logPath}", t)
+            applications += (info.id -> info)
+        }
+      }
     } catch {
       case t: Exception => logError("Exception in cleaning logs", t)
     }

From b0abca54d693399ec2ebd966309b0aded735dd06 Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Wed, 15 Apr 2015 14:02:03 +0800
Subject: [PATCH 4/7] use global var to store apps to clean

---
 .../spark/deploy/history/FsHistoryProvider.scala  | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 5566199f819b..4eeb5ac2cff9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -21,6 +21,7 @@ import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputSt
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 import scala.concurrent.duration.Duration
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -81,6 +82,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()
 
+  // List of applications to be deleted by event log cleaner.
+  private var appsToClean: ListBuffer[FsApplicationHistoryInfo] = _
+
   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
   private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
@@ -134,6 +138,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         TimeUnit.MILLISECONDS)
 
       if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
+        appsToClean = new ListBuffer[FsApplicationHistoryInfo]
         // A task that periodically cleans event logs on disk.
         pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS,
           TimeUnit.MILLISECONDS)
@@ -278,7 +283,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       val now = System.currentTimeMillis()
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
-      val appsToClean = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
 
       // Scan all logs from the log directory.
       // Only completed applications older than the specified max age will be deleted.
@@ -286,19 +290,21 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         if (now - info.lastUpdated <= maxAge || !info.completed) {
           appsToRetain += (info.id -> info)
         } else {
-          appsToClean += (info.id -> info)
+          appsToClean += info
         }
       }
 
       applications = appsToRetain
 
-      appsToClean.values.foreach { info =>
+      appsToClean.foreach { info =>
         try {
-          fs.delete(new Path(logDir + "/" + info.logPath), true)
+          val path = new Path(logDir + "/" + info.logPath)
+          if (fs.exists(path) && fs.delete(path, true)) {
+            appsToClean -= info
+          }
         } catch {
           case t: IOException =>
             logError(s"IOException in cleaning logs of ${info.logPath}", t)
-            applications += (info.id -> info)
         }
       }
     } catch {
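One hazard worth flagging in patch 4: the delete loop removes elements from appsToClean (appsToClean -= info) while foreach-ing over that very ListBuffer. Mutating a scala.collection.mutable collection during its own iteration is unsupported and can silently skip elements. A minimal, self-contained illustration with toy values (not from the Spark code):

import scala.collection.mutable

object MutateWhileIterating extends App {
  val buf = mutable.ListBuffer(1, 2, 3, 4)
  // Intent: remove every element. Because buf is modified during its own
  // foreach, the behavior is unspecified and elements may be skipped.
  buf.foreach { x => buf -= x }
  println(buf) // not guaranteed to print ListBuffer()
}

Patches 5 and 6 below untangle the delete from the bookkeeping, then replace the in-loop removal with a separate buffer.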
From d7455d8df310d690d8104663dc39508011726d12 Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Wed, 15 Apr 2015 14:09:13 +0800
Subject: [PATCH 5/7] slightly change when delete file

---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 4eeb5ac2cff9..a32033092dc7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -299,9 +299,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       appsToClean.foreach { info =>
         try {
           val path = new Path(logDir + "/" + info.logPath)
-          if (fs.exists(path) && fs.delete(path, true)) {
-            appsToClean -= info
+          if (fs.exists(path)) {
+            fs.delete(path, true)
           }
+          appsToClean -= info
         } catch {
           case t: IOException =>
             logError(s"IOException in cleaning logs of ${info.logPath}", t)

From d4d52515b5a240fe4f23530d04b3fa18acd84b9f Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Thu, 16 Apr 2015 09:30:33 +0800
Subject: [PATCH 6/7] per Marcelo's comments

---
 .../spark/deploy/history/FsHistoryProvider.scala  | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a32033092dc7..8dd7a0367bbe 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -21,7 +21,6 @@ import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputSt
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 
 import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
 import scala.concurrent.duration.Duration
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -36,7 +35,6 @@
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
-
 /**
  * A class that provides application history from event logs stored in the file system.
  * This provider checks for new finished applications in the background periodically and
@@ -83,7 +81,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     = new mutable.LinkedHashMap()
 
   // List of applications to be deleted by event log cleaner.
-  private var appsToClean: ListBuffer[FsApplicationHistoryInfo] = _
+  private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
 
   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
@@ -138,7 +136,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         TimeUnit.MILLISECONDS)
 
       if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
-        appsToClean = new ListBuffer[FsApplicationHistoryInfo]
         // A task that periodically cleans event logs on disk.
         pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS,
           TimeUnit.MILLISECONDS)
@@ -296,18 +293,21 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       applications = appsToRetain
 
+      val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
       appsToClean.foreach { info =>
         try {
-          val path = new Path(logDir + "/" + info.logPath)
+          val path = new Path(logDir, info.logPath)
           if (fs.exists(path)) {
             fs.delete(path, true)
           }
-          appsToClean -= info
         } catch {
           case t: IOException =>
             logError(s"IOException in cleaning logs of ${info.logPath}", t)
+            leftToClean += info
         }
       }
+
+      appsToClean = leftToClean
     } catch {
       case t: Exception => logError("Exception in cleaning logs", t)
     }
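Besides fixing the iteration issue with leftToClean, patch 6 builds the log path with Hadoop's two-argument Path(parent, child) constructor instead of string concatenation. A small sketch of the difference (the values are hypothetical):

import org.apache.hadoop.fs.Path

val logDir = "hdfs://nn:8020/spark-history"   // hypothetical value
val child = "app-20150416-0001"               // hypothetical value
// Patch 2-5 style: hand-built separator.
val byConcat = new Path(logDir + "/" + child)
// Patch 6 style: Path resolves the child against the parent and
// normalizes the separator itself.
val byResolve = new Path(logDir, child)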
From 4a533eb4f67ed16e0af2a2ccc19eb710e590c3c0 Mon Sep 17 00:00:00 2001
From: WangTaoTheTonic
Date: Tue, 21 Apr 2015 10:40:54 +0800
Subject: [PATCH 7/7] treat ACE specially

---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 2dc71e4e775a..be59565e1072 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -293,6 +293,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
             fs.delete(path, true)
           }
         } catch {
+          case e: AccessControlException =>
+            logInfo(s"No permission to delete ${info.logPath}, ignoring.")
           case t: IOException =>
             logError(s"IOException in cleaning logs of ${info.logPath}", t)
             leftToClean += info
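With patch 7, the cleaner distinguishes permanent permission failures from possibly transient I/O errors. The ordering of the cases matters: Hadoop's org.apache.hadoop.security.AccessControlException is a subclass of IOException, so it must be matched first or the IOException case would swallow it. For reference, a sketch of the delete loop as the series leaves it (assuming the fields introduced in patches 4-6 and an in-scope import of AccessControlException):

val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
appsToClean.foreach { info =>
  try {
    val path = new Path(logDir, info.logPath)
    if (fs.exists(path)) {
      fs.delete(path, true)
    }
  } catch {
    case e: AccessControlException =>
      // Permanent failure: log and drop, retrying would never succeed.
      logInfo(s"No permission to delete ${info.logPath}, ignoring.")
    case t: IOException =>
      // Possibly transient: keep the app so the next cleaner run retries.
      logError(s"IOException in cleaning logs of ${info.logPath}", t)
      leftToClean += info
  }
}
appsToClean = leftToClean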