1818package org .apache .spark .deploy .history
1919
2020import java .io .FileNotFoundException
21- import java .util .concurrent .atomic .AtomicReference
2221
2322import scala .collection .mutable
2423
@@ -42,7 +41,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
4241 private var lastLogCheckTimeMs = - 1L
4342
4443 // List of applications, in order from newest to oldest.
45- private val appList = new AtomicReference [ Seq [ApplicationHistoryInfo ]]( Nil )
44+ @ volatile private var appList : Seq [ApplicationHistoryInfo ] = Nil
4645
4746 /**
4847 * A background thread that periodically checks for event log updates on disk.
@@ -88,7 +87,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
8887 }
8988
9089 override def getListing (offset : Int , count : Int ) = {
91- val list = appList.get()
90+ val list = appList
9291 val theOffset = if (offset < list.size) offset else 0
9392 (list.slice(theOffset, Math .min(theOffset + count, list.size)), theOffset, list.size)
9493 }
@@ -104,10 +103,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
104103
105104 /**
106105 * Builds the application list based on the current contents of the log directory.
107- * Tries to reuse as much of the data already in memory as possible, but not reading
108- * applications that hasn 't been updated since last time the logs were checked.
106+ * Tries to reuse as much of the data already in memory as possible, by not reading
107+ * applications that haven 't been updated since last time the logs were checked.
109108 */
110- def checkForLogs () = synchronized {
109+ def checkForLogs () = {
111110 lastLogCheckTimeMs = getMonotonicTime()
112111 logDebug(" Checking for logs. Time is now %d." .format(lastLogCheckTimeMs))
113112 try {
@@ -118,8 +117,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
118117 dir => fs.isFile(new Path (dir.getPath(), EventLoggingListener .APPLICATION_COMPLETE ))
119118 }
120119
121- var currentApps = Map [String , ApplicationHistoryInfo ](
122- appList.get(). map(app => (app.id -> app)):_* )
120+ val currentApps = Map [String , ApplicationHistoryInfo ](
121+ appList.map(app => (app.id -> app)):_* )
123122
124123 // For any application that either (i) is not listed or (ii) has changed since the last time
125124 // the listing was created (defined by the log dir's modification time), load the app's info.
@@ -138,7 +137,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
138137 }
139138 }
140139
141- appList.set( newApps.sortBy { info => - info.lastUpdated })
140+ appList = newApps.sortBy { info => - info.lastUpdated }
142141 } catch {
143142 case t : Throwable => logError(" Exception in checking for event log updates" , t)
144143 }
@@ -166,11 +165,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
166165 }
167166
168167 replayBus.replay()
169- val appName = appListener.appName
170- val sparkUser = appListener.sparkUser
171- val startTime = appListener.startTime
172- val endTime = appListener.endTime
173- val lastUpdated = getModificationTime(logDir)
174168 ApplicationHistoryInfo (appId,
175169 appListener.appName,
176170 appListener.startTime,
0 commit comments