@@ -21,6 +21,7 @@ import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputSt
2121import java .util .concurrent .{ExecutorService , Executors , TimeUnit }
2222
2323import scala .collection .mutable
24+ import scala .collection .mutable .ListBuffer
2425import scala .concurrent .duration .Duration
2526
2627import com .google .common .util .concurrent .ThreadFactoryBuilder
@@ -81,6 +82,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
8182 @ volatile private var applications : mutable.LinkedHashMap [String , FsApplicationHistoryInfo ]
8283 = new mutable.LinkedHashMap ()
8384
85+ // List of applications to be deleted by event log cleaner.
86+ private var appsToClean : ListBuffer [FsApplicationHistoryInfo ] = _
87+
8488 // Constants used to parse Spark 1.0.0 log directories.
8589 private [history] val LOG_PREFIX = " EVENT_LOG_"
8690 private [history] val SPARK_VERSION_PREFIX = EventLoggingListener .SPARK_VERSION_KEY + " _"
@@ -134,6 +138,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
134138 TimeUnit .MILLISECONDS )
135139
136140 if (conf.getBoolean(" spark.history.fs.cleaner.enabled" , false )) {
141+ appsToClean = new ListBuffer [FsApplicationHistoryInfo ]
137142 // A task that periodically cleans event logs on disk.
138143 pool.scheduleAtFixedRate(getRunner(cleanLogs), 0 , CLEAN_INTERVAL_MS ,
139144 TimeUnit .MILLISECONDS )
@@ -278,27 +283,28 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
278283
279284 val now = System .currentTimeMillis()
280285 val appsToRetain = new mutable.LinkedHashMap [String , FsApplicationHistoryInfo ]()
281- val appsToClean = new mutable.LinkedHashMap [String , FsApplicationHistoryInfo ]()
282286
283287 // Scan all logs from the log directory.
284288 // Only completed applications older than the specified max age will be deleted.
285289 applications.values.foreach { info =>
286290 if (now - info.lastUpdated <= maxAge || ! info.completed) {
287291 appsToRetain += (info.id -> info)
288292 } else {
289- appsToClean += ( info.id -> info)
293+ appsToClean += info
290294 }
291295 }
292296
293297 applications = appsToRetain
294298
// Attempt every pending deletion. Entries that fail stay in appsToClean, which
// is a field, so they are retried the next time this code runs.
// Iterate over an immutable snapshot: removing elements from the ListBuffer
// while traversing that same buffer is not safe.
appsToClean.toList.foreach { info =>
  try {
    val path = new Path(logDir + "/" + info.logPath)
    // A log that is already gone counts as cleaned. Without this, an entry
    // whose path no longer exists would remain in appsToClean forever.
    if (!fs.exists(path) || fs.delete(path, true)) {
      appsToClean -= info
    }
  } catch {
    case t: IOException =>
      // Leave the entry in appsToClean so a later run can retry the deletion.
      logError(s"IOException in cleaning logs of ${info.logPath}", t)
  }
}
304310 } catch {
0 commit comments