@@ -28,16 +28,21 @@ import org.apache.spark.scheduler._
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage._
 import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.scope._
 import org.apache.spark.util.kvstore.KVStore
 
 /**
  * A Spark listener that writes application information to a data store. The types written to the
  * store are defined in the `storeTypes.scala` file and are based on the public REST API.
+ *
+ * @param lastUpdateTime When replaying logs, the log's last update time, so that the duration of
+ *                       unfinished tasks can be more accurately calculated (see SPARK-21922).
  */
 private[spark] class AppStatusListener(
     kvstore: KVStore,
     conf: SparkConf,
-    live: Boolean) extends SparkListener with Logging {
+    live: Boolean,
+    lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
 
   import config._
 
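The new `lastUpdateTime` parameter gives replayed event logs a stand-in for "now": a task that never finished has no end time, so its duration is measured against the log's last update rather than the replaying machine's clock. A minimal sketch of that fallback, with illustrative names rather than the actual `LiveTask` internals:

```scala
// Hypothetical sketch: derive a task's duration during log replay.
// A non-positive finishTime marks a task still running when the log ended.
def taskDuration(launchTime: Long, finishTime: Long, lastUpdateTime: Option[Long]): Long = {
  if (finishTime > 0) {
    finishTime - launchTime  // task completed; use its real end time
  } else {
    // unfinished task: measure against the log's last update, not wall-clock now
    lastUpdateTime.getOrElse(System.currentTimeMillis()) - launchTime
  }
}
```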
@@ -50,13 +55,16 @@ private[spark] class AppStatusListener(
   // operations that we can live without when rapidly processing incoming task events.
   private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
 
+  private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)
+
   // Keep track of live entities, so that task metrics can be efficiently updated (without
   // causing too many writes to the underlying store, and other expensive operations).
   private val liveStages = new HashMap[(Int, Int), LiveStage]()
   private val liveJobs = new HashMap[Int, LiveJob]()
   private val liveExecutors = new HashMap[String, LiveExecutor]()
   private val liveTasks = new HashMap[Long, LiveTask]()
   private val liveRDDs = new HashMap[Int, LiveRDD]()
+  private val pools = new HashMap[String, SchedulerPool]()
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
     case SparkListenerLogStart(version) => sparkVersion = version
@@ -210,16 +218,15 @@ private[spark] class AppStatusListener(
       missingStages.map(_.numTasks).sum
     }
 
-    val lastStageInfo = event.stageInfos.lastOption
+    val lastStageInfo = event.stageInfos.sortBy(_.stageId).lastOption
     val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
-
     val jobGroup = Option(event.properties)
       .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
 
     val job = new LiveJob(
       event.jobId,
       lastStageName,
-      Some(new Date(event.time)),
+      if (event.time > 0) Some(new Date(event.time)) else None,
       event.stageIds,
       jobGroup,
       numTasks)
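`event.stageInfos` is not guaranteed to be ordered by stage ID, so taking `lastOption` directly could name the job after an arbitrary stage; sorting first makes the choice deterministic. The `event.time > 0` guard matters for replayed logs where a timestamp may be absent. An illustrative contrast:

```scala
// Illustrative only: why the sort matters when picking the "last" stage.
val stageIds = Seq(3, 1, 2)
stageIds.lastOption                    // Some(2): depends on arrival order
stageIds.sortBy(identity).lastOption   // Some(3): always the highest stage ID
```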
@@ -234,17 +241,51 @@ private[spark] class AppStatusListener(
       stage.jobIds += event.jobId
       liveUpdate(stage, now)
     }
+
+    // Create the graph data for all the job's stages.
+    event.stageInfos.foreach { stage =>
+      val graph = RDDOperationGraph.makeOperationGraph(stage, maxGraphRootNodes)
+      val uigraph = new RDDOperationGraphWrapper(
+        stage.stageId,
+        graph.edges,
+        graph.outgoingEdges,
+        graph.incomingEdges,
+        newRDDOperationCluster(graph.rootCluster))
+      kvstore.write(uigraph)
+    }
+  }
+
+  private def newRDDOperationCluster(cluster: RDDOperationCluster): RDDOperationClusterWrapper = {
+    new RDDOperationClusterWrapper(
+      cluster.id,
+      cluster.name,
+      cluster.childNodes,
+      cluster.childClusters.map(newRDDOperationCluster))
   }
 
   override def onJobEnd(event: SparkListenerJobEnd): Unit = {
     liveJobs.remove(event.jobId).foreach { job =>
+      val now = System.nanoTime()
+
+      // Check if there are any pending stages that match this job; mark those as skipped.
+      job.stageIds.foreach { sid =>
+        val pending = liveStages.filter { case ((id, _), _) => id == sid }
+        pending.foreach { case (key, stage) =>
+          stage.status = v1.StageStatus.SKIPPED
+          job.skippedStages += stage.info.stageId
+          job.skippedTasks += stage.info.numTasks
+          liveStages.remove(key)
+          update(stage, now)
+        }
+      }
+
       job.status = event.jobResult match {
         case JobSucceeded => JobExecutionStatus.SUCCEEDED
         case JobFailed(_) => JobExecutionStatus.FAILED
       }
 
-      job.completionTime = Some(new Date(event.time))
-      update(job, System.nanoTime())
+      job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+      update(job, now)
     }
   }
 
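`newRDDOperationCluster` walks the mutable cluster tree produced by `RDDOperationGraph` and copies it into immutable wrapper objects, one recursive call per child cluster. A toy model of the same pattern (all names here are invented for illustration):

```scala
// Toy model of the recursive copy performed by newRDDOperationCluster:
// every node is wrapped together with all of its children, bottom-up.
case class Cluster(id: String, children: Seq[Cluster])
case class ClusterWrapper(id: String, children: Seq[ClusterWrapper])

def wrap(c: Cluster): ClusterWrapper = ClusterWrapper(c.id, c.children.map(wrap))

val tree = Cluster("root", Seq(Cluster("a", Nil), Cluster("b", Seq(Cluster("c", Nil)))))
wrap(tree)  // same tree shape, immutable wrapper types
```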
@@ -262,12 +303,24 @@ private[spark] class AppStatusListener(
       .toSeq
     stage.jobIds = stage.jobs.map(_.jobId).toSet
 
+    stage.schedulingPool = Option(event.properties).flatMap { p =>
+      Option(p.getProperty("spark.scheduler.pool"))
+    }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
+
+    stage.description = Option(event.properties).flatMap { p =>
+      Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
+    }
+
     stage.jobs.foreach { job =>
       job.completedStages = job.completedStages - event.stageInfo.stageId
       job.activeStages += 1
       liveUpdate(job, now)
     }
 
+    val pool = pools.getOrElseUpdate(stage.schedulingPool, new SchedulerPool(stage.schedulingPool))
+    pool.stageIds = pool.stageIds + event.stageInfo.stageId
+    update(pool, now)
+
     event.stageInfo.rddInfos.foreach { info =>
       if (info.storageLevel.isValid) {
         liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info)), now)
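Scheduler pools are tracked with the same live-entity pattern as jobs and stages: `getOrElseUpdate` creates a pool the first time a stage references it, and the pool's stage-ID set is replaced rather than mutated in place. A minimal sketch, assuming a `SchedulerPool` that carries only a name and a stage-ID set (the real class lives alongside the other live entities):

```scala
import scala.collection.mutable

// Assumed shape of SchedulerPool, for illustration only.
class Pool(val name: String) { var stageIds: Set[Int] = Set() }

val pools = new mutable.HashMap[String, Pool]()

def trackStage(poolName: String, stageId: Int): Unit = {
  val pool = pools.getOrElseUpdate(poolName, new Pool(poolName))
  pool.stageIds = pool.stageIds + stageId  // swap in a new immutable set
}
```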
@@ -279,7 +332,7 @@ private[spark] class AppStatusListener(
 
   override def onTaskStart(event: SparkListenerTaskStart): Unit = {
     val now = System.nanoTime()
-    val task = new LiveTask(event.taskInfo, event.stageId, event.stageAttemptId)
+    val task = new LiveTask(event.taskInfo, event.stageId, event.stageAttemptId, lastUpdateTime)
     liveTasks.put(event.taskInfo.taskId, task)
     liveUpdate(task, now)
 
@@ -318,6 +371,8 @@ private[spark] class AppStatusListener(
     val now = System.nanoTime()
 
     val metricsDelta = liveTasks.remove(event.taskInfo.taskId).map { task =>
+      task.info = event.taskInfo
+
       val errorMessage = event.reason match {
         case Success =>
           None
@@ -337,11 +392,15 @@ private[spark] class AppStatusListener(
       delta
     }.orNull
 
-    val (completedDelta, failedDelta) = event.reason match {
+    val (completedDelta, failedDelta, killedDelta) = event.reason match {
       case Success =>
-        (1, 0)
+        (1, 0, 0)
+      case _: TaskKilled =>
+        (0, 0, 1)
+      case _: TaskCommitDenied =>
+        (0, 0, 1)
       case _ =>
-        (0, 1)
+        (0, 1, 0)
     }
 
     liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage =>
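Each task-end reason now maps to exactly one of three counters, so killed tasks (including speculative tasks whose commit was denied) no longer inflate the failure count. The same match, restated as a standalone function purely for illustration:

```scala
import org.apache.spark.{Success, TaskCommitDenied, TaskEndReason, TaskKilled}

// Illustrative restatement: exactly one counter is bumped per task-end event.
def deltas(reason: TaskEndReason): (Int, Int, Int) = reason match {
  case Success             => (1, 0, 0)
  case _: TaskKilled       => (0, 0, 1)
  case _: TaskCommitDenied => (0, 0, 1)  // denied commits count as killed, not failed
  case _                   => (0, 1, 0)
}
```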
@@ -350,20 +409,37 @@ private[spark] class AppStatusListener(
       }
       stage.activeTasks -= 1
       stage.completedTasks += completedDelta
+      if (completedDelta > 0) {
+        stage.completedIndices.add(event.taskInfo.index)
+      }
       stage.failedTasks += failedDelta
+      stage.killedTasks += killedDelta
+      if (killedDelta > 0) {
+        stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
+      }
       maybeUpdate(stage, now)
 
+      // Store both stage ID and task index in a single long variable for tracking at job level.
+      val taskIndex = (event.stageId.toLong << Integer.SIZE) | event.taskInfo.index
       stage.jobs.foreach { job =>
         job.activeTasks -= 1
         job.completedTasks += completedDelta
+        if (completedDelta > 0) {
+          job.completedIndices.add(taskIndex)
+        }
         job.failedTasks += failedDelta
+        job.killedTasks += killedDelta
+        if (killedDelta > 0) {
+          job.killedSummary = killedTasksSummary(event.reason, job.killedSummary)
+        }
         maybeUpdate(job, now)
       }
 
       val esummary = stage.executorSummary(event.taskInfo.executorId)
       esummary.taskTime += event.taskInfo.duration
       esummary.succeededTasks += completedDelta
       esummary.failedTasks += failedDelta
+      esummary.killedTasks += killedDelta
       if (metricsDelta != null) {
         esummary.metrics.update(metricsDelta)
       }
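Task indices are only unique within a stage, so the job-level set packs the stage ID into the high 32 bits of a `Long` and the task index into the low 32 bits. A worked example of the round trip:

```scala
// Worked example of the packing used for job-level completed indices.
val stageId = 5
val index = 7
val packed = (stageId.toLong << Integer.SIZE) | index  // Integer.SIZE == 32

(packed >> Integer.SIZE).toInt  // 5: stage ID recovered from the high bits
(packed & 0xFFFFFFFFL).toInt    // 7: task index recovered from the low bits
```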
@@ -422,6 +498,11 @@ private[spark] class AppStatusListener(
       liveUpdate(job, now)
     }
 
+    pools.get(stage.schedulingPool).foreach { pool =>
+      pool.stageIds = pool.stageIds - event.stageInfo.stageId
+      update(pool, now)
+    }
+
     stage.executorSummaries.values.foreach(update(_, now))
     update(stage, now)
   }
@@ -482,11 +563,15 @@ private[spark] class AppStatusListener(
   /** Flush all live entities' data to the underlying store. */
   def flush(): Unit = {
     val now = System.nanoTime()
-    liveStages.values.foreach(update(_, now))
+    liveStages.values.foreach { stage =>
+      update(stage, now)
+      stage.executorSummaries.values.foreach(update(_, now))
+    }
     liveJobs.values.foreach(update(_, now))
     liveExecutors.values.foreach(update(_, now))
     liveTasks.values.foreach(update(_, now))
     liveRDDs.values.foreach(update(_, now))
+    pools.values.foreach(update(_, now))
   }
 
   private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = {
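Per-executor stage summaries are stored as separate rows, so flushing only the stage at the end of a replay would leave those rows stale; the new loop writes each stage together with its summaries. The invariant, sketched in terms of this file's own helpers:

```scala
// Sketch of the flush invariant: a stage row and its per-executor summary
// rows are written in the same pass so neither can go stale on its own.
def flushStage(stage: LiveStage, now: Long): Unit = {
  update(stage, now)                                      // parent stage row
  stage.executorSummaries.values.foreach(update(_, now))  // child summary rows
}
```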
@@ -628,6 +713,20 @@ private[spark] class AppStatusListener(
     stage
   }
 
+  private def killedTasksSummary(
+      reason: TaskEndReason,
+      oldSummary: Map[String, Int]): Map[String, Int] = {
+    reason match {
+      case k: TaskKilled =>
+        oldSummary.updated(k.reason, oldSummary.getOrElse(k.reason, 0) + 1)
+      case denied: TaskCommitDenied =>
+        val reason = denied.toErrorString
+        oldSummary.updated(reason, oldSummary.getOrElse(reason, 0) + 1)
+      case _ =>
+        oldSummary
+    }
+  }
+
   private def update(entity: LiveEntity, now: Long): Unit = {
     entity.write(kvstore, now)
   }
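`killedTasksSummary` folds one task-end reason at a time into an immutable histogram of kill reasons. Illustrative usage, assuming `TaskKilled` can be constructed from just its reason string as the listener code above suggests:

```scala
import org.apache.spark.TaskKilled

// Illustrative: folding three events into a reason -> count histogram.
var summary = Map.empty[String, Int]
summary = killedTasksSummary(TaskKilled("preempted"), summary)
summary = killedTasksSummary(TaskKilled("preempted"), summary)
summary = killedTasksSummary(TaskKilled("stage cancelled"), summary)
// summary == Map("preempted" -> 2, "stage cancelled" -> 1)
```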