@@ -32,6 +32,8 @@ import org.apache.spark.util.Utils
 private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   with Logging {
 
+  private val NOT_STARTED = "<Not Started>"
+
   // Interval between each check for event log updates
   private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
     conf.getInt("spark.history.updateInterval", 10)) * 1000
@@ -47,8 +49,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   // A timestamp of when the disk was last accessed to check for log updates
   private var lastLogCheckTimeMs = -1L
 
-  // List of applications, in order from newest to oldest.
-  @volatile private var appList: Seq[ApplicationHistoryInfo] = Nil
+  // The modification time of the newest log detected during the last scan. This is used
+  // to ignore logs that are older during subsequent scans, to avoid processing data that
+  // is already known.
+  private var lastModifiedTime = -1L
+
+  // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
+  // into the map in order, so the LinkedHashMap maintains the correct ordering.
+  @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
+    = new mutable.LinkedHashMap()
 
   /**
    * A background thread that periodically checks for event log updates on disk.
@@ -93,15 +102,35 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     logCheckingThread.start()
   }
 
-  override def getListing() = appList
+  override def getListing() = applications.values
 
-  override def getAppUI(appId: String): SparkUI = {
+  override def getAppUI(appId: String): Option[SparkUI] = {
     try {
-      val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
-      val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
-      ui
+      applications.get(appId).map { info =>
+        val (replayBus, appListener) = createReplayBus(fs.getFileStatus(
+          new Path(logDir, info.logDir)))
+        val ui = {
+          val conf = this.conf.clone()
+          val appSecManager = new SecurityManager(conf)
+          new SparkUI(conf, appSecManager, replayBus, appId,
+            s"${HistoryServer.UI_PATH_PREFIX}/$appId")
+          // Do not call ui.bind() to avoid creating a new server for each application
+        }
+
+        replayBus.replay()
+
+        ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)")
+
+        val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+        ui.getSecurityManager.setAcls(uiAclsEnabled)
+        // make sure to set admin acls before view acls so they are properly picked up
+        ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+        ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
+          appListener.viewAcls.getOrElse(""))
+        ui
+      }
     } catch {
-      case e: FileNotFoundException => null
+      case e: FileNotFoundException => None
     }
   }
 
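Note: getAppUI() now returns Option[SparkUI] instead of a nullable SparkUI, so an unknown application ID and a deleted log directory both surface as None. A minimal, self-contained sketch of that shape (AppInfo, AppUI and loadUI below are illustrative stand-ins, not Spark classes):

import java.io.FileNotFoundException

// Illustrative sketch of the Option-based lookup used by the new getAppUI():
// look the app up in the in-memory map, build its UI only if present, and turn
// a missing log directory into None rather than null.
object OptionLookupSketch {
  case class AppInfo(id: String, logDir: String)
  case class AppUI(name: String)

  private val applications = Map("app-1" -> AppInfo("app-1", "app-1"))

  // Stands in for the replay-and-build-SparkUI step; may fail if logs were deleted.
  private def loadUI(info: AppInfo): AppUI = {
    if (info.logDir.isEmpty) throw new FileNotFoundException(info.logDir)
    AppUI(s"UI for ${info.id}")
  }

  def getAppUI(appId: String): Option[AppUI] = {
    try {
      applications.get(appId).map(loadUI)
    } catch {
      case _: FileNotFoundException => None
    }
  }

  def main(args: Array[String]): Unit = {
    println(getAppUI("app-1"))   // Some(AppUI(UI for app-1))
    println(getAppUI("missing")) // None
  }
}

Callers can then map or pattern match over the result instead of checking for null.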
@@ -119,84 +148,79 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     try {
       val logStatus = fs.listStatus(new Path(resolvedLogDir))
       val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
-      val logInfos = logDirs.filter { dir =>
-        fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
-      }
 
-      val currentApps = Map[String, ApplicationHistoryInfo](
-        appList.map(app => app.id -> app):_*)
-
-      // For any application that either (i) is not listed or (ii) has changed since the last time
-      // the listing was created (defined by the log dir's modification time), load the app's info.
-      // Otherwise just reuse what's already in memory.
-      val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size)
-      for (dir <- logInfos) {
-        val curr = currentApps.getOrElse(dir.getPath().getName(), null)
-        if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
+      // Load all new logs from the log directory. Only directories that have a modification time
+      // later than the last known log directory will be loaded.
+      var newLastModifiedTime = lastModifiedTime
+      val logInfos = logDirs
+        .filter { dir =>
+          if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
+            val modTime = getModificationTime(dir)
+            newLastModifiedTime = math.max(newLastModifiedTime, modTime)
+            modTime > lastModifiedTime
+          } else {
+            false
+          }
+        }
+        .flatMap { dir =>
           try {
-            val (app, _) = loadAppInfo(dir, renderUI = false)
-            newApps += app
+            val (replayBus, appListener) = createReplayBus(dir)
+            replayBus.replay()
+            Some(new FsApplicationHistoryInfo(
+              dir.getPath().getName(),
+              appListener.appId.getOrElse(dir.getPath().getName()),
+              appListener.appName.getOrElse(NOT_STARTED),
+              appListener.startTime.getOrElse(-1L),
+              appListener.endTime.getOrElse(-1L),
+              getModificationTime(dir),
+              appListener.sparkUser.getOrElse(NOT_STARTED)))
           } catch {
-            case e: Exception => logError(s"Failed to load app info from directory $dir.")
+            case e: Exception =>
+              logInfo(s"Failed to load application log data from $dir.", e)
+              None
+          }
+        }
+        .sortBy { info => -info.endTime }
+
+      lastModifiedTime = newLastModifiedTime
+
+      // When there are new logs, merge the new list with the existing one, maintaining
+      // the expected ordering (descending end time). Maintaining the order is important
+      // to avoid having to sort the list every time there is a request for the log list.
+      if (!logInfos.isEmpty) {
+        val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+        def addIfAbsent(info: FsApplicationHistoryInfo) = {
+          if (!newApps.contains(info.id)) {
+            newApps += (info.id -> info)
           }
-        } else {
-          newApps += curr
         }
-      }
 
-      appList = newApps.sortBy { info => -info.endTime }
+        val newIterator = logInfos.iterator.buffered
+        val oldIterator = applications.values.iterator.buffered
+        while (newIterator.hasNext && oldIterator.hasNext) {
+          if (newIterator.head.endTime > oldIterator.head.endTime) {
+            addIfAbsent(newIterator.next)
+          } else {
+            addIfAbsent(oldIterator.next)
+          }
+        }
+        newIterator.foreach(addIfAbsent)
+        oldIterator.foreach(addIfAbsent)
+
+        applications = newApps
+      }
     } catch {
       case t: Throwable => logError("Exception in checking for event log updates", t)
     }
   }
 
-  /**
-   * Parse the application's logs to find out the information we need to build the
-   * listing page.
-   *
-   * When creating the listing of available apps, there is no need to load the whole UI for the
-   * application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user
-   * clicks on a specific application.
-   *
-   * @param logDir Directory with application's log files.
-   * @param renderUI Whether to create the SparkUI for the application.
-   * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
-   */
-  private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
-    val path = logDir.getPath
-    val appId = path.getName
+  private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = {
+    val path = logDir.getPath()
     val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
     val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
     val appListener = new ApplicationEventListener
     replayBus.addListener(appListener)
-
-    val ui: SparkUI = if (renderUI) {
-      val conf = this.conf.clone()
-      val appSecManager = new SecurityManager(conf)
-      new SparkUI(conf, appSecManager, replayBus, appId,
-        HistoryServer.UI_PATH_PREFIX + s"/$appId")
-      // Do not call ui.bind() to avoid creating a new server for each application
-    } else {
-      null
-    }
-
-    replayBus.replay()
-    val appInfo = ApplicationHistoryInfo(
-      appId,
-      appListener.appName,
-      appListener.startTime,
-      appListener.endTime,
-      getModificationTime(logDir),
-      appListener.sparkUser)
-
-    if (ui != null) {
-      val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
-      ui.getSecurityManager.setAcls(uiAclsEnabled)
-      // make sure to set admin acls before view acls so properly picked up
-      ui.getSecurityManager.setAdminAcls(appListener.adminAcls)
-      ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
-    }
-    (appInfo, ui)
+    (replayBus, appListener)
   }
 
   /** Return when this directory was last modified. */
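The filter in checkForLogs() above only replays directories whose modification time is newer than anything seen in the previous scan, tracking the new high-water mark as it goes. A standalone sketch of that incremental-scan idea (DirEntry and scan are illustrative names, and the APPLICATION_COMPLETE check is omitted for brevity):

// Standalone sketch of the incremental scan in checkForLogs(): remember the newest
// modification time seen so far and, on the next pass, skip directories that are
// not newer than it.
object IncrementalScanSketch {
  case class DirEntry(name: String, modTime: Long)

  private var lastModifiedTime = -1L

  def scan(dirs: Seq[DirEntry]): Seq[DirEntry] = {
    var newLastModifiedTime = lastModifiedTime
    val fresh = dirs.filter { dir =>
      newLastModifiedTime = math.max(newLastModifiedTime, dir.modTime)
      dir.modTime > lastModifiedTime
    }
    // Only advance the watermark after the whole pass, mirroring the patch.
    lastModifiedTime = newLastModifiedTime
    fresh
  }

  def main(args: Array[String]): Unit = {
    val first = Seq(DirEntry("app-1", 10L), DirEntry("app-2", 20L))
    println(scan(first).map(_.name))                           // List(app-1, app-2)
    println(scan(first :+ DirEntry("app-3", 30L)).map(_.name)) // List(app-3)
  }
}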
@@ -219,3 +243,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)
 
 }
+
+private class FsApplicationHistoryInfo(
+    val logDir: String,
+    id: String,
+    name: String,
+    startTime: Long,
+    endTime: Long,
+    lastUpdated: Long,
+    sparkUser: String)
+  extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)
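Because the freshly scanned apps and the existing listing are each already sorted by descending end time, checkForLogs() merges them with two buffered iterators instead of re-sorting on every scan, which keeps getListing() cheap. A self-contained sketch of that merge (AppInfo and merge are illustrative names, not Spark's FsApplicationHistoryInfo):

import scala.collection.mutable

// Sketch of the merge in checkForLogs(): combine a batch of newly scanned apps with
// the existing map, both sorted by descending end time, without re-sorting.
object MergeSketch {
  case class AppInfo(id: String, endTime: Long)

  def merge(
      newInfos: Seq[AppInfo],
      existing: mutable.LinkedHashMap[String, AppInfo]): mutable.LinkedHashMap[String, AppInfo] = {
    val merged = new mutable.LinkedHashMap[String, AppInfo]()
    def addIfAbsent(info: AppInfo): Unit = {
      if (!merged.contains(info.id)) {
        merged += (info.id -> info)
      }
    }

    val newIterator = newInfos.iterator.buffered
    val oldIterator = existing.values.iterator.buffered
    // Take whichever head has the later end time so the merged map stays in
    // descending end-time order; addIfAbsent keeps the first entry seen per id.
    while (newIterator.hasNext && oldIterator.hasNext) {
      if (newIterator.head.endTime > oldIterator.head.endTime) {
        addIfAbsent(newIterator.next())
      } else {
        addIfAbsent(oldIterator.next())
      }
    }
    newIterator.foreach(addIfAbsent)
    oldIterator.foreach(addIfAbsent)
    merged
  }

  def main(args: Array[String]): Unit = {
    val existing = mutable.LinkedHashMap(
      "app-2" -> AppInfo("app-2", 200L),
      "app-1" -> AppInfo("app-1", 100L))
    val fresh = Seq(AppInfo("app-3", 300L), AppInfo("app-1", 150L))
    println(merge(fresh, existing).keys.mkString(", "))  // app-3, app-2, app-1
  }
}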