@@ -19,6 +19,7 @@ package org.apache.spark.deploy.history
1919
2020import java .io .{BufferedInputStream , FileNotFoundException , InputStream , IOException , OutputStream }
2121import java .util .concurrent .{ExecutorService , Executors , TimeUnit }
22+ import java .util .UUID
2223import java .util .zip .{ZipEntry , ZipOutputStream }
2324
2425import scala .collection .mutable
@@ -73,7 +74,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
7374 // The file system timestamp captured at the start of the last scan (the modification time of
7475 // a temporary file created in the log directory). Logs last modified before it are ignored
7576 // during subsequent scans, to avoid processing data that is already known.
76- private var lastModifiedTime = - 1L
77+ private var lastScanTime = - 1L
7778
7879 // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
7980 // into the map in order, so the LinkedHashMap maintains the correct ordering.
@@ -179,15 +180,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
179180 */
180181 private [history] def checkForLogs (): Unit = {
181182 try {
183+ val newLastScanTime = getNewLastScanTime()
182184 val statusList = Option (fs.listStatus(new Path (logDir))).map(_.toSeq)
183185 .getOrElse(Seq [FileStatus ]())
184- var newLastModifiedTime = lastModifiedTime
185186 val logInfos : Seq [FileStatus ] = statusList
186187 .filter { entry =>
187188 try {
188189 getModificationTime(entry).map { time =>
189- newLastModifiedTime = math.max(newLastModifiedTime, time)
190- time >= lastModifiedTime
190+ time >= lastScanTime
191191 }.getOrElse(false )
192192 } catch {
193193 case e : AccessControlException =>
@@ -224,12 +224,29 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
224224 }
225225 }
226226
227- lastModifiedTime = newLastModifiedTime
227+ lastScanTime = newLastScanTime
228228 } catch {
229229 case e : Exception => logError(" Exception in checking for event log updates" , e)
230230 }
231231 }
232232
/**
 * Obtains the timestamp to record for the scan that is about to run.
 *
 * A uniquely named, dot-prefixed file is created in `logDir`, its modification time is read
 * back from the file system, and the file is deleted again.
 * NOTE(review): presumably this is done so the timestamp comes from the same clock that stamps
 * the event logs rather than from the local clock -- confirm.
 *
 * @return the modification time of the temporary file, or the current `lastScanTime` when
 *         closing the file or reading its status fails.
 */
private def getNewLastScanTime(): Long = {
  val fileName = "." + UUID.randomUUID().toString
  val path = new Path(logDir, fileName)
  // Creation stays outside the try block: if it fails there is no file to clean up and the
  // exception is left to propagate to the caller.
  val fos = fs.create(path)

  try {
    fos.close()
    fs.getFileStatus(path).getModificationTime
  } catch {
    case e: Exception =>
      logError("Exception encountered when attempting to update last scan time", e)
      lastScanTime
  } finally {
    // Use the non-deprecated two-argument delete (non-recursive is enough for a plain file)
    // and report a failed cleanup instead of silently leaving the marker file behind.
    if (!fs.delete(path, false)) {
      logWarning(s"Error deleting ${path}")
    }
  }
}
249+
233250 override def writeEventLogs (
234251 appId : String ,
235252 attemptId : Option [String ],
0 commit comments