@@ -39,7 +39,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   private val fs = Utils.getHadoopFileSystem(logDir)
 
   // A timestamp of when the disk was last accessed to check for log updates
-  private var lastLogCheckTime = -1L
+  private var lastLogCheckTimeMs = -1L
 
   // List of applications, in order from newest to oldest.
   private val appList = new AtomicReference[Seq[ApplicationHistoryInfo]](Nil)
@@ -55,13 +55,13 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   private val logCheckingThread = new Thread("LogCheckingThread") {
     override def run() = Utils.logUncaughtExceptions {
       while (true) {
-        val now = System.currentTimeMillis
-        if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
+        val now = getMonotonicTime()
+        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
           Thread.sleep(UPDATE_INTERVAL_MS)
         } else {
           // If the user has manually checked for logs recently, wait until
           // UPDATE_INTERVAL_MS after the last check time
-          Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now)
+          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
         }
         checkForLogs()
       }
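
The branch above keeps scans roughly UPDATE_INTERVAL_MS apart even when checkForLogs() is also triggered manually: a stale last check means sleeping one full interval, a recent one means sleeping only the remainder. A minimal sketch of that arithmetic, with an illustrative nextSleepMs helper and interval value that are not part of the patch:

object SleepSketch {
  // Illustrative interval; the real provider reads it from SparkConf.
  val UPDATE_INTERVAL_MS = 10 * 1000L

  // Mirrors the branch in the polling loop: a full interval if the last check
  // is stale, otherwise the time remaining until lastCheckMs + UPDATE_INTERVAL_MS.
  def nextSleepMs(nowMs: Long, lastCheckMs: Long): Long =
    if (nowMs - lastCheckMs > UPDATE_INTERVAL_MS) UPDATE_INTERVAL_MS
    else lastCheckMs + UPDATE_INTERVAL_MS - nowMs

  def main(args: Array[String]): Unit = {
    println(nextSleepMs(nowMs = 50000L, lastCheckMs = 10000L))  // stale check  -> 10000
    println(nextSleepMs(nowMs = 50000L, lastCheckMs = 47000L))  // recent check -> 7000
  }
}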
@@ -108,13 +108,12 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
    * applications that hasn't been updated since last time the logs were checked.
    */
   def checkForLogs() = synchronized {
-    lastLogCheckTime = System.currentTimeMillis
-    logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime))
+    lastLogCheckTimeMs = getMonotonicTime()
+    logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
     try {
       val logStatus = fs.listStatus(new Path(logDir))
       val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
       val logInfos = logDirs
-        .sortBy { dir => getModificationTime(dir) }
         .filter {
           dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))
         }
@@ -125,7 +124,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
       // For any application that either (i) is not listed or (ii) has changed since the last time
       // the listing was created (defined by the log dir's modification time), load the app's info.
       // Otherwise just reuse what's already in memory.
-      val newApps = new mutable.ListBuffer[ApplicationHistoryInfo]
+      val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo]
       for (dir <- logInfos) {
        val curr = currentApps.getOrElse(dir.getPath().getName(), null)
        if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
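
The comment in this hunk describes a reuse-or-reload pattern: an application's info is loaded only when it is new or its log directory was modified after the cached entry. A simplified, self-contained sketch of that pattern (AppInfo, LogDir and mergeListing are illustrative stand-ins, not types from the patch), using the same ArrayBuffer accumulation:

import scala.collection.mutable

object ReuseSketch {
  // Simplified stand-ins for ApplicationHistoryInfo and the FileStatus-based log dirs.
  case class AppInfo(id: String, lastUpdated: Long)
  case class LogDir(name: String, modTime: Long)

  // Reload an app only when it is new or its log dir changed since the cached copy.
  def mergeListing(
      cached: Map[String, AppInfo],
      logDirs: Seq[LogDir],
      load: LogDir => AppInfo): Seq[AppInfo] = {
    val merged = new mutable.ArrayBuffer[AppInfo]
    for (dir <- logDirs) {
      val curr = cached.getOrElse(dir.name, null)
      if (curr == null || curr.lastUpdated < dir.modTime) {
        merged += load(dir)   // new or updated: read it from disk
      } else {
        merged += curr        // unchanged: reuse the in-memory entry
      }
    }
    merged.toSeq
  }

  def main(args: Array[String]): Unit = {
    val cached = Map("app-1" -> AppInfo("app-1", lastUpdated = 100L))
    val dirs = Seq(LogDir("app-1", modTime = 100L), LogDir("app-2", modTime = 200L))
    // app-1 is reused from the cache, app-2 is freshly loaded.
    println(mergeListing(cached, dirs, d => AppInfo(d.name, d.modTime)))
  }
}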
@@ -198,4 +197,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
198197 }
199198 }
200199
200+ /** Returns the system's mononotically increasing time. */
201+ private def getMonotonicTime () = System .nanoTime() / (1000 * 1000 )
202+
201203}
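
For context on why the patch switches to getMonotonicTime(): System.currentTimeMillis is wall-clock time and can jump backwards or forwards (for example when NTP adjusts the clock), which distorts interval arithmetic such as lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now, whereas System.nanoTime() only measures elapsed time. A small illustrative sketch, not part of the patch:

object MonotonicSketch {
  // Same conversion as getMonotonicTime() in the patch: nanoseconds to milliseconds.
  def monotonicMs(): Long = System.nanoTime() / (1000 * 1000)

  def main(args: Array[String]): Unit = {
    val start = monotonicMs()
    Thread.sleep(250)
    val elapsedMs = monotonicMs() - start
    // Unlike a currentTimeMillis-based difference, this cannot go negative or
    // balloon if the system clock is adjusted while we sleep.
    println(s"elapsed ~ $elapsedMs ms")
  }
}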